diff options
author | Anthony Wang | 2023-05-12 11:09:07 -0400 |
---|---|---|
committer | Anthony Wang | 2023-05-12 11:09:07 -0400 |
commit | ae29c8b047e16eb8357fd30f1cd970d124a71ec1 (patch) | |
tree | 12479e1e526f4675f031f55e14aeddfe99ed2bd1 /server | |
parent | cdd295e371d78f02a2051044e8e4417f79b378dd (diff) |
Finish DHT server and client implementation
Diffstat (limited to 'server')
-rw-r--r-- | server/dht.go | 143 | ||||
-rw-r--r-- | server/main.go | 11 | ||||
-rwxr-xr-x | server/test.sh | 4 | ||||
-rw-r--r-- | server/user.go | 11 |
4 files changed, 106 insertions, 63 deletions
diff --git a/server/dht.go b/server/dht.go index c3e984b..6590199 100644 --- a/server/dht.go +++ b/server/dht.go @@ -5,7 +5,6 @@ import ( "crypto/ed25519" "crypto/sha256" "encoding/base64" - "errors" "fmt" "io" "log" @@ -63,7 +62,16 @@ func addPeer(peer string) error { peerHashes[i] = peerHash myPos = sort.SearchStrings(peerHashes, me) - // TODO: redistribute keys + // Distribute keys to new server + for id, user := range users { + phase := time.Now().Unix() / 600 + if keyPos(id + "\n" + fmt.Sprint(phase))-myPos < 5 { + go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + } + if keyPos(id + "\n" + fmt.Sprint(phase+1))-myPos < 5 { + go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + } + } mu.Unlock() // Read response body @@ -116,8 +124,22 @@ func timestamp(val []byte) int { } // Get the value for a key from the DHT -func dhtGet(key string) ([]byte, error) { - keyPos := keyPos(key) +func dhtGet(key, direct string) []byte { + phase := fmt.Sprint(time.Now().Unix() / 600) + keyPos := keyPos(key + "\n" + phase) + if direct != "" && keyPos-myPos < 5 { + // Directly read from kvstore + mu.Lock() + val, ok := kvstore[key+"\n"+phase] + mu.Unlock() + if !ok || verify(key, val) != nil { + return nil + } + return val + } + + // Contact 5 servers that store this key-value pair + var mu sync.Mutex var wg sync.WaitGroup var latest []byte for i := 0; i < 5 && i < len(peerHashes); i++ { @@ -137,30 +159,56 @@ func dhtGet(key string) ([]byte, error) { if err != nil { return } + mu.Lock() if latest == nil || timestamp(val) > timestamp(latest) { latest = val } + mu.Unlock() }() } + // Wait for all to finish or time out wg.Wait() - if latest == nil { - return nil, errors.New("key not found in kvstore") - } - return latest, nil + return latest } // Post a key-value pair into the DHT -func dhtPost(key string, val []byte) error { +func dhtPost(key, phase, direct string, val []byte) error { err := verify(key, val) if err != nil { return err } - keyPos := keyPos(key) + if phase == "" { + phase = fmt.Sprint(time.Now().Unix() / 600) + } + user, ok := users[key] + if ok { + curPhase, err := strconv.Atoi(phase) + if err != nil { + return err + } + nextPhase := time.Now().Unix()/600 + 1 + if int64(curPhase) < nextPhase && user.phase < nextPhase { + user.phase = nextPhase + go dhtPost(key, fmt.Sprint(nextPhase), "", val) + } + } + + keyPos := keyPos(key + "\n" + phase) + if direct != "" && keyPos-myPos < 5 { + // Directly write to kvstore + mu.Lock() + curVal, ok := kvstore[key+"\n"+phase] + if !ok || timestamp(val) > timestamp(curVal) { + kvstore[key+"\n"+phase] = val + } + mu.Unlock() + return nil + } + + // Contact 5 servers that store this key-value pair for i := 0; i < 5 && i < len(peerHashes); i++ { j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] - go func() { - http.Post(j+"/dht/"+key+"?direct", "application/octet-stream", bytes.NewBuffer(val)) - }() + go http.Post(j+"/dht/"+key+"?phase="+phase+"&direct=true", "application/octet-stream", bytes.NewBuffer(val)) } return nil } @@ -169,52 +217,16 @@ func dhtPost(key string, val []byte) error { func dhtHandler(w http.ResponseWriter, r *http.Request) { key := r.URL.Path[5:] r.ParseForm() - if r.Form.Get("direct") != "" { - // Directly modify kvstore - if keyPos(key)-myPos >= 5 { - w.WriteHeader(http.StatusNotFound) - return - } - phase := time.Now().Unix()/600 - if r.Method == "GET" { - mu.Lock() - val, ok := kvstore[key+"\n"+fmt.Sprint(phase)] - mu.Unlock() - if !ok || verify(key, val) != nil { - w.WriteHeader(http.StatusNotFound) - return - } - w.Write(val) - } else if r.Method == "POST" { - val, err := io.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - mu.Lock() - // Update key for this phase and next one - kvstore[key+"\n"+fmt.Sprint(phase)] = val - kvstore[key+"\n"+fmt.Sprint(phase+1)] = val - mu.Unlock() - } - return - } - if r.Method == "GET" { - val, err := dhtGet(key) - if err != nil { + val := dhtGet(key, r.Form.Get("direct")) + if val == nil { w.WriteHeader(http.StatusNotFound) return } - w.Write([]byte(val)) + w.Write(val) } else if r.Method == "POST" { val, err := io.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - err = dhtPost(key, val) - if err != nil { + if err != nil || dhtPost(key, r.Form.Get("phase"), r.Form.Get("direct"), val) != nil { w.WriteHeader(http.StatusInternalServerError) return } @@ -235,7 +247,23 @@ func cleanPeers() { i := sort.SearchStrings(peerHashes, sha256sum(peer)) peerHashes = append(peerHashes[:i], peerHashes[i+1:]...) myPos = sort.SearchStrings(peerHashes, me) - // TODO: redistribute keys + + // Distribute keys on this server to other servers + if len(peerHashes) >= 5 { + for id, user := range users { + phase := time.Now().Unix() / 600 + kpos := keyPos(id + "\n" + fmt.Sprint(phase)) + if kpos-i < 5 { + server := hashToDomain[peerHashes[(kpos+4)%len(peerHashes)]] + go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + } + kpos = keyPos(id + "\n" + fmt.Sprint(phase+1)) + if kpos-i < 5 { + server := hashToDomain[peerHashes[(kpos+4)%len(peerHashes)]] + go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) + } + } + } mu.Unlock() } time.Sleep(5 * time.Second) @@ -261,9 +289,14 @@ func cleanKVStore() { // Redistribute key-value pairs periodically func redistributeKeys() { for true { + mu.Lock() for id, user := range users { - dhtPost(id, user.dhtVal) + nextPhase := time.Now().Unix()/600 + 1 + if user.phase < nextPhase { + go dhtPost(id, fmt.Sprint(nextPhase), "", user.dhtVal) + } } + mu.Unlock() time.Sleep(time.Duration(rand.Intn(300)) * time.Second) } } diff --git a/server/main.go b/server/main.go index b4fc654..a342ba5 100644 --- a/server/main.go +++ b/server/main.go @@ -30,14 +30,23 @@ func main() { peerHashes = append(peerHashes, sha256sum(me)) hashToDomain = map[string]string{peerHashes[0]: me} + // Start background functions if initialPeer != "" { go addPeer(initialPeer) } go cleanPeers() go cleanKVStore() + go redistributeKeys() // Load user data from disk - entries, _ := os.ReadDir(dataDir) + err := os.Mkdir(dataDir, 0755) + if err != nil { + log.Fatal(err) + } + entries, err := os.ReadDir(dataDir) + if err != nil { + log.Fatal(err) + } for _, entry := range entries { id := entry.Name() reader, err := os.Open(dataDir + "/" + id + "/gob") diff --git a/server/test.sh b/server/test.sh index 8ca1d42..592ab6d 100755 --- a/server/test.sh +++ b/server/test.sh @@ -1,10 +1,10 @@ #!/bin/bash trap "kill 0" EXIT go build -./server -b :4200 -d http://localhost:4200 & +./server -d 0 & for i in $(seq 1 9) do sleep 0.1 - ./server -b :420$i -d http://localhost:420$i -i http://localhost:420$((i-1)) & + ./server -d $i -b :420$i -u http://localhost:420$i -i http://localhost:420$((i-1)) & done wait diff --git a/server/user.go b/server/user.go index 5865c93..aa78239 100644 --- a/server/user.go +++ b/server/user.go @@ -11,7 +11,8 @@ import ( ) type user struct { - dhtVal []byte + dhtVal []byte + phase int64 } var users map[string]user @@ -47,8 +48,8 @@ func persist(id string) { func userHandler(w http.ResponseWriter, r *http.Request) { id := r.URL.Fragment[6:] // Resolve ID to server list - val, err := dhtGet(id) - if err != nil || verify(id, val) != nil { + val := dhtGet(id, "") + if verify(id, val) != nil { w.WriteHeader(http.StatusNotFound) return } @@ -57,7 +58,7 @@ func userHandler(w http.ResponseWriter, r *http.Request) { if !strings.Contains(message, me) { // Delete user if they are no longer associated with this server delete(users, id) - err = os.RemoveAll(id) + err := os.RemoveAll(id) if err != nil { w.WriteHeader(http.StatusNotFound) return @@ -73,7 +74,7 @@ func userHandler(w http.ResponseWriter, r *http.Request) { users[id] = user{ dhtVal: val, } - os.Mkdir(id, 755) + os.Mkdir(id, 0755) persist(id) } } |