aboutsummaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorAnthony Wang2023-05-11 22:17:20 -0400
committerAnthony Wang2023-05-11 22:17:20 -0400
commitcdd295e371d78f02a2051044e8e4417f79b378dd (patch)
treedb4f086ea41d3c23d6da9c0ee52e910c86a55c1a /server
parentd87d4d6492bb3e325b4ea1b3d60677657465b650 (diff)
Almost done with DHT
Diffstat (limited to 'server')
-rw-r--r--server/dht.go111
-rw-r--r--server/main.go55
-rw-r--r--server/user.go58
3 files changed, 172 insertions, 52 deletions
diff --git a/server/dht.go b/server/dht.go
index 8292280..c3e984b 100644
--- a/server/dht.go
+++ b/server/dht.go
@@ -2,12 +2,14 @@ package main
import (
"bytes"
+ "crypto/ed25519"
"crypto/sha256"
"encoding/base64"
"errors"
"fmt"
"io"
"log"
+ "math/rand"
"net/http"
"sort"
"strconv"
@@ -16,6 +18,12 @@ import (
"time"
)
+var myHash string
+var myPos int
+var hashToDomain map[string]string
+var peerHashes []string
+var kvstore map[string][]byte
+
// Get the sha256sum of string as a URL-safe unpadded base64 string
func sha256sum(s string) string {
b := sha256.Sum256([]byte(s))
@@ -54,7 +62,10 @@ func addPeer(peer string) error {
copy(peerHashes[i+1:], peerHashes[i:])
peerHashes[i] = peerHash
myPos = sort.SearchStrings(peerHashes, me)
+
+ // TODO: redistribute keys
mu.Unlock()
+
// Read response body
body, err := io.ReadAll(resp.Body)
if err != nil {
@@ -91,13 +102,24 @@ func keyPos(key string) int {
return keyPos
}
+// Get the timestamp of this val
+func timestamp(val []byte) int {
+ if len(val) < ed25519.SignatureSize {
+ return 0
+ }
+ message := string(val[:len(val)-ed25519.SignatureSize])
+ timestamp, err := strconv.Atoi(strings.Split(message, "\n")[0])
+ if err != nil {
+ return 0
+ }
+ return timestamp
+}
// Get the value for a key from the DHT
func dhtGet(key string) ([]byte, error) {
keyPos := keyPos(key)
var wg sync.WaitGroup
- var ret []byte
- bestTime := 0
+ var latest []byte
for i := 0; i < 5 && i < len(peerHashes); i++ {
wg.Add(1)
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
@@ -105,37 +127,30 @@ func dhtGet(key string) ([]byte, error) {
defer wg.Done()
resp, err := http.Get(j + "/dht/" + key + "?direct")
if err != nil {
- // TODO: Remove this server from DHT?
- // For sanity reasons this might be a bad idea
- return
- }
- b, err := io.ReadAll(resp.Body)
- if err != nil {
return
}
- err = verify(key, b)
+ val, err := io.ReadAll(resp.Body)
if err != nil {
return
}
- valTime, err := strconv.Atoi(strings.Split(string(b), "\n")[0])
+ err = verify(key, val)
if err != nil {
return
}
- if ret == nil || valTime > bestTime {
- ret = b
- bestTime = valTime
+ if latest == nil || timestamp(val) > timestamp(latest) {
+ latest = val
}
}()
}
wg.Wait()
- if ret == nil {
- return nil, errors.New("id not in kvstore")
+ if latest == nil {
+ return nil, errors.New("key not found in kvstore")
}
- return ret, nil
+ return latest, nil
}
-// Put a key-value pair into the DHT
-func dhtPut(key string, val []byte) error {
+// Post a key-value pair into the DHT
+func dhtPost(key string, val []byte) error {
err := verify(key, val)
if err != nil {
return err
@@ -144,7 +159,7 @@ func dhtPut(key string, val []byte) error {
for i := 0; i < 5 && i < len(peerHashes); i++ {
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
go func() {
- http.Post(j + "/dht/" + key + "?direct", "application/octet-stream", bytes.NewBuffer(val))
+ http.Post(j+"/dht/"+key+"?direct", "application/octet-stream", bytes.NewBuffer(val))
}()
}
return nil
@@ -156,13 +171,14 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
if r.Form.Get("direct") != "" {
// Directly modify kvstore
- if keyPos(key) - myPos >= 5 {
+ if keyPos(key)-myPos >= 5 {
w.WriteHeader(http.StatusNotFound)
return
}
+ phase := time.Now().Unix()/600
if r.Method == "GET" {
mu.Lock()
- val, ok := kvstore[key + fmt.Sprint(time.Now().Unix() / 600)]
+ val, ok := kvstore[key+"\n"+fmt.Sprint(phase)]
mu.Unlock()
if !ok || verify(key, val) != nil {
w.WriteHeader(http.StatusNotFound)
@@ -176,11 +192,14 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) {
return
}
mu.Lock()
- kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] = val
+ // Update key for this phase and next one
+ kvstore[key+"\n"+fmt.Sprint(phase)] = val
+ kvstore[key+"\n"+fmt.Sprint(phase+1)] = val
mu.Unlock()
}
return
}
+
if r.Method == "GET" {
val, err := dhtGet(key)
if err != nil {
@@ -194,7 +213,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
- err = dhtPut(key, val)
+ err = dhtPost(key, val)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
@@ -202,3 +221,49 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
}
+
+// Clean out offline peers
+func cleanPeers() {
+ for true {
+ mu.Lock()
+ peer := hashToDomain[peerHashes[rand.Intn(len(peerHashes))]]
+ mu.Unlock()
+ _, err := http.Get(peer)
+ if err != nil {
+ // Bad response, so remove peer
+ mu.Lock()
+ i := sort.SearchStrings(peerHashes, sha256sum(peer))
+ peerHashes = append(peerHashes[:i], peerHashes[i+1:]...)
+ myPos = sort.SearchStrings(peerHashes, me)
+ // TODO: redistribute keys
+ mu.Unlock()
+ }
+ time.Sleep(5 * time.Second)
+ }
+}
+
+// Clean out old keys from the KVStore every minute
+func cleanKVStore() {
+ for true {
+ // This locks the mutex for a while which sucks
+ mu.Lock()
+ for key := range kvstore {
+ timestamp, err := strconv.Atoi(strings.Split(key, "\n")[1])
+ if err != nil || int64(timestamp)+1 < time.Now().Unix()/600 {
+ delete(kvstore, key)
+ }
+ }
+ mu.Unlock()
+ time.Sleep(time.Minute)
+ }
+}
+
+// Redistribute key-value pairs periodically
+func redistributeKeys() {
+ for true {
+ for id, user := range users {
+ dhtPost(id, user.dhtVal)
+ }
+ time.Sleep(time.Duration(rand.Intn(300)) * time.Second)
+ }
+}
diff --git a/server/main.go b/server/main.go
index 3ad3060..b4fc654 100644
--- a/server/main.go
+++ b/server/main.go
@@ -1,49 +1,62 @@
package main
import (
- "crypto/ed25519"
- "crypto/sha256"
- "encoding/base64"
+ "encoding/gob"
"flag"
+ "fmt"
"log"
"net/http"
+ "os"
"sync"
)
-type user struct {
- pubkey []byte
- servers []string
-}
-
var mu sync.Mutex
+var bindAddr string
var me string
-var myHash string
-var myPos int
-var hashToDomain map[string]string
-var peerHashes []string
-var kvstore map[string][]byte
+var initialPeer string
+var dataDir string
func main() {
- bindAddr := flag.String("b", ":4200", "bind address")
- publicURL := flag.String("u", "http://localhost:4200", "public URL")
- peer := flag.String("i", "", "initial peer")
+ flag.StringVar(&bindAddr, "b", ":4200", "bind address")
+ flag.StringVar(&me, "u", "http://localhost:4200", "public URL")
+ flag.StringVar(&initialPeer, "i", "", "initial peer")
+ flag.StringVar(&dataDir, "d", ".", "data directory")
flag.Parse()
- log.Printf("Starting %s %s %s", *bindAddr, *publicURL, *peer)
+ log.Printf("Starting %s %s %s %s", bindAddr, me, initialPeer, dataDir)
// Record myself
- me = *publicURL
myHash = sha256sum(me)
myPos = 0
peerHashes = append(peerHashes, sha256sum(me))
hashToDomain = map[string]string{peerHashes[0]: me}
- if *peer != "" {
- go addPeer(*peer)
+ if initialPeer != "" {
+ go addPeer(initialPeer)
+ }
+ go cleanPeers()
+ go cleanKVStore()
+
+ // Load user data from disk
+ entries, _ := os.ReadDir(dataDir)
+ for _, entry := range entries {
+ id := entry.Name()
+ reader, err := os.Open(dataDir + "/" + id + "/gob")
+ if err != nil {
+ continue
+ }
+ var user user
+ dec := gob.NewDecoder(reader)
+ dec.Decode(&user)
+ users[id] = user
}
+ http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "Hello! This is a Kela server.")
+ })
http.HandleFunc("/peer", peerHandler)
+ http.HandleFunc("/user", userHandler)
http.HandleFunc("/dht", dhtHandler)
http.HandleFunc("/storage", storageHandler)
http.HandleFunc("/message", messageHandler)
- log.Fatal(http.ListenAndServe(*bindAddr, nil))
+ log.Fatal(http.ListenAndServe(bindAddr, nil))
}
diff --git a/server/user.go b/server/user.go
index 37ceee5..5865c93 100644
--- a/server/user.go
+++ b/server/user.go
@@ -3,10 +3,20 @@ package main
import (
"crypto/ed25519"
"encoding/base64"
+ "encoding/gob"
"errors"
"net/http"
+ "os"
+ "strings"
)
+type user struct {
+ dhtVal []byte
+}
+
+var users map[string]user
+
+// Verify that a body was signed by this ID
func verify(id string, body []byte) error {
b, err := base64.RawURLEncoding.DecodeString(id)
if err != nil {
@@ -23,15 +33,47 @@ func verify(id string, body []byte) error {
return nil
}
-// Create user
-func createHandler(w http.ResponseWriter, r *http.Request) {
- r.ParseForm()
- id := r.Form.Get("id")
- dhtGet(id)
-
+// Persist a user's data to disk
+func persist(id string) {
+ writer, err := os.Open(dataDir + "/" + id + "/gob")
+ if err != nil {
+ return
+ }
+ enc := gob.NewEncoder(writer)
+ enc.Encode(users[id])
}
-// Delete user
-func deleteHandler(w http.ResponseWriter, r *http.Request) {
+// Handle user configuration changes
+func userHandler(w http.ResponseWriter, r *http.Request) {
+ id := r.URL.Fragment[6:]
+ // Resolve ID to server list
+ val, err := dhtGet(id)
+ if err != nil || verify(id, val) != nil {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ // Check if server list contains this server
+ message := string(val[:len(val)-ed25519.SignatureSize])
+ if !strings.Contains(message, me) {
+ // Delete user if they are no longer associated with this server
+ delete(users, id)
+ err = os.RemoveAll(id)
+ if err != nil {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ return
+ }
+ //valSplit := strings.Split(message, "\n")
+ //servers := valSplit[1:len(valSplit)-1]
+ if _, ok := users[id]; !ok {
+ // Add user
+ users[id] = user{
+ dhtVal: val,
+ }
+ os.Mkdir(id, 755)
+ persist(id)
+ }
}