diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index bdf8c362d..6de0f47c7 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -507,9 +507,22 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, req *proto_sentry if segments, penalty, err := cs.hd.SplitIntoSegments(headersRaw, headers); err == nil { if penalty == headerdownload.NoPenalty { + var canRequestMore bool for _, segment := range segments { - cs.hd.ProcessSegment(segment, false /* newBlock */) + requestMore := cs.hd.ProcessSegment(segment, false /* newBlock */) + canRequestMore = canRequestMore || requestMore } + + if canRequestMore { + currentTime := uint64(time.Now().Unix()) + if req := cs.hd.RequestMoreHeaders(currentTime); req != nil { + if peer := cs.sendHeaderRequest(ctx, req); peer != nil { + cs.hd.SentRequest(req, currentTime, 5 /* timeout */) + log.Debug("Sent request", "height", req.Number) + } + } + } + } else { outreq := proto_sentry.PenalizePeerRequest{ PeerId: req.PeerId, diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 45e40d3b0..39c9d039f 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -204,7 +204,7 @@ func (hd *HeaderDownload) StageReadyChannel() chan struct{} { // ExtendDown extends some working trees down from the anchor, using given chain segment // it creates a new anchor and collects all the links from the attached anchors to it func (hd *HeaderDownload) extendDown(segment *ChainSegment, start, end int) error { - // Find attachement anchor again + // Find attachment anchor again anchorHeader := segment.Headers[start] if anchor, attaching := hd.anchors[anchorHeader.Hash()]; attaching { anchorPreverified := false @@ -733,8 +733,11 @@ func (hi *HeaderInserter) AnythingDone() bool { return hi.newCanonical } -//nolint:interfacer -func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool) { +// ProcessSegment - handling single segment. +// If segment were processed by extendDown or newAnchor method, then it returns `requestMore=true` +// it allows higher-level algo immediately request more headers without waiting all stages precessing, +// speeds up visibility of new blocks +func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool) (requestMore bool) { log.Debug("processSegment", "from", segment.Headers[0].Number.Uint64(), "to", segment.Headers[len(segment.Headers)-1].Number.Uint64()) hd.lock.Lock() defer hd.lock.Unlock() @@ -767,6 +770,7 @@ func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool) { log.Error("ExtendDown failed", "error", err) return } + requestMore = true log.Debug("Extended Down", "start", startNum, "end", endNum) } } else if foundTip { @@ -784,6 +788,7 @@ func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool) { log.Error("NewAnchor failed", "error", err) return } + requestMore = true log.Debug("NewAnchor", "start", startNum, "end", endNum) } //log.Info(hd.anchorState()) @@ -819,6 +824,8 @@ func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool) { } } } + + return requestMore } func (hd *HeaderDownload) TopSeenHeight() uint64 {