diff options
author | Anthony Wang | 2023-05-11 16:31:57 -0400 |
---|---|---|
committer | Anthony Wang | 2023-05-11 16:31:57 -0400 |
commit | d87d4d6492bb3e325b4ea1b3d60677657465b650 (patch) | |
tree | fdb39a6e5ee1ef95bb5c93cc6de0a3c40e21e341 | |
parent | 45e65685417df707eaa128d5fad61f230615092f (diff) |
Almost done with DHT
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | server/dht.go | 167 | ||||
-rw-r--r-- | server/main.go | 18 | ||||
-rw-r--r-- | server/user.go | 37 |
4 files changed, 156 insertions, 68 deletions
@@ -16,7 +16,7 @@ Alright, let's solve all those problems above with Kela! Kela consists of three ### Name resolution service -In Kela, each user has an ID, which is a public key. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online. +In Kela, each user has an ID, which is an Ed5519 public key encoded in URL-safe Base64. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online. The DHT stores key-value pairs. The key consists of a user's public key and timestamp (the current Unix time in seconds divided by 600, rounded down). The value consists of a timestamp (the current Unix time in seconds), a list of servers that the user is associated with, where the first server is their primary server, and a signature. A key-value pair is assigned to the 5 servers with smallest SHA-256 hashes of their domain name greater than the SHA-256 hash of the key. The purpose of the elaborate timestamp in the key is to ensure that the set of servers assigned to a key-value pair rotates every 600 seconds so an attacker must control a very large portion of the DHT to do a denial-of-service attack against a specific key-value pair. When servers join and leave the DHT, the servers that a user is associated with will ensure that that user's key-value pair is assigned to a new server if necessary to ensure that 5 servers store that key-value pair. The DHT supports two operations, get and put. For put operations, the server checks the signature to ensure the validity of the request. When a server receives either of these two operations, it computes the SHA-256 hash of the key and checks if it is supposed to store that key-value pair or not. If it is supposed to store that key-value pair, it performs the operation on that pair. Otherwise, the server will contact in parallel the 5 servers that store this key-value pair. If the operation is a get, the server will look at the 5 replies and return the value with the most recent timestamp. If the operation is a put, and one of the 5 parallel requests fails, the server will remove that offline server from its DHT node list and assign a new server to this key-value pair to replace the offline one. Each server periodically goes through its stored key-value pairs and deletes old ones. diff --git a/server/dht.go b/server/dht.go index e5d96f1..8292280 100644 --- a/server/dht.go +++ b/server/dht.go @@ -1,16 +1,27 @@ package main import ( - "crypto/ed25519" + "bytes" + "crypto/sha256" "encoding/base64" + "errors" "fmt" "io" "log" "net/http" "sort" + "strconv" "strings" + "sync" + "time" ) +// Get the sha256sum of string as a URL-safe unpadded base64 string +func sha256sum(s string) string { + b := sha256.Sum256([]byte(s)) + return base64.RawURLEncoding.EncodeToString(b[:]) +} + // Try to peer with another server func addPeer(peer string) error { peerHash := sha256sum(peer) @@ -71,69 +82,123 @@ func peerHandler(w http.ResponseWriter, r *http.Request) { go addPeer(peer) } -// Handle DHT requests -func dhtHandler(w http.ResponseWriter, r *http.Request) { - key := r.URL.String()[5:] - keyHash := sha256sum(key) - pubKey := asPubKey(key) - fmt.Println(key, keyHash, pubKey) - mu.Lock() - keyPos := sort.SearchStrings(peerHashes, keyHash) +// Find the position of a key in the DHT +func keyPos(key string) int { + keyPos := sort.SearchStrings(peerHashes, sha256sum(key)) if keyPos < myPos { keyPos += len(peerHashes) } - mu.Unlock() - if r.Method == "GET" { - if keyPos - myPos < 5 { - mu.Lock() - if val, ok := kvstore[key]; ok { - w.Write([]byte(val)) - } else { - w.WriteHeader(http.StatusNotFound) - } - mu.Unlock() - } else { - for i := 0; i < 5 && i < len(peerHashes); i++ { - j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] - go func() string { - resp, err := http.Get(j + r.URL.String()) - if err != nil { - return "" - } - b, err := io.ReadAll(resp.Body) - if err != nil { - return "" - } - return string(b) - }() + return keyPos +} + +// Get the value for a key from the DHT +func dhtGet(key string) ([]byte, error) { + keyPos := keyPos(key) + var wg sync.WaitGroup + var ret []byte + bestTime := 0 + for i := 0; i < 5 && i < len(peerHashes); i++ { + wg.Add(1) + j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] + go func() { + defer wg.Done() + resp, err := http.Get(j + "/dht/" + key + "?direct") + if err != nil { + // TODO: Remove this server from DHT? + // For sanity reasons this might be a bad idea + return } + b, err := io.ReadAll(resp.Body) + if err != nil { + return + } + err = verify(key, b) + if err != nil { + return + } + valTime, err := strconv.Atoi(strings.Split(string(b), "\n")[0]) + if err != nil { + return + } + if ret == nil || valTime > bestTime { + ret = b + bestTime = valTime + } + }() + } + wg.Wait() + if ret == nil { + return nil, errors.New("id not in kvstore") + } + return ret, nil +} +// Put a key-value pair into the DHT +func dhtPut(key string, val []byte) error { + err := verify(key, val) + if err != nil { + return err + } + keyPos := keyPos(key) + for i := 0; i < 5 && i < len(peerHashes); i++ { + j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] + go func() { + http.Post(j + "/dht/" + key + "?direct", "application/octet-stream", bytes.NewBuffer(val)) + }() + } + return nil +} +// Handle DHT requests +func dhtHandler(w http.ResponseWriter, r *http.Request) { + key := r.URL.Path[5:] + r.ParseForm() + if r.Form.Get("direct") != "" { + // Directly modify kvstore + if keyPos(key) - myPos >= 5 { + w.WriteHeader(http.StatusNotFound) + return + } + if r.Method == "GET" { + mu.Lock() + val, ok := kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] + mu.Unlock() + if !ok || verify(key, val) != nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.Write(val) + } else if r.Method == "POST" { + val, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + mu.Lock() + kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] = val + mu.Unlock() } - } else if r.Method == "PUT" { - // Read request body - b, err := io.ReadAll(r.Body) + return + } + if r.Method == "GET" { + val, err := dhtGet(key) if err != nil { - w.WriteHeader(http.StatusInternalServerError) + w.WriteHeader(http.StatusNotFound) return } - // Extract signature - valSplit := strings.Split(string(b), "\n") - sig := valSplit[len(valSplit)-1] - // Verify signature - if !ed25519.Verify(pubKey, b[:len(b)-len(sig)-1], []byte(sig)) { - w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte(val)) + } else if r.Method == "POST" { + val, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) return } - if keyPos - myPos < 5 { - mu.Lock() - kvstore[key] = string(b[:len(b)-len(sig)-1]) - mu.Unlock() - } else { - + err = dhtPut(key, val) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return } - } else { - w.WriteHeader(http.StatusMethodNotAllowed) + w.WriteHeader(http.StatusOK) } } diff --git a/server/main.go b/server/main.go index c0f7450..3ad3060 100644 --- a/server/main.go +++ b/server/main.go @@ -12,6 +12,7 @@ import ( type user struct { pubkey []byte + servers []string } var mu sync.Mutex @@ -20,22 +21,7 @@ var myHash string var myPos int var hashToDomain map[string]string var peerHashes []string -var kvstore map[string]string - -// Get the sha256sum of string as a URL-safe unpadded base64 string -func sha256sum(s string) string { - b := sha256.Sum256([]byte(s)) - return base64.RawURLEncoding.EncodeToString(b[:]) -} - -// Decode an ID to a public key -func asPubKey(s string) ed25519.PublicKey { - b, err := base64.RawURLEncoding.DecodeString(s) - if err != nil { - return nil - } - return ed25519.PublicKey(b) -} +var kvstore map[string][]byte func main() { bindAddr := flag.String("b", ":4200", "bind address") diff --git a/server/user.go b/server/user.go new file mode 100644 index 0000000..37ceee5 --- /dev/null +++ b/server/user.go @@ -0,0 +1,37 @@ +package main + +import ( + "crypto/ed25519" + "encoding/base64" + "errors" + "net/http" +) + +func verify(id string, body []byte) error { + b, err := base64.RawURLEncoding.DecodeString(id) + if err != nil { + return err + } + if len(body) < ed25519.SignatureSize { + return errors.New("body too short") + } + message := body[:len(body)-ed25519.SignatureSize] + sig := body[len(body)-ed25519.SignatureSize:] + if !ed25519.Verify(ed25519.PublicKey(b), message, sig) { + return errors.New("signature verification failed") + } + return nil +} + +// Create user +func createHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + id := r.Form.Get("id") + dhtGet(id) + +} + +// Delete user +func deleteHandler(w http.ResponseWriter, r *http.Request) { + +} |