From ec08a284a6a01a4147430a2b3ab8ba619d1d1f51 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Tue, 17 May 2022 19:31:17 +0100 Subject: [PATCH] Reduce insertQueue usage, throttle Skeleton requests (#4185) * Reduce insertQueue usage, throttle Skeleton requests * Cleanup Co-authored-by: Alexey Sharp --- eth/stagedsync/stage_headers.go | 26 ++++++++++++--------- turbo/stages/headerdownload/header_algos.go | 18 ++++++-------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index a1033f338..d6c0afd7a 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -709,6 +709,7 @@ func HeadersPOW( var skeletonReqMin, skeletonReqMax, reqMin, reqMax uint64 // min and max block height for skeleton and non-skeleton requests var noProgressCounter int var wasProgress bool + var lastSkeletonTime time.Time Loop: for !stopped { @@ -764,17 +765,20 @@ Loop: } // Send skeleton request if required - req = cfg.hd.RequestSkeleton() - if req != nil { - _, sentToPeer = cfg.headerReqSend(ctx, req) - if sentToPeer { - log.Trace("Sent skeleton request", "height", req.Number) - skeletonReqCount++ - if skeletonReqMin == 0 || req.Number < skeletonReqMin { - skeletonReqMin = req.Number - } - if req.Number+req.Length*req.Skip > skeletonReqMax { - skeletonReqMax = req.Number + req.Length*req.Skip + if time.Since(lastSkeletonTime) > 1*time.Second { + req = cfg.hd.RequestSkeleton() + if req != nil { + _, sentToPeer = cfg.headerReqSend(ctx, req) + if sentToPeer { + log.Trace("Sent skeleton request", "height", req.Number) + skeletonReqCount++ + if skeletonReqMin == 0 || req.Number < skeletonReqMin { + skeletonReqMin = req.Number + } + if req.Number+req.Length*req.Skip > skeletonReqMax { + skeletonReqMax = req.Number + req.Length*req.Skip + } + lastSkeletonTime = time.Now() } } } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 3a91d3130..5c7bd1dde 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -501,7 +501,7 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul // Make sure long insertions do not appear as a stuck stage 1 select { case <-logChannel: - log.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb) + log.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb, "queue", hd.insertQueue.Len()) default: } td, err := hf(link.header, link.headerRaw, link.hash, link.blockHeight) @@ -530,6 +530,11 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul link.header = nil // Drop header reference to free memory, as we won't need it anymore link.headerRaw = nil hd.moveLinkToQueue(link, PersistedQueueID) + for _, nextLink := range link.next { + if !nextLink.persisted { + hd.moveLinkToQueue(nextLink, InsertQueueID) + } + } } for hd.persistedLinkQueue.Len() > hd.persistedLinkLimit { link := heap.Pop(&hd.persistedLinkQueue).(*Link) @@ -848,14 +853,6 @@ func (hi *HeaderInserter) BestHeaderChanged() bool { return hi.newCanonical } -func (hd *HeaderDownload) recursiveLinked(link *Link) { - for _, n := range link.next { - n.linked = true - hd.moveLinkToQueue(n, InsertQueueID) - hd.recursiveLinked(n) - } -} - func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBlock bool, peerID [64]byte) bool { hd.lock.Lock() defer hd.lock.Unlock() @@ -900,10 +897,9 @@ func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBloc if foundParent { //fmt.Printf("sh = %d %x, found parent\n", sh.Number, sh.Hash) parent.next = append(parent.next, link) - if parent.linked { + if parent.persisted { link.linked = true hd.moveLinkToQueue(link, InsertQueueID) - hd.recursiveLinked(link) } } else { if sh.Number+params.FullImmutabilityThreshold < hd.highestInDb {