package main import ( "crypto/ed25519" "fmt" "io" "log" "net/http" "os" "strconv" "strings" "time" ) // Replicate a user's log to another server func replicate(id, s string) { log.Printf("Starting replication for %s %s", id, s) for true { mu.Lock() // Make sure that this server is still the primary for this user user, ok := users[id] if !ok { mu.Unlock() return } if me != user.servers[0] { user.nextIndex = nil mu.Unlock() return } // Make sure that the target server is still associated with this user idx, ok := user.nextIndex[s] if !ok { mu.Unlock() return } if idx == len(user.log) { // Up to date mu.Unlock() time.Sleep(50 * time.Millisecond) continue } op := user.log[idx] mu.Unlock() file, _ := os.Open(dataDir + "/" + id + "/" + op) resp, err := http.Post(s+"/storage/"+id+"/"+op+"?idx="+fmt.Sprint(idx), "application/octet-stream", file) if err != nil { time.Sleep(50 * time.Millisecond) continue } b, err := io.ReadAll(resp.Body) if err != nil { time.Sleep(50 * time.Millisecond) continue } mu.Lock() user.nextIndex[s], _ = strconv.Atoi(string(b)) mu.Unlock() } } // Handle storage requests func storageHandler(w http.ResponseWriter, r *http.Request) { pathSplit := strings.Split(r.URL.Path, "/") id := pathSplit[2] filename := pathSplit[3] r.ParseForm() if r.Method == "GET" { if r.Form.Has("direct") { // Directly read and respond with file file, err := os.ReadFile(dataDir + "/" + id + "/" + filename) if err != nil { log.Print(err) w.WriteHeader(http.StatusNotFound) return } w.Write(file) return } val := dhtGet(id, false) err := verify(id, val) if err != nil { verify(id, val) w.WriteHeader(http.StatusNotFound) return } if _, ok := users[id]; ok { reconfigure(id, val) } servers := strings.Split(string(val[8:len(val)-ed25519.SignatureSize]), "\n") if servers[0] == me { file, err := os.ReadFile(dataDir + "/" + id + "/" + filename) if err != nil { log.Print(err) w.WriteHeader(http.StatusNotFound) return } w.Write(file) return } for _, server := range servers { resp, err := http.Get(server + "/storage/" + id + "/" + filename) if err != nil { continue } b, err := io.ReadAll(resp.Body) if err != nil { continue } w.Write(b) return } w.WriteHeader(http.StatusNotFound) } else if r.Method == "POST" { mu.Lock() defer mu.Unlock() user, ok := users[id] if !ok { w.WriteHeader(http.StatusNotFound) return } b, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } err = verify(id, b) if err != nil { log.Print(err) w.WriteHeader(http.StatusUnauthorized) return } if r.Form.Has("idx") { idx, err := strconv.Atoi(r.Form.Get("idx")) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } if idx > len(user.log) { // Missing log entries w.Write([]byte(fmt.Sprint(len(user.log)))) return } if idx < len(user.log) { // Too many log entries ops := make(map[string]interface{}) for i := idx; i < len(user.log); i++ { ops[user.log[i]] = nil } for op := range ops { // Fetch older version of file resp, err := http.Get(user.servers[0] + "/storage/" + id + "/" + op) if err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) return } b, err := io.ReadAll(resp.Body) if err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) return } err = os.WriteFile(dataDir+"/"+id+"/"+op, b, 0644) if err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) return } } user.log = user.log[:idx] } } err = os.WriteFile(dataDir+"/"+id+"/"+filename, b, 0644) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } user.log = append(user.log, filename) w.Write([]byte(fmt.Sprint(len(user.log)))) } }