erigon-pulse/cmd/snapshots/tracker/commands/root.go

396 lines
10 KiB
Go

package commands
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/tracker"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/kv"
"github.com/ledgerwatch/erigon/ethdb/mdbx"
"github.com/ledgerwatch/erigon/internal/debug"
"github.com/ledgerwatch/erigon/log"
"github.com/spf13/cobra"
)
// DefaultInterval is the value, in seconds, placed in the "interval" field of
// every successful announce response.
const DefaultInterval = 60 //in seconds
// SoftLimit is the minimum number of seconds that must have passed since a
// peer's stored record was last updated before a new announce from that peer
// is accepted; earlier announces get a "too early to update" failure.
const SoftLimit = 5 //in seconds
// DisconnectInterval is a time.Duration (one minute), not a number of seconds.
// Peers whose stored record is older than 5*DisconnectInterval are left out
// of announce responses.
const DisconnectInterval = time.Minute // a time.Duration, not a count of seconds
// trackerID is the tracker id string returned in announce responses.
var trackerID = "erigon snapshot tracker"
func init() {
utils.CobraFlags(rootCmd, append(debug.Flags, utils.MetricFlags...))
}
// Execute runs the root command under the context produced by rootContext.
// When the command returns an error it is printed and the process exits with
// status 1.
func Execute() {
	err := rootCmd.ExecuteContext(rootContext())
	if err == nil {
		return
	}
	fmt.Println(err)
	os.Exit(1)
}
// rootContext returns a context that is cancelled as soon as the process
// receives os.Interrupt or SIGTERM. The watcher goroutine also exits (and
// unregisters its signal channel) if the context ends for any other reason.
func rootContext() context.Context {
	ctx, stop := context.WithCancel(context.Background())
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
		// Deferred calls run in reverse order: stop() first, then signal.Stop.
		defer signal.Stop(sigCh)
		defer stop()

		select {
		case <-ctx.Done():
		case <-sigCh:
			log.Info("Got interrupt, shutting down...")
		}
	}()
	return ctx
}
// rootCmd is the "start" command. It opens the database found at the single
// positional argument, serves the tracker over HTTP on port 80 and blocks
// until the command context is cancelled or the HTTP server fails.
//
// Routes:
//   - /announce: handled by Tracker.
//   - /scrape: counts, for one info hash, the peers updated within the last
//     24 hours, split into complete (left == 0) and incomplete.
//   - anything else: logged and answered with 404.
var rootCmd = &cobra.Command{
	Use:   "start",
	Short: "start tracker",
	PersistentPreRun: func(cmd *cobra.Command, args []string) {
		if err := debug.SetupCobra(cmd); err != nil {
			panic(err)
		}
	},
	PersistentPostRun: func(cmd *cobra.Command, args []string) {
		debug.Exit()
	},
	Args:       cobra.ExactArgs(1),
	ArgAliases: []string{"snapshots dir"},
	RunE: func(cmd *cobra.Command, args []string) error {
		db := mdbx.MustOpen(args[0])
		// Release the database once serving has stopped.
		defer db.Close()

		m := http.NewServeMux()
		m.Handle("/announce", &Tracker{db: db})
		m.HandleFunc("/scrape", func(writer http.ResponseWriter, request *http.Request) {
			log.Warn("scrape", "url", request.RequestURI)
			ih := request.URL.Query().Get("info_hash")
			if len(ih) != 20 {
				log.Error("wronng infohash", "ih", ih, "l", len(ih))
				WriteResp(writer, ErrResponse{FailureReason: "incorrect infohash"}, false)
				return
			}
			stats := &ScrapeData{}
			resp := ScrapeResponse{Files: map[string]*ScrapeData{ih: stats}}

			err := db.View(request.Context(), func(tx kv.Tx) error {
				c, err := tx.Cursor(kv.SnapshotInfo)
				if err != nil {
					return err
				}
				defer c.Close()
				// Start at <info hash><20 zero bytes> and match on the first
				// 20 bytes (20*8 bits), i.e. every key under this info hash.
				startKey := append([]byte(ih), make([]byte, 20)...)
				return ethdb.Walk(c, startKey, 20*8, func(k, v []byte) (bool, error) {
					a := AnnounceReqWithTime{}
					if err := json.Unmarshal(v, &a); err != nil {
						log.Error("Fail to unmarshall", "k", common.Bytes2Hex(k), "err", err)
						//skip failed
						return true, nil
					}
					if time.Since(a.UpdatedAt) > 24*time.Hour {
						log.Debug("Skipped", "k", common.Bytes2Hex(k), "last updated", a.UpdatedAt)
						return true, nil
					}
					if a.Left == 0 {
						stats.Downloaded++
						stats.Complete++
					} else {
						stats.Incomplete++
					}
					return true, nil
				})
			})
			if err != nil {
				log.Error("Walk", "err", err)
				WriteResp(writer, ErrResponse{FailureReason: err.Error()}, false)
				return
			}
			// Marshalled only so the response can be logged.
			jsonResp, err := json.Marshal(resp)
			if err == nil {
				log.Info("scrape resp", "v", string(jsonResp))
			} else {
				log.Info("marshall scrape resp", "err", err)
			}
			WriteResp(writer, resp, false)
		})
		m.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
			log.Warn("404", "url", request.RequestURI)
			// Actually answer 404 instead of an empty 200.
			http.NotFound(writer, request)
		})

		log.Info("Listen1")
		srv := &http.Server{Addr: ":80", Handler: m}
		serveErr := make(chan error, 1)
		go func() {
			serveErr <- srv.ListenAndServe()
		}()

		select {
		case err := <-serveErr:
			// The listener could not start or died: report it instead of
			// blocking until a signal arrives.
			log.Error("error", "err", err)
			return fmt.Errorf("tracker http server: %w", err)
		case <-cmd.Context().Done():
		}

		// Give in-flight requests a short grace period before the deferred
		// db.Close runs.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			log.Error("shutdown", "err", err)
		}
		return nil
	},
}
// Tracker is the http.Handler registered for /announce. It keeps one record
// per (info hash, peer id) pair in the kv.SnapshotInfo table of db.
type Tracker struct {
// db is the database peer records are read from and written to.
db kv.RwDB
}
/*
Example announce request (query parameters split across lines for readability):

/announce?compact=1
&downloaded=0
&event="started"
&info_hash=D%22%5C%80%F7%FD%12Z%EA%9B%F0%A5z%DA%AF%1F%A4%E1je
&left=0
&peer_id=-GT0002-9%EA%FB+%BF%B3%AD%DE%8Ae%D0%B7
&port=53631
&supportcrypto=1
&uploaded=0
*/
// AnnounceReqWithTime is the record stored (JSON-encoded) for each peer: the
// announce request together with the time it was saved.
type AnnounceReqWithTime struct {
AnnounceReq
// UpdatedAt is the time the record was written; it is used to rate-limit
// updates and to skip stale peers.
UpdatedAt time.Time
}
// AnnounceReq holds the parameters of one announce request as extracted by
// ParseRequest.
type AnnounceReq struct {
// InfoHash is the raw "info_hash" query value; ValidateReq requires 20 bytes.
InfoHash []byte
// PeerID is the raw "peer_id" query value; ValidateReq requires 20 bytes.
PeerID []byte
// RemoteAddr is the IP parsed from the HTTP request's RemoteAddr.
RemoteAddr net.IP
// Port is the "port" query value.
Port int
// Event is the "event" query value.
Event string
// Uploaded is the "uploaded" query value.
Uploaded int64
// Downloaded is the "downloaded" query value.
Downloaded int64
// SupportCrypto is true when "supportcrypto" equals "1".
SupportCrypto bool
// Left is the "left" query value; 0 is counted as a complete peer.
Left int64
// Compact is true when "compact" equals "1"; it is passed to WriteResp to
// choose between bencode and JSON output.
Compact bool
}
// Peer describes a single peer by address, port and peer id.
// NOTE(review): not referenced anywhere in the visible part of this file.
type Peer struct {
IP string
Port int
PeerID []byte
}
// ServeHTTP handles an announce request.
//
// The request is parsed and validated, then the peer's record, keyed by
// <info hash><peer id>, is either deleted (event "stopped") or stored. A
// record that was updated less than SoftLimit seconds ago is not overwritten
// and the client gets a "too early to update" failure. A peer that has no
// record yet is stored immediately.
//
// On success the response lists every peer stored under the same info hash
// whose record is at most 5*DisconnectInterval old, together with the counts
// of complete (left == 0) and incomplete peers. Every failure is reported
// through WriteResp as an ErrResponse.
func (t *Tracker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Info("call", "url", r.RequestURI)
	req, err := ParseRequest(r)
	if err != nil {
		log.Error("Parse request", "err", err)
		WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact)
		return
	}
	if err = ValidateReq(req); err != nil {
		log.Error("Validate failed", "err", err)
		WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact)
		return
	}
	ctx := r.Context()

	// Build the key in its own buffer so appending can never write into the
	// backing array of req.InfoHash.
	key := make([]byte, 0, len(req.InfoHash)+len(req.PeerID))
	key = append(key, req.InfoHash...)
	key = append(key, req.PeerID...)

	if req.Event == tracker.Stopped.String() {
		err = t.db.Update(ctx, func(tx kv.RwTx) error {
			return tx.Delete(kv.SnapshotInfo, key, nil)
		})
		if err != nil {
			log.Error("db.Delete", "err", err)
			WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact)
			return
		}
	} else {
		peerBytes, marshalErr := json.Marshal(AnnounceReqWithTime{AnnounceReq: req, UpdatedAt: time.Now()})
		if marshalErr != nil {
			log.Error("Json marshal", "err", marshalErr)
			WriteResp(w, ErrResponse{FailureReason: marshalErr.Error()}, req.Compact)
			return
		}

		var (
			prev    AnnounceReqWithTime
			hasPrev bool
		)
		err = t.db.View(ctx, func(tx kv.Tx) error {
			prevBytes, getErr := tx.GetOne(kv.SnapshotInfo, key)
			if getErr != nil {
				return getErr
			}
			if prevBytes == nil {
				// First announce from this peer: nothing to rate-limit against.
				return nil
			}
			// Decode while the transaction is still open.
			// NOTE(review): values returned by the kv layer are presumably
			// only valid for the lifetime of the transaction - confirm.
			if decodeErr := json.Unmarshal(prevBytes, &prev); decodeErr != nil {
				// An unreadable record is simply overwritten below.
				log.Error("Unable to unmarshall", "err", decodeErr)
				return nil
			}
			hasPrev = true
			return nil
		})
		if err != nil {
			log.Error("get from db is return error", "err", err)
			WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact)
			return
		}
		if hasPrev && time.Since(prev.UpdatedAt) < time.Second*SoftLimit {
			//too early to update
			WriteResp(w, ErrResponse{FailureReason: "too early to update"}, req.Compact)
			return
		}
		err = t.db.Update(ctx, func(tx kv.RwTx) error {
			return tx.Put(kv.SnapshotInfo, key, peerBytes)
		})
		if err != nil {
			log.Error("db.Put", "err", err)
			WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact)
			return
		}
	}

	resp := HttpResponse{
		Interval:  DefaultInterval,
		TrackerId: trackerID,
	}
	err = t.db.View(ctx, func(tx kv.Tx) error {
		// Keys are <info hash><peer id>, so the info hash alone is the prefix
		// shared by all peers of this torrent. (Padding it with zero bytes
		// would only match a peer whose id is all zeroes.)
		return tx.ForPrefix(kv.SnapshotInfo, req.InfoHash, func(k, v []byte) error {
			a := AnnounceReqWithTime{}
			if decodeErr := json.Unmarshal(v, &a); decodeErr != nil {
				log.Error("Fail to unmarshall", "k", common.Bytes2Hex(k), "err", decodeErr)
				//skip failed
				return nil
			}
			if time.Since(a.UpdatedAt) > 5*DisconnectInterval {
				log.Debug("Skipped requset", "peer", common.Bytes2Hex(a.PeerID), "last updated", a.UpdatedAt, "now", time.Now())
				return nil
			}
			if a.Left == 0 {
				resp.Complete++
			} else {
				resp.Incomplete++
			}
			resp.Peers = append(resp.Peers, map[string]interface{}{
				"ip":      a.RemoteAddr.String(),
				"peer id": a.PeerID,
				"port":    a.Port,
			})
			return nil
		})
	})
	if err != nil {
		log.Error("Walk", "err", err)
		WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact)
		return
	}

	// Marshalled only so the response can be logged.
	jsonResp, err := json.Marshal(resp)
	if err == nil {
		log.Info("announce resp", "v", string(jsonResp))
	} else {
		log.Info("marshall announce resp", "err", err)
	}
	WriteResp(w, resp, req.Compact)
}
// WriteResp encodes res onto w, bencoded when compact is true and as JSON
// otherwise. An ErrResponse is logged before it is written. Encoding errors
// are logged and not reported to the caller.
func WriteResp(w http.ResponseWriter, res interface{}, compact bool) {
	if _, isFailure := res.(ErrResponse); isFailure {
		log.Error("Err", "err", res)
	}

	if !compact {
		if encErr := json.NewEncoder(w).Encode(res); encErr != nil {
			log.Error("Json marshal", "err", encErr)
		}
		return
	}

	if encErr := bencode.NewEncoder(w).Encode(res); encErr != nil {
		log.Error("Bencode encode", "err", encErr)
	}
}
// ParseRequest extracts the announce parameters from the query string of r
// and the peer's IP from r.RemoteAddr.
//
// "downloaded", "uploaded", "left" and "port" must be decimal integers; if
// one of them is not, a zero AnnounceReq and an error naming the offending
// parameter are returned. "compact" and "supportcrypto" are true only when
// equal to "1". RemoteAddr is nil if no IP could be parsed from r.RemoteAddr.
func ParseRequest(r *http.Request) (AnnounceReq, error) {
	q := r.URL.Query()

	// r.RemoteAddr is normally "host:port" or, for IPv6, "[host]:port".
	// Splitting on ":" breaks every IPv6 address, so let net.SplitHostPort
	// handle both forms and fall back to the raw value when there is no port.
	host, _, splitErr := net.SplitHostPort(r.RemoteAddr)
	if splitErr != nil {
		host = r.RemoteAddr
	}
	// net.ParseIP does not accept an IPv6 zone ("fe80::1%eth0"); drop it.
	if zone := strings.IndexByte(host, '%'); zone >= 0 {
		host = host[:zone]
	}
	remoteAddr := net.ParseIP(host)

	downloaded, err := strconv.ParseInt(q.Get("downloaded"), 10, 64)
	if err != nil {
		log.Warn("downloaded", "err", err)
		return AnnounceReq{}, fmt.Errorf("downloaded %v - %w", q.Get("downloaded"), err)
	}
	uploaded, err := strconv.ParseInt(q.Get("uploaded"), 10, 64)
	if err != nil {
		log.Warn("uploaded", "err", err)
		return AnnounceReq{}, fmt.Errorf("uploaded %v - %w", q.Get("uploaded"), err)
	}
	left, err := strconv.ParseInt(q.Get("left"), 10, 64)
	if err != nil {
		log.Warn("left", "err", err)
		return AnnounceReq{}, fmt.Errorf("left: %v - %w", q.Get("left"), err)
	}
	port, err := strconv.Atoi(q.Get("port"))
	if err != nil {
		return AnnounceReq{}, fmt.Errorf("port: %v - %w", q.Get("port"), err)
	}

	return AnnounceReq{
		InfoHash:      []byte(q.Get("info_hash")),
		PeerID:        []byte(q.Get("peer_id")),
		RemoteAddr:    remoteAddr,
		Event:         q.Get("event"),
		Compact:       q.Get("compact") == "1",
		SupportCrypto: q.Get("supportcrypto") == "1",
		Downloaded:    downloaded,
		Uploaded:      uploaded,
		Left:          left,
		Port:          port,
	}, nil
}
// ValidateReq checks that req can be stored and handed out to other peers:
// the info hash and the peer id must each be exactly 20 bytes long and the
// port must be a usable TCP/UDP port (1-65535). It returns nil when the
// request is acceptable and a descriptive error otherwise.
func ValidateReq(req AnnounceReq) error {
	switch {
	case len(req.InfoHash) != 20:
		return errors.New("invalid infohash")
	case len(req.PeerID) != 20:
		return errors.New("invalid peer id")
	case req.Port <= 0 || req.Port > 65535:
		// strconv.Atoi happily parses negative and oversized values.
		return errors.New("invalid port")
	}
	return nil
}
// HttpResponse is the body of a successful announce response, encoded by
// WriteResp as bencode or JSON.
type HttpResponse struct {
// Interval is set to DefaultInterval (seconds).
Interval int32 `bencode:"interval" json:"interval"`
// TrackerId is set to trackerID.
TrackerId string `bencode:"tracker id" json:"tracker_id"`
// Complete counts the listed peers that reported left == 0.
Complete int32 `bencode:"complete" json:"complete"`
// Incomplete counts the listed peers that reported left != 0.
Incomplete int32 `bencode:"incomplete" json:"incomplete"`
// Peers holds one map per listed peer with the keys "ip", "peer id" and "port".
Peers []map[string]interface{} `bencode:"peers" json:"peers"`
}
// ErrResponse is the body sent to the client when a request cannot be served.
type ErrResponse struct {
// FailureReason is a human-readable description of what went wrong.
FailureReason string `bencode:"failure reason" json:"failure_reason"`
}
// ScrapeResponse is the body of a /scrape response.
type ScrapeResponse struct {
// Files maps the requested info hash (raw bytes as a string) to its counters.
Files map[string]*ScrapeData `json:"files" bencode:"files"`
}
// ScrapeData holds the per-info-hash counters reported by /scrape.
type ScrapeData struct {
// Complete counts recently updated peers that reported left == 0.
Complete int32 `bencode:"complete" json:"complete"`
// Downloaded is incremented together with Complete by the scrape handler.
Downloaded int32 `json:"downloaded" bencode:"downloaded"`
// Incomplete counts recently updated peers that reported left != 0.
Incomplete int32 `json:"incomplete" bencode:"incomplete"`
}