diff options
author | Anthony Wang | 2023-05-11 22:17:20 -0400 |
---|---|---|
committer | Anthony Wang | 2023-05-11 22:17:20 -0400 |
commit | cdd295e371d78f02a2051044e8e4417f79b378dd (patch) | |
tree | db4f086ea41d3c23d6da9c0ee52e910c86a55c1a | |
parent | d87d4d6492bb3e325b4ea1b3d60677657465b650 (diff) |
Almost done with DHT
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | client/main.go | 48 | ||||
-rw-r--r-- | server/dht.go | 111 | ||||
-rw-r--r-- | server/main.go | 55 | ||||
-rw-r--r-- | server/user.go | 58 |
5 files changed, 222 insertions, 56 deletions
@@ -16,13 +16,13 @@ Alright, let's solve all those problems above with Kela! Kela consists of three ### Name resolution service -In Kela, each user has an ID, which is an Ed5519 public key encoded in URL-safe Base64. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online. +In Kela, each user has an ID, which is an Ed5519 public key encoded in URL-safe Base64. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system, which acts as a configuration service for the storage and message services. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online. -The DHT stores key-value pairs. The key consists of a user's public key and timestamp (the current Unix time in seconds divided by 600, rounded down). The value consists of a timestamp (the current Unix time in seconds), a list of servers that the user is associated with, where the first server is their primary server, and a signature. A key-value pair is assigned to the 5 servers with smallest SHA-256 hashes of their domain name greater than the SHA-256 hash of the key. The purpose of the elaborate timestamp in the key is to ensure that the set of servers assigned to a key-value pair rotates every 600 seconds so an attacker must control a very large portion of the DHT to do a denial-of-service attack against a specific key-value pair. When servers join and leave the DHT, the servers that a user is associated with will ensure that that user's key-value pair is assigned to a new server if necessary to ensure that 5 servers store that key-value pair. The DHT supports two operations, get and put. For put operations, the server checks the signature to ensure the validity of the request. When a server receives either of these two operations, it computes the SHA-256 hash of the key and checks if it is supposed to store that key-value pair or not. If it is supposed to store that key-value pair, it performs the operation on that pair. Otherwise, the server will contact in parallel the 5 servers that store this key-value pair. If the operation is a get, the server will look at the 5 replies and return the value with the most recent timestamp. If the operation is a put, and one of the 5 parallel requests fails, the server will remove that offline server from its DHT node list and assign a new server to this key-value pair to replace the offline one. Each server periodically goes through its stored key-value pairs and deletes old ones. +The DHT stores key-value pairs. The key consists of a user's public key and timestamp (the current Unix time in seconds divided by 600, rounded down). The value consists of a timestamp (the current Unix time in seconds), a list of servers that the user is associated with, where the first server is their primary server, and a signature. A key-value pair is assigned to the 5 servers with smallest SHA-256 hashes of their domain name greater than the SHA-256 hash of the key. The purpose of the elaborate timestamp in the key is to ensure that the set of servers assigned to a key-value pair rotates every 600 seconds so an attacker must control a very large portion of the DHT to do a denial-of-service attack against a specific key-value pair. When servers join and leave the DHT, the servers that a user is associated with will ensure that that user's key-value pair is assigned to a new server if necessary to ensure that 5 servers store that key-value pair. The DHT supports two operations, get and post. For post operations, the server checks the signature to ensure the validity of the request. When a server receives either of these two operations, it computes the SHA-256 hash of the key and checks if it is supposed to store that key-value pair or not. If it is supposed to store that key-value pair, it performs the operation on that pair. Otherwise, the server will contact in parallel the 5 servers that store this key-value pair. If the operation is a get, the server will look at the 5 replies and return the value with the most recent timestamp. If the operation is a post, and one of the 5 parallel requests fails, the server will remove that offline server from its DHT node list and assign a new server to this key-value pair to replace the offline one. Each server periodically goes through its stored key-value pairs and deletes old ones. ### Storage service -The storage service uses a weaker form of primary-backup replication. The storage service supports the three operations get, put and delete, and a user's primary server always handles operations. Get operations are trivial. For a put or delete operation, the primary makes the modification and notifies all the backups about the operation, but responds to the user immediately without ensuring that the backups have performed the operation. All operations are stored in a log, which only stores the operation type and filename of the modified file, but not the contents of the operation. The log and files are persisted to disk. If a backup is offline, the primary maintains a log of all pending operations to be sent to the backup and will keep retrying. If the primary is offline, no progress can be made, but the user can designate any of the backups as the new primary, which also requires a put operation to the DHT to update that user's list of servers. When a backup becomes a primary, it must ensure that any other backups that are ahead of this one rollback their operations to match this backup. To rollback a put or delete operation, a backup can contact the new primary to get the file. +The storage service uses a weaker form of primary-backup replication. The storage service supports the three operations get, post and delete, and a user's primary server always handles operations. Get operations are trivial. For a post or delete operation, the primary makes the modification and notifies all the backups about the operation, but responds to the user immediately without ensuring that the backups have performed the operation. All operations are stored in a log, which only stores the operation type and filename of the modified file, but not the contents of the operation. The log and files are persisted to disk. If a backup is offline, the primary maintains a log of all pending operations to be sent to the backup and will keep retrying. If the primary is offline, no progress can be made, but the user can designate any of the backups as the new primary, which also requires a post operation to the DHT to update that user's list of servers. When a backup becomes a primary, it must ensure that any other backups that are ahead of this one rollback their operations to match this backup. To rollback a post or delete operation, a backup can contact the new primary to get the file. ### Message service diff --git a/client/main.go b/client/main.go index d996531..22e5230 100644 --- a/client/main.go +++ b/client/main.go @@ -2,12 +2,58 @@ package main import ( "crypto/ed25519" + "encoding/base64" "flag" + "fmt" + "os" + "strings" ) func main() { flag.Parse() - if flag.Arg(0) == "register" { + + if flag.Arg(0) == "setup" { + // Create keys + pubKey, privKey, err := ed25519.GenerateKey(nil) + if err != nil { + panic(err) + } + err = os.WriteFile("pubkey", pubKey, 644) + if err != nil { + panic(err) + } + err = os.WriteFile("privkey", privKey, 600) + if err != nil { + panic(err) + } + // Create servers file + _, err = os.Create("servers") + if err != nil { + panic(err) + } + fmt.Printf("Success! Your user ID: %s", base64.RawURLEncoding.EncodeToString(pubKey)) + return + } + + pubKeyBytes, err := os.ReadFile("pubkey") + if err != nil { + panic(err) + } + pubKey := ed25519.PublicKey(pubKeyBytes) + privKeyBytes, err := os.ReadFile("privkey") + if err != nil { + panic(err) + } + privKey := ed25519.PublicKey(privKeyBytes) + serversBytes, err := os.ReadFile("servers") + if err != nil { + panic(err) + } + servers := strings.Split(string(serversBytes), "\n") + + + if flag.Arg(0) == "associate" { + } } diff --git a/server/dht.go b/server/dht.go index 8292280..c3e984b 100644 --- a/server/dht.go +++ b/server/dht.go @@ -2,12 +2,14 @@ package main import ( "bytes" + "crypto/ed25519" "crypto/sha256" "encoding/base64" "errors" "fmt" "io" "log" + "math/rand" "net/http" "sort" "strconv" @@ -16,6 +18,12 @@ import ( "time" ) +var myHash string +var myPos int +var hashToDomain map[string]string +var peerHashes []string +var kvstore map[string][]byte + // Get the sha256sum of string as a URL-safe unpadded base64 string func sha256sum(s string) string { b := sha256.Sum256([]byte(s)) @@ -54,7 +62,10 @@ func addPeer(peer string) error { copy(peerHashes[i+1:], peerHashes[i:]) peerHashes[i] = peerHash myPos = sort.SearchStrings(peerHashes, me) + + // TODO: redistribute keys mu.Unlock() + // Read response body body, err := io.ReadAll(resp.Body) if err != nil { @@ -91,13 +102,24 @@ func keyPos(key string) int { return keyPos } +// Get the timestamp of this val +func timestamp(val []byte) int { + if len(val) < ed25519.SignatureSize { + return 0 + } + message := string(val[:len(val)-ed25519.SignatureSize]) + timestamp, err := strconv.Atoi(strings.Split(message, "\n")[0]) + if err != nil { + return 0 + } + return timestamp +} // Get the value for a key from the DHT func dhtGet(key string) ([]byte, error) { keyPos := keyPos(key) var wg sync.WaitGroup - var ret []byte - bestTime := 0 + var latest []byte for i := 0; i < 5 && i < len(peerHashes); i++ { wg.Add(1) j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] @@ -105,37 +127,30 @@ func dhtGet(key string) ([]byte, error) { defer wg.Done() resp, err := http.Get(j + "/dht/" + key + "?direct") if err != nil { - // TODO: Remove this server from DHT? - // For sanity reasons this might be a bad idea - return - } - b, err := io.ReadAll(resp.Body) - if err != nil { return } - err = verify(key, b) + val, err := io.ReadAll(resp.Body) if err != nil { return } - valTime, err := strconv.Atoi(strings.Split(string(b), "\n")[0]) + err = verify(key, val) if err != nil { return } - if ret == nil || valTime > bestTime { - ret = b - bestTime = valTime + if latest == nil || timestamp(val) > timestamp(latest) { + latest = val } }() } wg.Wait() - if ret == nil { - return nil, errors.New("id not in kvstore") + if latest == nil { + return nil, errors.New("key not found in kvstore") } - return ret, nil + return latest, nil } -// Put a key-value pair into the DHT -func dhtPut(key string, val []byte) error { +// Post a key-value pair into the DHT +func dhtPost(key string, val []byte) error { err := verify(key, val) if err != nil { return err @@ -144,7 +159,7 @@ func dhtPut(key string, val []byte) error { for i := 0; i < 5 && i < len(peerHashes); i++ { j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] go func() { - http.Post(j + "/dht/" + key + "?direct", "application/octet-stream", bytes.NewBuffer(val)) + http.Post(j+"/dht/"+key+"?direct", "application/octet-stream", bytes.NewBuffer(val)) }() } return nil @@ -156,13 +171,14 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { r.ParseForm() if r.Form.Get("direct") != "" { // Directly modify kvstore - if keyPos(key) - myPos >= 5 { + if keyPos(key)-myPos >= 5 { w.WriteHeader(http.StatusNotFound) return } + phase := time.Now().Unix()/600 if r.Method == "GET" { mu.Lock() - val, ok := kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] + val, ok := kvstore[key+"\n"+fmt.Sprint(phase)] mu.Unlock() if !ok || verify(key, val) != nil { w.WriteHeader(http.StatusNotFound) @@ -176,11 +192,14 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { return } mu.Lock() - kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] = val + // Update key for this phase and next one + kvstore[key+"\n"+fmt.Sprint(phase)] = val + kvstore[key+"\n"+fmt.Sprint(phase+1)] = val mu.Unlock() } return } + if r.Method == "GET" { val, err := dhtGet(key) if err != nil { @@ -194,7 +213,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - err = dhtPut(key, val) + err = dhtPost(key, val) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -202,3 +221,49 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } } + +// Clean out offline peers +func cleanPeers() { + for true { + mu.Lock() + peer := hashToDomain[peerHashes[rand.Intn(len(peerHashes))]] + mu.Unlock() + _, err := http.Get(peer) + if err != nil { + // Bad response, so remove peer + mu.Lock() + i := sort.SearchStrings(peerHashes, sha256sum(peer)) + peerHashes = append(peerHashes[:i], peerHashes[i+1:]...) + myPos = sort.SearchStrings(peerHashes, me) + // TODO: redistribute keys + mu.Unlock() + } + time.Sleep(5 * time.Second) + } +} + +// Clean out old keys from the KVStore every minute +func cleanKVStore() { + for true { + // This locks the mutex for a while which sucks + mu.Lock() + for key := range kvstore { + timestamp, err := strconv.Atoi(strings.Split(key, "\n")[1]) + if err != nil || int64(timestamp)+1 < time.Now().Unix()/600 { + delete(kvstore, key) + } + } + mu.Unlock() + time.Sleep(time.Minute) + } +} + +// Redistribute key-value pairs periodically +func redistributeKeys() { + for true { + for id, user := range users { + dhtPost(id, user.dhtVal) + } + time.Sleep(time.Duration(rand.Intn(300)) * time.Second) + } +} diff --git a/server/main.go b/server/main.go index 3ad3060..b4fc654 100644 --- a/server/main.go +++ b/server/main.go @@ -1,49 +1,62 @@ package main import ( - "crypto/ed25519" - "crypto/sha256" - "encoding/base64" + "encoding/gob" "flag" + "fmt" "log" "net/http" + "os" "sync" ) -type user struct { - pubkey []byte - servers []string -} - var mu sync.Mutex +var bindAddr string var me string -var myHash string -var myPos int -var hashToDomain map[string]string -var peerHashes []string -var kvstore map[string][]byte +var initialPeer string +var dataDir string func main() { - bindAddr := flag.String("b", ":4200", "bind address") - publicURL := flag.String("u", "http://localhost:4200", "public URL") - peer := flag.String("i", "", "initial peer") + flag.StringVar(&bindAddr, "b", ":4200", "bind address") + flag.StringVar(&me, "u", "http://localhost:4200", "public URL") + flag.StringVar(&initialPeer, "i", "", "initial peer") + flag.StringVar(&dataDir, "d", ".", "data directory") flag.Parse() - log.Printf("Starting %s %s %s", *bindAddr, *publicURL, *peer) + log.Printf("Starting %s %s %s %s", bindAddr, me, initialPeer, dataDir) // Record myself - me = *publicURL myHash = sha256sum(me) myPos = 0 peerHashes = append(peerHashes, sha256sum(me)) hashToDomain = map[string]string{peerHashes[0]: me} - if *peer != "" { - go addPeer(*peer) + if initialPeer != "" { + go addPeer(initialPeer) + } + go cleanPeers() + go cleanKVStore() + + // Load user data from disk + entries, _ := os.ReadDir(dataDir) + for _, entry := range entries { + id := entry.Name() + reader, err := os.Open(dataDir + "/" + id + "/gob") + if err != nil { + continue + } + var user user + dec := gob.NewDecoder(reader) + dec.Decode(&user) + users[id] = user } + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello! This is a Kela server.") + }) http.HandleFunc("/peer", peerHandler) + http.HandleFunc("/user", userHandler) http.HandleFunc("/dht", dhtHandler) http.HandleFunc("/storage", storageHandler) http.HandleFunc("/message", messageHandler) - log.Fatal(http.ListenAndServe(*bindAddr, nil)) + log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/server/user.go b/server/user.go index 37ceee5..5865c93 100644 --- a/server/user.go +++ b/server/user.go @@ -3,10 +3,20 @@ package main import ( "crypto/ed25519" "encoding/base64" + "encoding/gob" "errors" "net/http" + "os" + "strings" ) +type user struct { + dhtVal []byte +} + +var users map[string]user + +// Verify that a body was signed by this ID func verify(id string, body []byte) error { b, err := base64.RawURLEncoding.DecodeString(id) if err != nil { @@ -23,15 +33,47 @@ func verify(id string, body []byte) error { return nil } -// Create user -func createHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - id := r.Form.Get("id") - dhtGet(id) - +// Persist a user's data to disk +func persist(id string) { + writer, err := os.Open(dataDir + "/" + id + "/gob") + if err != nil { + return + } + enc := gob.NewEncoder(writer) + enc.Encode(users[id]) } -// Delete user -func deleteHandler(w http.ResponseWriter, r *http.Request) { +// Handle user configuration changes +func userHandler(w http.ResponseWriter, r *http.Request) { + id := r.URL.Fragment[6:] + // Resolve ID to server list + val, err := dhtGet(id) + if err != nil || verify(id, val) != nil { + w.WriteHeader(http.StatusNotFound) + return + } + // Check if server list contains this server + message := string(val[:len(val)-ed25519.SignatureSize]) + if !strings.Contains(message, me) { + // Delete user if they are no longer associated with this server + delete(users, id) + err = os.RemoveAll(id) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) + return + } + //valSplit := strings.Split(message, "\n") + //servers := valSplit[1:len(valSplit)-1] + if _, ok := users[id]; !ok { + // Add user + users[id] = user{ + dhtVal: val, + } + os.Mkdir(id, 755) + persist(id) + } } |