package main import ( "bytes" "crypto/ed25519" "crypto/sha256" "encoding/base64" "encoding/binary" "fmt" "io" "log" "math/rand" "net/http" "sort" "strconv" "strings" "sync" "time" ) var myHash string var myPos int var hashToDomain map[string]string var peerHashes []string var kvstore map[string][]byte // Get the sha256sum of string as a URL-safe unpadded base64 string func sha256sum(s string) string { b := sha256.Sum256([]byte(s)) return base64.RawURLEncoding.EncodeToString(b[:]) } // Find the position of a key in the DHT func keyPos(key string) int { keyPos := sort.SearchStrings(peerHashes, sha256sum(key)) if keyPos < myPos { keyPos += len(peerHashes) } return keyPos } // Try to peer with another server func addPeer(peer string) error { peerHash := sha256sum(peer) // Check if already peered mu.Lock() _, ok := hashToDomain[peerHash] mu.Unlock() if ok { return nil } mu.Lock() hashToDomain[peerHash] = peer mu.Unlock() // Try request to peer resp, err := http.Get(peer + "/peer?peer=" + me) if err != nil { // Request failed, delete peer mu.Lock() delete(hashToDomain, peerHash) mu.Unlock() return err } // Add peer mu.Lock() i := sort.SearchStrings(peerHashes, peerHash) peerHashes = append(peerHashes, "") copy(peerHashes[i+1:], peerHashes[i:]) peerHashes[i] = peerHash myPos = sort.SearchStrings(peerHashes, myHash) // Distribute keys to new server for id, user := range users { phase := time.Now().Unix() / 600 if keyPos(id+"\n"+fmt.Sprint(phase))-myPos < 5 { go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } if keyPos(id+"\n"+fmt.Sprint(phase+1))-myPos < 5 { go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } } mu.Unlock() log.Printf("%s successfully peered with %s", me, peer) // Read response body body, err := io.ReadAll(resp.Body) if err != nil { return err } // Try adding all peers of this peer newPeers := strings.Split(string(body), "\n") for _, newPeer := range newPeers[:len(newPeers)-1] { go addPeer(newPeer) } return nil } // Handle incoming peer requests func peerHandler(w http.ResponseWriter, r *http.Request) { r.ParseForm() peer := r.Form.Get("peer") if peer == "" { w.WriteHeader(http.StatusBadRequest) return } mu.Lock() for _, p := range hashToDomain { fmt.Fprintf(w, "%s\n", p) } mu.Unlock() go addPeer(peer) } // Get the timestamp of this val func timestamp(val []byte) int64 { if len(val) < 8+ed25519.SignatureSize { return 0 } ret, _ := binary.Varint(val[:8]) return ret } // Get the value for a key from the DHT func dhtGet(key string, direct bool) []byte { phase := fmt.Sprint(time.Now().Unix() / 600) if direct { // Directly read from kvstore mu.Lock() val, ok := kvstore[key+"\n"+phase] mu.Unlock() if !ok || verify(key, val) != nil { return nil } return val } // Contact 5 servers that store this key-value pair var mu2 sync.Mutex var wg sync.WaitGroup var latest []byte mu.Lock() keyPos := keyPos(key + "\n" + phase) for i := 0; i < 5 && i < len(peerHashes); i++ { wg.Add(1) j := hashToDomain[peerHashes[(keyPos-i+len(peerHashes))%len(peerHashes)]] go func() { defer wg.Done() resp, err := http.Get(j + "/dht/" + key + "?direct") if err != nil { return } val, err := io.ReadAll(resp.Body) if err != nil { return } err = verify(key, val) if err != nil { return } mu2.Lock() if latest == nil || timestamp(val) > timestamp(latest) { latest = val } mu2.Unlock() }() } mu.Unlock() // Wait for all to finish or time out wg.Wait() return latest } // Post a key-value pair into the DHT func dhtPost(key, phase string, direct bool, val []byte) error { err := verify(key, val) if err != nil { return err } if phase == "" { phase = fmt.Sprint(time.Now().Unix() / 600) } if direct { // Directly write to kvstore mu.Lock() curVal, ok := kvstore[key+"\n"+phase] if !ok || timestamp(val) > timestamp(curVal) { kvstore[key+"\n"+phase] = val } mu.Unlock() return nil } // Post the key-value pair to the next phase if necessary mu.Lock() curPhase, err := strconv.Atoi(phase) if err != nil { return err } nextPhase := time.Now().Unix()/600 + 1 if int64(curPhase) < nextPhase { user, ok := users[key] if ok && user.phase < nextPhase { user.phase = nextPhase persist(key) } go dhtPost(key, fmt.Sprint(nextPhase), false, val) } keyPos := keyPos(key + "\n" + phase) mu.Unlock() // Contact 5 servers that store this key-value pair mu.Lock() for i := 0; i < 5 && i < len(peerHashes); i++ { j := hashToDomain[peerHashes[(keyPos-i+len(peerHashes))%len(peerHashes)]] go http.Post(j+"/dht/"+key+"?phase="+phase+"&direct", "application/octet-stream", bytes.NewBuffer(val)) } mu.Unlock() return nil } // Handle DHT requests func dhtHandler(w http.ResponseWriter, r *http.Request) { key := r.URL.Path[5:] r.ParseForm() if r.Method == "GET" { val := dhtGet(key, r.Form.Has("direct")) if val == nil { w.WriteHeader(http.StatusNotFound) return } w.Write(val) } else if r.Method == "POST" { val, err := io.ReadAll(r.Body) if err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) return } err = dhtPost(key, r.Form.Get("phase"), r.Form.Has("direct"), val) if err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) } } // Clean out offline peers func cleanPeers() { for true { mu.Lock() peer := hashToDomain[peerHashes[rand.Intn(len(peerHashes))]] mu.Unlock() _, err := http.Get(peer) if err != nil { log.Printf("Removing peer %s", peer) // Bad response, so remove peer mu.Lock() i := sort.SearchStrings(peerHashes, sha256sum(peer)) peerHashes = append(peerHashes[:i], peerHashes[i+1:]...) myPos = sort.SearchStrings(peerHashes, myHash) // Distribute keys on this server to other servers if len(peerHashes) >= 5 { for id, user := range users { phase := time.Now().Unix() / 600 kpos := keyPos(id + "\n" + fmt.Sprint(phase)) if kpos-i < 5 { server := hashToDomain[peerHashes[(kpos-4+len(peerHashes))%len(peerHashes)]] go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } kpos = keyPos(id + "\n" + fmt.Sprint(phase+1)) if kpos-i < 5 { server := hashToDomain[peerHashes[(kpos-4+len(peerHashes))%len(peerHashes)]] go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) } } } mu.Unlock() } time.Sleep(5 * time.Second) } } // Clean out old keys from the KVStore every minute func cleanKVStore() { for true { // This locks the mutex for a while which sucks mu.Lock() for key := range kvstore { timestamp, err := strconv.Atoi(strings.Split(key, "\n")[1]) if err != nil || int64(timestamp)+1 < time.Now().Unix()/600 { delete(kvstore, key) } } mu.Unlock() time.Sleep(time.Minute) } } // Redistribute key-value pairs periodically func redistributeKeys() { for true { mu.Lock() for id, user := range users { nextPhase := time.Now().Unix()/600 + 1 if user.phase < nextPhase { go dhtPost(id, fmt.Sprint(nextPhase), false, user.dhtVal) } user.phase = nextPhase persist(id) } mu.Unlock() time.Sleep(time.Duration(rand.Intn(300)) * time.Second) } }