diff --git a/dataflow/states.go b/dataflow/states.go index 8eab162c6..5a2e21cff 100644 --- a/dataflow/states.go +++ b/dataflow/states.go @@ -9,6 +9,7 @@ import ( ) var BlockBodyDownloadStates *States = NewStates(64 * 1024) +var HeaderDownloadStates *States = NewStates(64 * 1024) const ( BlockBodyCleared byte = iota @@ -22,6 +23,17 @@ const ( BlockBodyInDb ) +const ( + HeaderInvalidated byte = iota + HeaderRequested + HeaderSkeletonRequested + HeaderRetryNotReady + HeaderEmpty + HeaderBad + HeaderEvicted + HeaderInserted +) + type SnapshotItem struct { id uint64 state byte diff --git a/diagnostics/header_downloader_stats.go b/diagnostics/header_downloader_stats.go new file mode 100644 index 000000000..946e9d53f --- /dev/null +++ b/diagnostics/header_downloader_stats.go @@ -0,0 +1,35 @@ +package diagnostics + +import ( + "fmt" + "io" + "net/http" + "strconv" + + "github.com/ledgerwatch/erigon/dataflow" +) + +func SetupHeaderDownloadStats() { + http.HandleFunc("/debug/metrics/headers_download", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + writeHeaderDownload(w, r) + }) +} + +func writeHeaderDownload(w io.Writer, r *http.Request) { + if err := r.ParseForm(); err != nil { + fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err) + return + } + sinceTickStr := r.Form.Get("sincetick") + var tick int64 + if sinceTickStr != "" { + var err error + if tick, err = strconv.ParseInt(sinceTickStr, 10, 64); err != nil { + fmt.Fprintf(w, "ERROR: parsing sincemilli: %v\n", err) + } + } + fmt.Fprintf(w, "SUCCESS\n") + // fmt.Fprintf(w, "%d,%d\n", p2p.ingressTrafficMeter, ) + dataflow.HeaderDownloadStates.ChangesSince(int(tick), w) +} diff --git a/turbo/debug/flags.go b/turbo/debug/flags.go index af357dd26..73447f0e1 100644 --- a/turbo/debug/flags.go +++ b/turbo/debug/flags.go @@ -196,6 +196,7 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) { diagnostics.SetupFlagsAccess(ctx) diagnostics.SetupVersionAccess() diagnostics.SetupBlockBodyDownload() + diagnostics.SetupHeaderDownloadStats() } // pprof server diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index e5cba8b79..2981ce174 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -20,6 +20,7 @@ import ( "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" + "github.com/ledgerwatch/erigon/dataflow" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/common" @@ -391,11 +392,15 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ var req *HeaderRequest hd.anchorTree.Ascend(func(anchor *Anchor) bool { if anchor.nextRetryTime.After(currentTime) { + // We are not ready to retry this anchor yet + dataflow.HeaderDownloadStates.AddChange(anchor.blockHeight-1, dataflow.HeaderRetryNotReady) return true } if anchor.timeouts >= 10 { // Ancestors of this anchor seem to be unavailable, invalidate and move on hd.invalidateAnchor(anchor, "suspected unavailability") + // Add header invalidate + dataflow.HeaderDownloadStates.AddChange(anchor.blockHeight-1, dataflow.HeaderInvalidated) penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}) return true } @@ -407,6 +412,7 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ Skip: 0, Reverse: true, } + // Add header requested return false }) return req, penalties @@ -415,6 +421,7 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeout bool, request *HeaderRequest, penalties []PenaltyItem) { anchor := hd.posAnchor if anchor == nil { + dataflow.HeaderDownloadStates.AddChange(anchor.blockHeight-1, dataflow.HeaderEmpty) hd.logger.Debug("[downloader] No PoS anchor") return } @@ -449,6 +456,7 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [6 defer hd.lock.Unlock() if skeleton { hd.stats.SkeletonRequests++ + dataflow.HeaderDownloadStates.AddChange(req.Number, dataflow.HeaderSkeletonRequested) if hd.stats.SkeletonReqMinBlock == 0 || req.Number < hd.stats.SkeletonReqMinBlock { hd.stats.SkeletonReqMinBlock = req.Number } @@ -457,6 +465,7 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [6 } } else { hd.stats.Requests++ + dataflow.HeaderDownloadStates.AddChange(req.Number, dataflow.HeaderRequested) // We know that req is reverse request, with Skip == 0, therefore comparing Number with reqMax if req.Number > hd.stats.ReqMaxBlock { hd.stats.ReqMaxBlock = req.Number @@ -519,6 +528,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult hd.moveLinkToQueue(link, NoQueue) delete(hd.links, link.hash) hd.removeUpwards(link) + dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderBad) hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", link.hash, "height", link.blockHeight) return true, false, 0, lastTime, nil } @@ -534,6 +544,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult hd.moveLinkToQueue(link, NoQueue) delete(hd.links, link.hash) hd.removeUpwards(link) + dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderEvicted) return true, false, 0, lastTime, nil } } @@ -560,6 +571,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult if td.Cmp(terminalTotalDifficulty) >= 0 { hd.highestInDb = link.blockHeight log.Info(POSPandaBanner) + dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted) return true, true, 0, lastTime, nil } returnTd = td @@ -584,11 +596,13 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult } } if link.blockHeight == hd.latestMinedBlockNumber { + dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted) return false, true, 0, lastTime, nil } } for hd.persistedLinkQueue.Len() > hd.persistedLinkLimit { link := heap.Pop(&hd.persistedLinkQueue).(*Link) + dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderEvicted) delete(hd.links, link.hash) for child := link.fChild; child != nil; child, child.next = child.next, nil { }