From 706a999178345c66a18dac14d157babd071279a4 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Sun, 22 Jan 2023 12:28:17 +0000 Subject: [PATCH] Improvements for the BSC stuck header sync (#6653) There are 3 changes: 1. Replace `anchorQueue` with `anchorTree` to be able to always walk the anchors in the order of increasing blockHeights (not possible with the queue) to prioritise making progress on the lowest block heights 2. Not increment `nextRetryTime` if the request was not sent 3. Reduce the strides in skeleton from `8*192` to `192` to reduce reliance of the long series of requests to make progress Co-authored-by: Alex Sharp --- cmd/sentry/sentry/sentry_grpc_server.go | 2 +- cmd/sentry/sentry/sentry_multi_client.go | 13 ++-- eth/stagedsync/stage_headers.go | 23 +++---- turbo/stages/headerdownload/header_algos.go | 61 ++++++++---------- .../headerdownload/header_data_struct.go | 63 +++---------------- 5 files changed, 53 insertions(+), 109 deletions(-) diff --git a/cmd/sentry/sentry/sentry_grpc_server.go b/cmd/sentry/sentry/sentry_grpc_server.go index 550bbd27a..fdac089ce 100644 --- a/cmd/sentry/sentry/sentry_grpc_server.go +++ b/cmd/sentry/sentry/sentry_grpc_server.go @@ -741,7 +741,7 @@ func (ss *GrpcServer) PeerUseless(_ context.Context, req *proto_sentry.PeerUsele peerInfo := ss.getPeer(peerID) if ss.statusData != nil && !ss.statusData.PassivePeers && peerInfo != nil && !peerInfo.peer.Info().Network.Static && !peerInfo.peer.Info().Network.Trusted { ss.removePeer(peerID) - log.Debug("Removed useless peer", "peerId", fmt.Sprintf("%x", peerID), "name", peerInfo.peer.Name()) + log.Debug("Removed useless peer", "peerId", fmt.Sprintf("%x", peerID)[:8], "name", peerInfo.peer.Name()) } return &emptypb.Empty{}, nil } diff --git a/cmd/sentry/sentry/sentry_multi_client.go b/cmd/sentry/sentry/sentry_multi_client.go index 47f34a78c..d170f7dc5 100644 --- a/cmd/sentry/sentry/sentry_multi_client.go +++ b/cmd/sentry/sentry/sentry_multi_client.go @@ -400,7 +400,7 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac if _, err := sentry.PeerUseless(ctx, &outreq, &grpc.EmptyCallOption{}); err != nil { return fmt.Errorf("sending peer useless request: %v", err) } - log.Debug("Requested removal of peer for empty header response", "peerId", fmt.Sprintf("%x", ConvertH512ToPeerID(peerID))) + log.Debug("Requested removal of peer for empty header response", "peerId", fmt.Sprintf("%x", ConvertH512ToPeerID(peerID))[:8]) } // No point processing empty response return nil @@ -410,6 +410,7 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac return fmt.Errorf("decode 2 BlockHeadersPacket66: %w", err) } // Extract headers from the block + //var blockNums []int var highestBlock uint64 csHeaders := make([]headerdownload.ChainSegmentHeader, 0, len(pkt)) for _, header := range pkt { @@ -428,7 +429,10 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac Hash: types.RawRlpHash(hRaw), Number: number, }) + //blockNums = append(blockNums, int(number)) } + //sort.Ints(blockNums) + //log.Debug("Delivered headers", "peer", fmt.Sprintf("%x", ConvertH512ToPeerID(peerID))[:8], "blockNums", fmt.Sprintf("%d", blockNums)) if cs.Hd.POSSync() { sort.Sort(headerdownload.HeadersReverseSort(csHeaders)) // Sorting by reverse order of block heights tx, err := cs.db.BeginRo(ctx) @@ -451,11 +455,10 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac currentTime := time.Now() req, penalties := cs.Hd.RequestMoreHeaders(currentTime) if req != nil { - if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer { - cs.Hd.UpdateStats(req, false /* skeleton */) + if peer, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer { + cs.Hd.UpdateStats(req, false /* skeleton */, peer) + cs.Hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } - // Regardless of whether request was actually sent to a peer, we update retry time to be 5 seconds in the future - cs.Hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } if len(penalties) > 0 { cs.Penalize(ctx, penalties) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 0d36044d4..208026939 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -783,12 +783,12 @@ func HeadersPOW( headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress, cfg.blockReader) cfg.hd.SetHeaderReader(&ChainReaderImpl{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader}) - var sentToPeer bool stopped := false prevProgress := headerProgress var noProgressCounter int var wasProgress bool var lastSkeletonTime time.Time + var sentToPeer bool Loop: for !stopped { @@ -806,12 +806,12 @@ Loop: currentTime := time.Now() req, penalties := cfg.hd.RequestMoreHeaders(currentTime) if req != nil { - _, sentToPeer = cfg.headerReqSend(ctx, req) + var peer [64]byte + peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { - cfg.hd.UpdateStats(req, false /* skeleton */) + cfg.hd.UpdateStats(req, false /* skeleton */, peer) + cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } - // Regardless of whether request was actually sent to a peer, we update retry time to be 5 seconds in the future - cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } if len(penalties) > 0 { cfg.penalize(ctx, penalties) @@ -820,12 +820,12 @@ Loop: for req != nil && sentToPeer && maxRequests > 0 { req, penalties = cfg.hd.RequestMoreHeaders(currentTime) if req != nil { - _, sentToPeer = cfg.headerReqSend(ctx, req) + var peer [64]byte + peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { - cfg.hd.UpdateStats(req, false /* skeleton */) + cfg.hd.UpdateStats(req, false /* skeleton */, peer) + cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } - // Regardless of whether request was actually sent to a peer, we update retry time to be 5 seconds in the future - cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } if len(penalties) > 0 { cfg.penalize(ctx, penalties) @@ -837,9 +837,10 @@ Loop: if time.Since(lastSkeletonTime) > 1*time.Second { req = cfg.hd.RequestSkeleton() if req != nil { - _, sentToPeer = cfg.headerReqSend(ctx, req) + var peer [64]byte + peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { - cfg.hd.UpdateStats(req, true /* skeleton */) + cfg.hd.UpdateStats(req, true /* skeleton */, peer) lastSkeletonTime = time.Now() } } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index b2ce38c6d..153671c8b 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -196,8 +196,7 @@ func (hd *HeaderDownload) MarkAllVerified() { func (hd *HeaderDownload) removeAnchor(anchor *Anchor) { // Anchor is removed from the map, and from the priority queue delete(hd.anchors, anchor.parentHash) - heap.Remove(hd.anchorQueue, anchor.idx) - anchor.idx = -1 + hd.anchorTree.Delete(anchor) } func (hd *HeaderDownload) pruneLinkQueue() { @@ -297,12 +296,11 @@ func (hd *HeaderDownload) logAnchorState() { sb.WriteString(fmt.Sprintf("{%8d", anchor.blockHeight)) sb.WriteString(fmt.Sprintf("-%d links=%d (%s)}", end, len(bs), sbb.String())) sb.WriteString(fmt.Sprintf(" => %x", anchorParent)) - sb.WriteString(fmt.Sprintf(", anchorQueue.idx=%d", anchor.idx)) sb.WriteString(fmt.Sprintf(", next retry in %v", anchor.nextRetryTime.Sub(currentTime))) ss = append(ss, sb.String()) } sort.Strings(ss) - log.Debug("[Downloader] Queue sizes", "anchors", hd.anchorQueue.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len()) + log.Debug("[Downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len()) for _, s := range ss { log.Debug(s) } @@ -390,32 +388,28 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ hd.lock.Lock() defer hd.lock.Unlock() var penalties []PenaltyItem - if hd.anchorQueue.Len() == 0 { - log.Trace("[Downloader] Empty anchor queue") - return nil, penalties - } - for hd.anchorQueue.Len() > 0 { - anchor := (*hd.anchorQueue)[0] - // Only process the anchors for which the nextRetryTime has already come + var req *HeaderRequest + hd.anchorTree.Ascend(func(anchor *Anchor) bool { if anchor.nextRetryTime.After(currentTime) { - return nil, penalties + return true } - if anchor.timeouts < 10 { - // Produce a header request that would extend this anchor (add parent, parent of parent, etc.) - return &HeaderRequest{ - Anchor: anchor, - Hash: anchor.parentHash, - Number: anchor.blockHeight - 1, - Length: 192, - Skip: 0, - Reverse: true, - }, penalties + if anchor.timeouts >= 10 { + // Ancestors of this anchor seem to be unavailable, invalidate and move on + hd.invalidateAnchor(anchor, "suspected unavailability") + penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}) + return true } - // Ancestors of this anchor seem to be unavailable, invalidate and move on - hd.invalidateAnchor(anchor, "suspected unavailability") - penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}) - } - return nil, penalties + req = &HeaderRequest{ + Anchor: anchor, + Hash: anchor.parentHash, + Number: anchor.blockHeight - 1, + Length: 192, + Skip: 0, + Reverse: true, + } + return false + }) + return req, penalties } func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeout bool, request *HeaderRequest, penalties []PenaltyItem) { @@ -450,7 +444,7 @@ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeo return } -func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool) { +func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [64]byte) { hd.lock.Lock() defer hd.lock.Unlock() if skeleton { @@ -473,26 +467,21 @@ func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool) { } } } - + //log.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8]) } func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.Time, timeout time.Duration) { hd.lock.Lock() defer hd.lock.Unlock() - if req.Anchor.idx == -1 { - // Anchor has already been deleted - return - } req.Anchor.timeouts++ req.Anchor.nextRetryTime = currentTime.Add(timeout) - heap.Fix(hd.anchorQueue, req.Anchor.idx) } func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { hd.lock.RLock() defer hd.lock.RUnlock() log.Debug("[Downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb) - stride := uint64(8 * 192) + stride := uint64(192) var length uint64 = 192 // Include one header that we have already, to make sure the responses are not empty and do not get penalised when we are at the tip of the chain from := hd.highestInDb @@ -1017,7 +1006,7 @@ func (hd *HeaderDownload) ProcessHeader(sh ChainSegmentHeader, newBlock bool, pe } anchor.fLink = link hd.anchors[anchor.parentHash] = anchor - heap.Push(hd.anchorQueue, anchor) + hd.anchorTree.ReplaceOrInsert(anchor) return true } return false diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index 6de30d617..a1266990e 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/google/btree" lru "github.com/hashicorp/golang-lru" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/etl" @@ -109,55 +110,6 @@ type Anchor struct { blockHeight uint64 nextRetryTime time.Time // Zero when anchor has just been created, otherwise time when anchor needs to be check to see if retry is needed timeouts int // Number of timeout that this anchor has experiences - after certain threshold, it gets invalidated - idx int // Index of the anchor in the queue to be able to modify specific items -} - -// AnchorQueue is a priority queue of anchors that priorises by the time when -// another retry on the extending the anchor needs to be attempted -// Every time anchor's extension is requested, the `nextRetryTime` is reset -// to 5 seconds in the future, and when it expires, and the anchor is still -// retry is made -// It implement heap.Interface to be useable by the standard library `heap` -// as a priority queue (implemented as a binary heap) -// As anchors are moved around in the binary heap, they internally track their -// position in the heap (using `idx` field). This feature allows updating -// the heap (using `Fix` function) in situations when anchor is accessed not -// through the priority queue, but through the map `anchor` in the -// HeaderDownloader type. -type AnchorQueue []*Anchor - -func (aq AnchorQueue) Len() int { - return len(aq) -} - -func (aq AnchorQueue) Less(i, j int) bool { - if aq[i].nextRetryTime == aq[j].nextRetryTime { - // When next retry times are the same, we prioritise low block height anchors - return aq[i].blockHeight < aq[j].blockHeight - } - return aq[i].nextRetryTime.Before(aq[j].nextRetryTime) -} - -func (aq AnchorQueue) Swap(i, j int) { - aq[i], aq[j] = aq[j], aq[i] - aq[i].idx, aq[j].idx = i, j // Restore indices after the swap -} - -func (aq *AnchorQueue) Push(x interface{}) { - // Push and Pop use pointer receivers because they modify the slice's length, - // not just its contents. - x.(*Anchor).idx = len(*aq) - *aq = append(*aq, x.(*Anchor)) -} - -func (aq *AnchorQueue) Pop() interface{} { - old := *aq - n := len(old) - x := old[n-1] - old[n-1] = nil - *aq = old[0 : n-1] - x.idx = -1 - return x } type ChainSegmentHeader struct { @@ -279,11 +231,11 @@ type HeaderDownload struct { anchors map[libcommon.Hash]*Anchor // Mapping from parentHash to collection of anchors links map[libcommon.Hash]*Link // Links by header hash engine consensus.Engine - insertQueue InsertQueue // Priority queue of non-persisted links that need to be verified and can be inserted - seenAnnounces *SeenAnnounces // External announcement hashes, after header verification if hash is in this set - will broadcast it further - persistedLinkQueue LinkQueue // Priority queue of persisted links used to limit their number - linkQueue LinkQueue // Priority queue of non-persisted links used to limit their number - anchorQueue *AnchorQueue // Priority queue of anchors used to sequence the header requests + insertQueue InsertQueue // Priority queue of non-persisted links that need to be verified and can be inserted + seenAnnounces *SeenAnnounces // External announcement hashes, after header verification if hash is in this set - will broadcast it further + persistedLinkQueue LinkQueue // Priority queue of persisted links used to limit their number + linkQueue LinkQueue // Priority queue of non-persisted links used to limit their number + anchorTree *btree.BTreeG[*Anchor] // anchors sorted by block height DeliveryNotify chan struct{} toAnnounce []Announce lock sync.RWMutex @@ -342,7 +294,7 @@ func NewHeaderDownload( anchorLimit: anchorLimit, engine: engine, links: make(map[libcommon.Hash]*Link), - anchorQueue: &AnchorQueue{}, + anchorTree: btree.NewG[*Anchor](32, func(a, b *Anchor) bool { return a.blockHeight < b.blockHeight }), seenAnnounces: NewSeenAnnounces(), DeliveryNotify: make(chan struct{}, 1), QuitPoWMining: make(chan struct{}), @@ -354,7 +306,6 @@ func NewHeaderDownload( } heap.Init(&hd.persistedLinkQueue) heap.Init(&hd.linkQueue) - heap.Init(hd.anchorQueue) heap.Init(&hd.insertQueue) return hd }