aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnthony Wang2023-05-11 16:31:57 -0400
committerAnthony Wang2023-05-11 16:31:57 -0400
commitd87d4d6492bb3e325b4ea1b3d60677657465b650 (patch)
treefdb39a6e5ee1ef95bb5c93cc6de0a3c40e21e341
parent45e65685417df707eaa128d5fad61f230615092f (diff)
Almost done with DHT
-rw-r--r--README.md2
-rw-r--r--server/dht.go167
-rw-r--r--server/main.go18
-rw-r--r--server/user.go37
4 files changed, 156 insertions, 68 deletions
diff --git a/README.md b/README.md
index 78f3e96..e94e710 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@ Alright, let's solve all those problems above with Kela! Kela consists of three
### Name resolution service
-In Kela, each user has an ID, which is a public key. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online.
+In Kela, each user has an ID, which is an Ed5519 public key encoded in URL-safe Base64. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online.
The DHT stores key-value pairs. The key consists of a user's public key and timestamp (the current Unix time in seconds divided by 600, rounded down). The value consists of a timestamp (the current Unix time in seconds), a list of servers that the user is associated with, where the first server is their primary server, and a signature. A key-value pair is assigned to the 5 servers with smallest SHA-256 hashes of their domain name greater than the SHA-256 hash of the key. The purpose of the elaborate timestamp in the key is to ensure that the set of servers assigned to a key-value pair rotates every 600 seconds so an attacker must control a very large portion of the DHT to do a denial-of-service attack against a specific key-value pair. When servers join and leave the DHT, the servers that a user is associated with will ensure that that user's key-value pair is assigned to a new server if necessary to ensure that 5 servers store that key-value pair. The DHT supports two operations, get and put. For put operations, the server checks the signature to ensure the validity of the request. When a server receives either of these two operations, it computes the SHA-256 hash of the key and checks if it is supposed to store that key-value pair or not. If it is supposed to store that key-value pair, it performs the operation on that pair. Otherwise, the server will contact in parallel the 5 servers that store this key-value pair. If the operation is a get, the server will look at the 5 replies and return the value with the most recent timestamp. If the operation is a put, and one of the 5 parallel requests fails, the server will remove that offline server from its DHT node list and assign a new server to this key-value pair to replace the offline one. Each server periodically goes through its stored key-value pairs and deletes old ones.
diff --git a/server/dht.go b/server/dht.go
index e5d96f1..8292280 100644
--- a/server/dht.go
+++ b/server/dht.go
@@ -1,16 +1,27 @@
package main
import (
- "crypto/ed25519"
+ "bytes"
+ "crypto/sha256"
"encoding/base64"
+ "errors"
"fmt"
"io"
"log"
"net/http"
"sort"
+ "strconv"
"strings"
+ "sync"
+ "time"
)
+// Get the sha256sum of string as a URL-safe unpadded base64 string
+func sha256sum(s string) string {
+ b := sha256.Sum256([]byte(s))
+ return base64.RawURLEncoding.EncodeToString(b[:])
+}
+
// Try to peer with another server
func addPeer(peer string) error {
peerHash := sha256sum(peer)
@@ -71,69 +82,123 @@ func peerHandler(w http.ResponseWriter, r *http.Request) {
go addPeer(peer)
}
-// Handle DHT requests
-func dhtHandler(w http.ResponseWriter, r *http.Request) {
- key := r.URL.String()[5:]
- keyHash := sha256sum(key)
- pubKey := asPubKey(key)
- fmt.Println(key, keyHash, pubKey)
- mu.Lock()
- keyPos := sort.SearchStrings(peerHashes, keyHash)
+// Find the position of a key in the DHT
+func keyPos(key string) int {
+ keyPos := sort.SearchStrings(peerHashes, sha256sum(key))
if keyPos < myPos {
keyPos += len(peerHashes)
}
- mu.Unlock()
- if r.Method == "GET" {
- if keyPos - myPos < 5 {
- mu.Lock()
- if val, ok := kvstore[key]; ok {
- w.Write([]byte(val))
- } else {
- w.WriteHeader(http.StatusNotFound)
- }
- mu.Unlock()
- } else {
- for i := 0; i < 5 && i < len(peerHashes); i++ {
- j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
- go func() string {
- resp, err := http.Get(j + r.URL.String())
- if err != nil {
- return ""
- }
- b, err := io.ReadAll(resp.Body)
- if err != nil {
- return ""
- }
- return string(b)
- }()
+ return keyPos
+}
+
+// Get the value for a key from the DHT
+func dhtGet(key string) ([]byte, error) {
+ keyPos := keyPos(key)
+ var wg sync.WaitGroup
+ var ret []byte
+ bestTime := 0
+ for i := 0; i < 5 && i < len(peerHashes); i++ {
+ wg.Add(1)
+ j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
+ go func() {
+ defer wg.Done()
+ resp, err := http.Get(j + "/dht/" + key + "?direct")
+ if err != nil {
+ // TODO: Remove this server from DHT?
+ // For sanity reasons this might be a bad idea
+ return
}
+ b, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return
+ }
+ err = verify(key, b)
+ if err != nil {
+ return
+ }
+ valTime, err := strconv.Atoi(strings.Split(string(b), "\n")[0])
+ if err != nil {
+ return
+ }
+ if ret == nil || valTime > bestTime {
+ ret = b
+ bestTime = valTime
+ }
+ }()
+ }
+ wg.Wait()
+ if ret == nil {
+ return nil, errors.New("id not in kvstore")
+ }
+ return ret, nil
+}
+// Put a key-value pair into the DHT
+func dhtPut(key string, val []byte) error {
+ err := verify(key, val)
+ if err != nil {
+ return err
+ }
+ keyPos := keyPos(key)
+ for i := 0; i < 5 && i < len(peerHashes); i++ {
+ j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
+ go func() {
+ http.Post(j + "/dht/" + key + "?direct", "application/octet-stream", bytes.NewBuffer(val))
+ }()
+ }
+ return nil
+}
+// Handle DHT requests
+func dhtHandler(w http.ResponseWriter, r *http.Request) {
+ key := r.URL.Path[5:]
+ r.ParseForm()
+ if r.Form.Get("direct") != "" {
+ // Directly modify kvstore
+ if keyPos(key) - myPos >= 5 {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ if r.Method == "GET" {
+ mu.Lock()
+ val, ok := kvstore[key + fmt.Sprint(time.Now().Unix() / 600)]
+ mu.Unlock()
+ if !ok || verify(key, val) != nil {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ w.Write(val)
+ } else if r.Method == "POST" {
+ val, err := io.ReadAll(r.Body)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ mu.Lock()
+ kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] = val
+ mu.Unlock()
}
- } else if r.Method == "PUT" {
- // Read request body
- b, err := io.ReadAll(r.Body)
+ return
+ }
+ if r.Method == "GET" {
+ val, err := dhtGet(key)
if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
+ w.WriteHeader(http.StatusNotFound)
return
}
- // Extract signature
- valSplit := strings.Split(string(b), "\n")
- sig := valSplit[len(valSplit)-1]
- // Verify signature
- if !ed25519.Verify(pubKey, b[:len(b)-len(sig)-1], []byte(sig)) {
- w.WriteHeader(http.StatusUnauthorized)
+ w.Write([]byte(val))
+ } else if r.Method == "POST" {
+ val, err := io.ReadAll(r.Body)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
return
}
- if keyPos - myPos < 5 {
- mu.Lock()
- kvstore[key] = string(b[:len(b)-len(sig)-1])
- mu.Unlock()
- } else {
-
+ err = dhtPut(key, val)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
}
- } else {
- w.WriteHeader(http.StatusMethodNotAllowed)
+ w.WriteHeader(http.StatusOK)
}
}
diff --git a/server/main.go b/server/main.go
index c0f7450..3ad3060 100644
--- a/server/main.go
+++ b/server/main.go
@@ -12,6 +12,7 @@ import (
type user struct {
pubkey []byte
+ servers []string
}
var mu sync.Mutex
@@ -20,22 +21,7 @@ var myHash string
var myPos int
var hashToDomain map[string]string
var peerHashes []string
-var kvstore map[string]string
-
-// Get the sha256sum of string as a URL-safe unpadded base64 string
-func sha256sum(s string) string {
- b := sha256.Sum256([]byte(s))
- return base64.RawURLEncoding.EncodeToString(b[:])
-}
-
-// Decode an ID to a public key
-func asPubKey(s string) ed25519.PublicKey {
- b, err := base64.RawURLEncoding.DecodeString(s)
- if err != nil {
- return nil
- }
- return ed25519.PublicKey(b)
-}
+var kvstore map[string][]byte
func main() {
bindAddr := flag.String("b", ":4200", "bind address")
diff --git a/server/user.go b/server/user.go
new file mode 100644
index 0000000..37ceee5
--- /dev/null
+++ b/server/user.go
@@ -0,0 +1,37 @@
+package main
+
+import (
+ "crypto/ed25519"
+ "encoding/base64"
+ "errors"
+ "net/http"
+)
+
+func verify(id string, body []byte) error {
+ b, err := base64.RawURLEncoding.DecodeString(id)
+ if err != nil {
+ return err
+ }
+ if len(body) < ed25519.SignatureSize {
+ return errors.New("body too short")
+ }
+ message := body[:len(body)-ed25519.SignatureSize]
+ sig := body[len(body)-ed25519.SignatureSize:]
+ if !ed25519.Verify(ed25519.PublicKey(b), message, sig) {
+ return errors.New("signature verification failed")
+ }
+ return nil
+}
+
+// Create user
+func createHandler(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ id := r.Form.Get("id")
+ dhtGet(id)
+
+}
+
+// Delete user
+func deleteHandler(w http.ResponseWriter, r *http.Request) {
+
+}