diff options
author | Anthony Wang | 2023-05-11 22:17:20 -0400 |
---|---|---|
committer | Anthony Wang | 2023-05-11 22:17:20 -0400 |
commit | cdd295e371d78f02a2051044e8e4417f79b378dd (patch) | |
tree | db4f086ea41d3c23d6da9c0ee52e910c86a55c1a /server | |
parent | d87d4d6492bb3e325b4ea1b3d60677657465b650 (diff) |
Almost done with DHT
Diffstat (limited to 'server')
-rw-r--r-- | server/dht.go | 111 | ||||
-rw-r--r-- | server/main.go | 55 | ||||
-rw-r--r-- | server/user.go | 58 |
3 files changed, 172 insertions, 52 deletions
diff --git a/server/dht.go b/server/dht.go index 8292280..c3e984b 100644 --- a/server/dht.go +++ b/server/dht.go @@ -2,12 +2,14 @@ package main import ( "bytes" + "crypto/ed25519" "crypto/sha256" "encoding/base64" "errors" "fmt" "io" "log" + "math/rand" "net/http" "sort" "strconv" @@ -16,6 +18,12 @@ import ( "time" ) +var myHash string +var myPos int +var hashToDomain map[string]string +var peerHashes []string +var kvstore map[string][]byte + // Get the sha256sum of string as a URL-safe unpadded base64 string func sha256sum(s string) string { b := sha256.Sum256([]byte(s)) @@ -54,7 +62,10 @@ func addPeer(peer string) error { copy(peerHashes[i+1:], peerHashes[i:]) peerHashes[i] = peerHash myPos = sort.SearchStrings(peerHashes, me) + + // TODO: redistribute keys mu.Unlock() + // Read response body body, err := io.ReadAll(resp.Body) if err != nil { @@ -91,13 +102,24 @@ func keyPos(key string) int { return keyPos } +// Get the timestamp of this val +func timestamp(val []byte) int { + if len(val) < ed25519.SignatureSize { + return 0 + } + message := string(val[:len(val)-ed25519.SignatureSize]) + timestamp, err := strconv.Atoi(strings.Split(message, "\n")[0]) + if err != nil { + return 0 + } + return timestamp +} // Get the value for a key from the DHT func dhtGet(key string) ([]byte, error) { keyPos := keyPos(key) var wg sync.WaitGroup - var ret []byte - bestTime := 0 + var latest []byte for i := 0; i < 5 && i < len(peerHashes); i++ { wg.Add(1) j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] @@ -105,37 +127,30 @@ func dhtGet(key string) ([]byte, error) { defer wg.Done() resp, err := http.Get(j + "/dht/" + key + "?direct") if err != nil { - // TODO: Remove this server from DHT? - // For sanity reasons this might be a bad idea - return - } - b, err := io.ReadAll(resp.Body) - if err != nil { return } - err = verify(key, b) + val, err := io.ReadAll(resp.Body) if err != nil { return } - valTime, err := strconv.Atoi(strings.Split(string(b), "\n")[0]) + err = verify(key, val) if err != nil { return } - if ret == nil || valTime > bestTime { - ret = b - bestTime = valTime + if latest == nil || timestamp(val) > timestamp(latest) { + latest = val } }() } wg.Wait() - if ret == nil { - return nil, errors.New("id not in kvstore") + if latest == nil { + return nil, errors.New("key not found in kvstore") } - return ret, nil + return latest, nil } -// Put a key-value pair into the DHT -func dhtPut(key string, val []byte) error { +// Post a key-value pair into the DHT +func dhtPost(key string, val []byte) error { err := verify(key, val) if err != nil { return err @@ -144,7 +159,7 @@ func dhtPut(key string, val []byte) error { for i := 0; i < 5 && i < len(peerHashes); i++ { j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] go func() { - http.Post(j + "/dht/" + key + "?direct", "application/octet-stream", bytes.NewBuffer(val)) + http.Post(j+"/dht/"+key+"?direct", "application/octet-stream", bytes.NewBuffer(val)) }() } return nil @@ -156,13 +171,14 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { r.ParseForm() if r.Form.Get("direct") != "" { // Directly modify kvstore - if keyPos(key) - myPos >= 5 { + if keyPos(key)-myPos >= 5 { w.WriteHeader(http.StatusNotFound) return } + phase := time.Now().Unix()/600 if r.Method == "GET" { mu.Lock() - val, ok := kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] + val, ok := kvstore[key+"\n"+fmt.Sprint(phase)] mu.Unlock() if !ok || verify(key, val) != nil { w.WriteHeader(http.StatusNotFound) @@ -176,11 +192,14 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { return } mu.Lock() - kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] = val + // Update key for this phase and next one + kvstore[key+"\n"+fmt.Sprint(phase)] = val + kvstore[key+"\n"+fmt.Sprint(phase+1)] = val mu.Unlock() } return } + if r.Method == "GET" { val, err := dhtGet(key) if err != nil { @@ -194,7 +213,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - err = dhtPut(key, val) + err = dhtPost(key, val) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -202,3 +221,49 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } } + +// Clean out offline peers +func cleanPeers() { + for true { + mu.Lock() + peer := hashToDomain[peerHashes[rand.Intn(len(peerHashes))]] + mu.Unlock() + _, err := http.Get(peer) + if err != nil { + // Bad response, so remove peer + mu.Lock() + i := sort.SearchStrings(peerHashes, sha256sum(peer)) + peerHashes = append(peerHashes[:i], peerHashes[i+1:]...) + myPos = sort.SearchStrings(peerHashes, me) + // TODO: redistribute keys + mu.Unlock() + } + time.Sleep(5 * time.Second) + } +} + +// Clean out old keys from the KVStore every minute +func cleanKVStore() { + for true { + // This locks the mutex for a while which sucks + mu.Lock() + for key := range kvstore { + timestamp, err := strconv.Atoi(strings.Split(key, "\n")[1]) + if err != nil || int64(timestamp)+1 < time.Now().Unix()/600 { + delete(kvstore, key) + } + } + mu.Unlock() + time.Sleep(time.Minute) + } +} + +// Redistribute key-value pairs periodically +func redistributeKeys() { + for true { + for id, user := range users { + dhtPost(id, user.dhtVal) + } + time.Sleep(time.Duration(rand.Intn(300)) * time.Second) + } +} diff --git a/server/main.go b/server/main.go index 3ad3060..b4fc654 100644 --- a/server/main.go +++ b/server/main.go @@ -1,49 +1,62 @@ package main import ( - "crypto/ed25519" - "crypto/sha256" - "encoding/base64" + "encoding/gob" "flag" + "fmt" "log" "net/http" + "os" "sync" ) -type user struct { - pubkey []byte - servers []string -} - var mu sync.Mutex +var bindAddr string var me string -var myHash string -var myPos int -var hashToDomain map[string]string -var peerHashes []string -var kvstore map[string][]byte +var initialPeer string +var dataDir string func main() { - bindAddr := flag.String("b", ":4200", "bind address") - publicURL := flag.String("u", "http://localhost:4200", "public URL") - peer := flag.String("i", "", "initial peer") + flag.StringVar(&bindAddr, "b", ":4200", "bind address") + flag.StringVar(&me, "u", "http://localhost:4200", "public URL") + flag.StringVar(&initialPeer, "i", "", "initial peer") + flag.StringVar(&dataDir, "d", ".", "data directory") flag.Parse() - log.Printf("Starting %s %s %s", *bindAddr, *publicURL, *peer) + log.Printf("Starting %s %s %s %s", bindAddr, me, initialPeer, dataDir) // Record myself - me = *publicURL myHash = sha256sum(me) myPos = 0 peerHashes = append(peerHashes, sha256sum(me)) hashToDomain = map[string]string{peerHashes[0]: me} - if *peer != "" { - go addPeer(*peer) + if initialPeer != "" { + go addPeer(initialPeer) + } + go cleanPeers() + go cleanKVStore() + + // Load user data from disk + entries, _ := os.ReadDir(dataDir) + for _, entry := range entries { + id := entry.Name() + reader, err := os.Open(dataDir + "/" + id + "/gob") + if err != nil { + continue + } + var user user + dec := gob.NewDecoder(reader) + dec.Decode(&user) + users[id] = user } + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello! This is a Kela server.") + }) http.HandleFunc("/peer", peerHandler) + http.HandleFunc("/user", userHandler) http.HandleFunc("/dht", dhtHandler) http.HandleFunc("/storage", storageHandler) http.HandleFunc("/message", messageHandler) - log.Fatal(http.ListenAndServe(*bindAddr, nil)) + log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/server/user.go b/server/user.go index 37ceee5..5865c93 100644 --- a/server/user.go +++ b/server/user.go @@ -3,10 +3,20 @@ package main import ( "crypto/ed25519" "encoding/base64" + "encoding/gob" "errors" "net/http" + "os" + "strings" ) +type user struct { + dhtVal []byte +} + +var users map[string]user + +// Verify that a body was signed by this ID func verify(id string, body []byte) error { b, err := base64.RawURLEncoding.DecodeString(id) if err != nil { @@ -23,15 +33,47 @@ func verify(id string, body []byte) error { return nil } -// Create user -func createHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - id := r.Form.Get("id") - dhtGet(id) - +// Persist a user's data to disk +func persist(id string) { + writer, err := os.Open(dataDir + "/" + id + "/gob") + if err != nil { + return + } + enc := gob.NewEncoder(writer) + enc.Encode(users[id]) } -// Delete user -func deleteHandler(w http.ResponseWriter, r *http.Request) { +// Handle user configuration changes +func userHandler(w http.ResponseWriter, r *http.Request) { + id := r.URL.Fragment[6:] + // Resolve ID to server list + val, err := dhtGet(id) + if err != nil || verify(id, val) != nil { + w.WriteHeader(http.StatusNotFound) + return + } + // Check if server list contains this server + message := string(val[:len(val)-ed25519.SignatureSize]) + if !strings.Contains(message, me) { + // Delete user if they are no longer associated with this server + delete(users, id) + err = os.RemoveAll(id) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) + return + } + //valSplit := strings.Split(message, "\n") + //servers := valSplit[1:len(valSplit)-1] + if _, ok := users[id]; !ok { + // Add user + users[id] = user{ + dhtVal: val, + } + os.Mkdir(id, 755) + persist(id) + } } |