diff options
Diffstat (limited to 'server/dht.go')
-rw-r--r-- | server/dht.go | 126 |
1 files changed, 69 insertions, 57 deletions
diff --git a/server/dht.go b/server/dht.go index 6590199..7327676 100644 --- a/server/dht.go +++ b/server/dht.go @@ -5,6 +5,7 @@ import ( "crypto/ed25519" "crypto/sha256" "encoding/base64" + "encoding/binary" "fmt" "io" "log" @@ -29,9 +30,19 @@ func sha256sum(s string) string { return base64.RawURLEncoding.EncodeToString(b[:]) } +// Find the position of a key in the DHT +func keyPos(key string) int { + keyPos := sort.SearchStrings(peerHashes, sha256sum(key)) + if keyPos < myPos { + keyPos += len(peerHashes) + } + return keyPos +} + // Try to peer with another server func addPeer(peer string) error { peerHash := sha256sum(peer) + // Check if already peered mu.Lock() _, ok := hashToDomain[peerHash] @@ -44,7 +55,6 @@ func addPeer(peer string) error { mu.Unlock() // Try request to peer - log.Printf("%s trying to peer with %s", me, peer) resp, err := http.Get(peer + "/peer?peer=" + me) if err != nil { // Request failed, delete peer @@ -54,25 +64,25 @@ func addPeer(peer string) error { return err } - log.Printf("%s successfully peered with %s", me, peer) + // Add peer mu.Lock() i := sort.SearchStrings(peerHashes, peerHash) peerHashes = append(peerHashes, "") copy(peerHashes[i+1:], peerHashes[i:]) peerHashes[i] = peerHash - myPos = sort.SearchStrings(peerHashes, me) - + myPos = sort.SearchStrings(peerHashes, myHash) // Distribute keys to new server for id, user := range users { phase := time.Now().Unix() / 600 - if keyPos(id + "\n" + fmt.Sprint(phase))-myPos < 5 { - go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + if keyPos(id+"\n"+fmt.Sprint(phase))-myPos < 5 { + go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } - if keyPos(id + "\n" + fmt.Sprint(phase+1))-myPos < 5 { - go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + if keyPos(id+"\n"+fmt.Sprint(phase+1))-myPos < 5 { + go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } } mu.Unlock() + log.Printf("%s successfully peered with %s", me, peer) // Read response body body, err := io.ReadAll(resp.Body) @@ -95,39 +105,27 @@ func peerHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } + mu.Lock() for _, p := range hashToDomain { fmt.Fprintf(w, "%s\n", p) } + mu.Unlock() go addPeer(peer) } -// Find the position of a key in the DHT -func keyPos(key string) int { - keyPos := sort.SearchStrings(peerHashes, sha256sum(key)) - if keyPos < myPos { - keyPos += len(peerHashes) - } - return keyPos -} - // Get the timestamp of this val -func timestamp(val []byte) int { - if len(val) < ed25519.SignatureSize { - return 0 - } - message := string(val[:len(val)-ed25519.SignatureSize]) - timestamp, err := strconv.Atoi(strings.Split(message, "\n")[0]) - if err != nil { +func timestamp(val []byte) int64 { + if len(val) < 8+ed25519.SignatureSize { return 0 } - return timestamp + ret, _ := binary.Varint(val[:8]) + return ret } // Get the value for a key from the DHT -func dhtGet(key, direct string) []byte { +func dhtGet(key string, direct bool) []byte { phase := fmt.Sprint(time.Now().Unix() / 600) - keyPos := keyPos(key + "\n" + phase) - if direct != "" && keyPos-myPos < 5 { + if direct { // Directly read from kvstore mu.Lock() val, ok := kvstore[key+"\n"+phase] @@ -139,12 +137,14 @@ func dhtGet(key, direct string) []byte { } // Contact 5 servers that store this key-value pair - var mu sync.Mutex + var mu2 sync.Mutex var wg sync.WaitGroup var latest []byte + mu.Lock() + keyPos := keyPos(key + "\n" + phase) for i := 0; i < 5 && i < len(peerHashes); i++ { wg.Add(1) - j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] + j := hashToDomain[peerHashes[(keyPos-i+len(peerHashes))%len(peerHashes)]] go func() { defer wg.Done() resp, err := http.Get(j + "/dht/" + key + "?direct") @@ -159,20 +159,21 @@ func dhtGet(key, direct string) []byte { if err != nil { return } - mu.Lock() + mu2.Lock() if latest == nil || timestamp(val) > timestamp(latest) { latest = val } - mu.Unlock() + mu2.Unlock() }() } + mu.Unlock() // Wait for all to finish or time out wg.Wait() return latest } // Post a key-value pair into the DHT -func dhtPost(key, phase, direct string, val []byte) error { +func dhtPost(key, phase string, direct bool, val []byte) error { err := verify(key, val) if err != nil { return err @@ -180,21 +181,8 @@ func dhtPost(key, phase, direct string, val []byte) error { if phase == "" { phase = fmt.Sprint(time.Now().Unix() / 600) } - user, ok := users[key] - if ok { - curPhase, err := strconv.Atoi(phase) - if err != nil { - return err - } - nextPhase := time.Now().Unix()/600 + 1 - if int64(curPhase) < nextPhase && user.phase < nextPhase { - user.phase = nextPhase - go dhtPost(key, fmt.Sprint(nextPhase), "", val) - } - } - keyPos := keyPos(key + "\n" + phase) - if direct != "" && keyPos-myPos < 5 { + if direct { // Directly write to kvstore mu.Lock() curVal, ok := kvstore[key+"\n"+phase] @@ -205,11 +193,31 @@ func dhtPost(key, phase, direct string, val []byte) error { return nil } + // Post the key-value pair to the next phase if necessary + mu.Lock() + curPhase, err := strconv.Atoi(phase) + if err != nil { + return err + } + nextPhase := time.Now().Unix()/600 + 1 + if int64(curPhase) < nextPhase { + user, ok := users[key] + if ok && user.phase < nextPhase { + user.phase = nextPhase + persist(key) + } + go dhtPost(key, fmt.Sprint(nextPhase), false, val) + } + keyPos := keyPos(key + "\n" + phase) + mu.Unlock() + // Contact 5 servers that store this key-value pair + mu.Lock() for i := 0; i < 5 && i < len(peerHashes); i++ { - j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] - go http.Post(j+"/dht/"+key+"?phase="+phase+"&direct=true", "application/octet-stream", bytes.NewBuffer(val)) + j := hashToDomain[peerHashes[(keyPos-i+len(peerHashes))%len(peerHashes)]] + go http.Post(j+"/dht/"+key+"?phase="+phase+"&direct", "application/octet-stream", bytes.NewBuffer(val)) } + mu.Unlock() return nil } @@ -218,7 +226,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { key := r.URL.Path[5:] r.ParseForm() if r.Method == "GET" { - val := dhtGet(key, r.Form.Get("direct")) + val := dhtGet(key, r.Form.Has("direct")) if val == nil { w.WriteHeader(http.StatusNotFound) return @@ -226,7 +234,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { w.Write(val) } else if r.Method == "POST" { val, err := io.ReadAll(r.Body) - if err != nil || dhtPost(key, r.Form.Get("phase"), r.Form.Get("direct"), val) != nil { + if err != nil || dhtPost(key, r.Form.Get("phase"), r.Form.Has("direct"), val) != nil { w.WriteHeader(http.StatusInternalServerError) return } @@ -242,11 +250,13 @@ func cleanPeers() { mu.Unlock() _, err := http.Get(peer) if err != nil { + log.Printf("Removing peer %s", peer) + // Bad response, so remove peer mu.Lock() i := sort.SearchStrings(peerHashes, sha256sum(peer)) peerHashes = append(peerHashes[:i], peerHashes[i+1:]...) - myPos = sort.SearchStrings(peerHashes, me) + myPos = sort.SearchStrings(peerHashes, myHash) // Distribute keys on this server to other servers if len(peerHashes) >= 5 { @@ -254,13 +264,13 @@ func cleanPeers() { phase := time.Now().Unix() / 600 kpos := keyPos(id + "\n" + fmt.Sprint(phase)) if kpos-i < 5 { - server := hashToDomain[peerHashes[(kpos+4)%len(peerHashes)]] - go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + server := hashToDomain[peerHashes[(kpos-4+len(peerHashes))%len(peerHashes)]] + go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } kpos = keyPos(id + "\n" + fmt.Sprint(phase+1)) if kpos-i < 5 { - server := hashToDomain[peerHashes[(kpos+4)%len(peerHashes)]] - go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + server := hashToDomain[peerHashes[(kpos-4+len(peerHashes))%len(peerHashes)]] + go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } } } @@ -293,8 +303,10 @@ func redistributeKeys() { for id, user := range users { nextPhase := time.Now().Unix()/600 + 1 if user.phase < nextPhase { - go dhtPost(id, fmt.Sprint(nextPhase), "", user.dhtVal) + go dhtPost(id, fmt.Sprint(nextPhase), false, user.dhtVal) } + user.phase = nextPhase + persist(id) } mu.Unlock() time.Sleep(time.Duration(rand.Intn(300)) * time.Second) |