Improvements for the BSC stuck header sync (#6653)

There are 3 changes:
1. Replace `anchorQueue` with `anchorTree`, so that anchors can always be
walked in order of increasing block height (not possible with the queue),
to prioritise making progress on the lowest block heights (see the sketch
after this list)
2. Do not advance `nextRetryTime` if the request was not actually sent
3. Reduce the stride in skeleton requests from `8*192` to `192`, to reduce
reliance on long series of requests to make progress
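
To make change 1 concrete, here is a minimal, self-contained sketch of the pattern, using plain github.com/google/btree and a trimmed-down anchor type (illustration only, not the Erigon code): a `btree.BTreeG` keyed by block height makes an `Ascend` walk visit anchors from the lowest height upwards, skipping the ones that are still backing off.

package main

import (
	"fmt"
	"time"

	"github.com/google/btree"
)

// anchor is a trimmed-down stand-in for Erigon's Anchor type.
type anchor struct {
	blockHeight   uint64
	nextRetryTime time.Time
}

func main() {
	// Order strictly by block height, mirroring the less function in the diff.
	tree := btree.NewG[*anchor](32, func(a, b *anchor) bool { return a.blockHeight < b.blockHeight })
	tree.ReplaceOrInsert(&anchor{blockHeight: 2000})
	tree.ReplaceOrInsert(&anchor{blockHeight: 1000, nextRetryTime: time.Now().Add(time.Minute)})
	tree.ReplaceOrInsert(&anchor{blockHeight: 3000})

	// Ascend visits anchors from the lowest block height upwards; the first
	// one whose retry time has come wins, and returning false stops the walk.
	now := time.Now()
	var picked *anchor
	tree.Ascend(func(a *anchor) bool {
		if a.nextRetryTime.After(now) {
			return true // still backing off, skip to the next-lowest anchor
		}
		picked = a
		return false
	})
	fmt.Println("extend anchor at height", picked.blockHeight) // 2000 (1000 is backing off)
}

Walking from the lowest height first is exactly what the heap-based `anchorQueue`, ordered primarily by `nextRetryTime`, could not guarantee.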

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
Author: ledgerwatch, 2023-01-22 12:28:17 +00:00 (committed by GitHub)
Commit: 706a999178 (parent f1dd51ccb3)
5 changed files with 53 additions and 109 deletions


@@ -741,7 +741,7 @@ func (ss *GrpcServer) PeerUseless(_ context.Context, req *proto_sentry.PeerUsele
 	peerInfo := ss.getPeer(peerID)
 	if ss.statusData != nil && !ss.statusData.PassivePeers && peerInfo != nil && !peerInfo.peer.Info().Network.Static && !peerInfo.peer.Info().Network.Trusted {
 		ss.removePeer(peerID)
-		log.Debug("Removed useless peer", "peerId", fmt.Sprintf("%x", peerID), "name", peerInfo.peer.Name())
+		log.Debug("Removed useless peer", "peerId", fmt.Sprintf("%x", peerID)[:8], "name", peerInfo.peer.Name())
 	}
 	return &emptypb.Empty{}, nil
 }
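
As an aside on the logging tweak above: formatting a 64-byte peer ID with `%x` produces 128 hex characters, so the added `[:8]` keeps just the first 4 bytes. A standalone illustration:

package main

import "fmt"

func main() {
	var peerID [64]byte
	peerID[0], peerID[1], peerID[2], peerID[3] = 0xde, 0xad, 0xbe, 0xef
	full := fmt.Sprintf("%x", peerID) // two hex digits per byte
	fmt.Println(len(full))            // 128
	fmt.Println(full[:8])             // deadbeef (first 4 bytes only)
}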


@@ -400,7 +400,7 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac
 		if _, err := sentry.PeerUseless(ctx, &outreq, &grpc.EmptyCallOption{}); err != nil {
 			return fmt.Errorf("sending peer useless request: %v", err)
 		}
-		log.Debug("Requested removal of peer for empty header response", "peerId", fmt.Sprintf("%x", ConvertH512ToPeerID(peerID)))
+		log.Debug("Requested removal of peer for empty header response", "peerId", fmt.Sprintf("%x", ConvertH512ToPeerID(peerID))[:8])
 	}
 	// No point processing empty response
 	return nil
@@ -410,6 +410,7 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac
 		return fmt.Errorf("decode 2 BlockHeadersPacket66: %w", err)
 	}
 	// Extract headers from the block
+	//var blockNums []int
 	var highestBlock uint64
 	csHeaders := make([]headerdownload.ChainSegmentHeader, 0, len(pkt))
 	for _, header := range pkt {
@@ -428,7 +429,10 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac
 			Hash:   types.RawRlpHash(hRaw),
 			Number: number,
 		})
+		//blockNums = append(blockNums, int(number))
 	}
+	//sort.Ints(blockNums)
+	//log.Debug("Delivered headers", "peer", fmt.Sprintf("%x", ConvertH512ToPeerID(peerID))[:8], "blockNums", fmt.Sprintf("%d", blockNums))
 	if cs.Hd.POSSync() {
 		sort.Sort(headerdownload.HeadersReverseSort(csHeaders)) // Sorting by reverse order of block heights
 		tx, err := cs.db.BeginRo(ctx)
@@ -451,11 +455,10 @@ func (cs *MultiClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPac
 	currentTime := time.Now()
 	req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
 	if req != nil {
-		if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
-			cs.Hd.UpdateStats(req, false /* skeleton */)
+		if peer, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
+			cs.Hd.UpdateStats(req, false /* skeleton */, peer)
+			cs.Hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
 		}
-		// Regardless of whether request was actually sent to a peer, we update retry time to be 5 seconds in the future
-		cs.Hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
 	}
 	if len(penalties) > 0 {
 		cs.Penalize(ctx, penalties)
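
A self-contained toy of the control flow behind change 2 (simplified names; `trySend` is a hypothetical stand-in for `headerReqSend`/`SendHeaderRequest`): the 5-second back-off and the timeout counter are only touched when a request actually reached a peer.

package main

import (
	"fmt"
	"time"
)

// anchor is a simplified stand-in for the real Anchor.
type anchor struct {
	nextRetryTime time.Time
	timeouts      int
}

// trySend is a hypothetical stand-in for headerReqSend/SendHeaderRequest;
// here it simulates the case where no suitable peer was found.
func trySend() (peer [64]byte, sent bool) { return peer, false }

func main() {
	a := &anchor{}
	now := time.Now()
	if _, sent := trySend(); sent {
		// Mirrors UpdateRetryTime in the diff: only a request that actually
		// reached a peer earns the 5-second back-off and a timeout tick.
		a.timeouts++
		a.nextRetryTime = now.Add(5 * time.Second)
	}
	fmt.Println("anchor due immediately:", !a.nextRetryTime.After(now)) // true
}

Previously the back-off was applied unconditionally ("Regardless of whether request was actually sent to a peer..."), so an unsent request still blocked its anchor for 5 seconds and pushed `timeouts` toward the invalidation threshold of 10.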


@@ -783,12 +783,12 @@ func HeadersPOW(
 	headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress, cfg.blockReader)
 	cfg.hd.SetHeaderReader(&ChainReaderImpl{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})
-	var sentToPeer bool
 	stopped := false
 	prevProgress := headerProgress
 	var noProgressCounter int
 	var wasProgress bool
 	var lastSkeletonTime time.Time
+	var sentToPeer bool
 Loop:
 	for !stopped {
@@ -806,12 +806,12 @@ Loop:
 			currentTime := time.Now()
 			req, penalties := cfg.hd.RequestMoreHeaders(currentTime)
 			if req != nil {
-				_, sentToPeer = cfg.headerReqSend(ctx, req)
+				var peer [64]byte
+				peer, sentToPeer = cfg.headerReqSend(ctx, req)
 				if sentToPeer {
-					cfg.hd.UpdateStats(req, false /* skeleton */)
+					cfg.hd.UpdateStats(req, false /* skeleton */, peer)
+					cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
 				}
-				// Regardless of whether request was actually sent to a peer, we update retry time to be 5 seconds in the future
-				cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
 			}
 			if len(penalties) > 0 {
 				cfg.penalize(ctx, penalties)
@@ -820,12 +820,12 @@ Loop:
 			for req != nil && sentToPeer && maxRequests > 0 {
 				req, penalties = cfg.hd.RequestMoreHeaders(currentTime)
 				if req != nil {
-					_, sentToPeer = cfg.headerReqSend(ctx, req)
+					var peer [64]byte
+					peer, sentToPeer = cfg.headerReqSend(ctx, req)
 					if sentToPeer {
-						cfg.hd.UpdateStats(req, false /* skeleton */)
+						cfg.hd.UpdateStats(req, false /* skeleton */, peer)
+						cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
 					}
-					// Regardless of whether request was actually sent to a peer, we update retry time to be 5 seconds in the future
-					cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
 				}
 				if len(penalties) > 0 {
 					cfg.penalize(ctx, penalties)
@@ -837,9 +837,10 @@ Loop:
 			if time.Since(lastSkeletonTime) > 1*time.Second {
 				req = cfg.hd.RequestSkeleton()
 				if req != nil {
-					_, sentToPeer = cfg.headerReqSend(ctx, req)
+					var peer [64]byte
+					peer, sentToPeer = cfg.headerReqSend(ctx, req)
 					if sentToPeer {
-						cfg.hd.UpdateStats(req, true /* skeleton */)
+						cfg.hd.UpdateStats(req, true /* skeleton */, peer)
 						lastSkeletonTime = time.Now()
 					}
 				}


@@ -196,8 +196,7 @@ func (hd *HeaderDownload) MarkAllVerified() {
 func (hd *HeaderDownload) removeAnchor(anchor *Anchor) {
 	// Anchor is removed from the map, and from the priority queue
 	delete(hd.anchors, anchor.parentHash)
-	heap.Remove(hd.anchorQueue, anchor.idx)
-	anchor.idx = -1
+	hd.anchorTree.Delete(anchor)
 }
 
 func (hd *HeaderDownload) pruneLinkQueue() {
@@ -297,12 +296,11 @@ func (hd *HeaderDownload) logAnchorState() {
 		sb.WriteString(fmt.Sprintf("{%8d", anchor.blockHeight))
 		sb.WriteString(fmt.Sprintf("-%d links=%d (%s)}", end, len(bs), sbb.String()))
 		sb.WriteString(fmt.Sprintf(" => %x", anchorParent))
-		sb.WriteString(fmt.Sprintf(", anchorQueue.idx=%d", anchor.idx))
 		sb.WriteString(fmt.Sprintf(", next retry in %v", anchor.nextRetryTime.Sub(currentTime)))
 		ss = append(ss, sb.String())
 	}
 	sort.Strings(ss)
-	log.Debug("[Downloader] Queue sizes", "anchors", hd.anchorQueue.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
+	log.Debug("[Downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
 	for _, s := range ss {
 		log.Debug(s)
 	}
@@ -390,32 +388,28 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ
 	hd.lock.Lock()
 	defer hd.lock.Unlock()
 	var penalties []PenaltyItem
-	if hd.anchorQueue.Len() == 0 {
-		log.Trace("[Downloader] Empty anchor queue")
-		return nil, penalties
-	}
-	for hd.anchorQueue.Len() > 0 {
-		anchor := (*hd.anchorQueue)[0]
+	// Only process the anchors for which the nextRetryTime has already come
+	var req *HeaderRequest
+	hd.anchorTree.Ascend(func(anchor *Anchor) bool {
 		if anchor.nextRetryTime.After(currentTime) {
-			return nil, penalties
+			return true
 		}
-		if anchor.timeouts < 10 {
-			// Produce a header request that would extend this anchor (add parent, parent of parent, etc.)
-			return &HeaderRequest{
-				Anchor:  anchor,
-				Hash:    anchor.parentHash,
-				Number:  anchor.blockHeight - 1,
-				Length:  192,
-				Skip:    0,
-				Reverse: true,
-			}, penalties
+		if anchor.timeouts >= 10 {
+			// Ancestors of this anchor seem to be unavailable, invalidate and move on
+			hd.invalidateAnchor(anchor, "suspected unavailability")
+			penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID})
+			return true
 		}
-		// Ancestors of this anchor seem to be unavailable, invalidate and move on
-		hd.invalidateAnchor(anchor, "suspected unavailability")
-		penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID})
-	}
-	return nil, penalties
+		req = &HeaderRequest{
+			Anchor:  anchor,
+			Hash:    anchor.parentHash,
+			Number:  anchor.blockHeight - 1,
+			Length:  192,
+			Skip:    0,
+			Reverse: true,
+		}
+		return false
+	})
+	return req, penalties
 }
 
 func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeout bool, request *HeaderRequest, penalties []PenaltyItem) {
@@ -450,7 +444,7 @@ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeo
 	return
 }
 
-func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool) {
+func (hd *HeaderDownload) UpdateStats(req *HeaderRequest, skeleton bool, peer [64]byte) {
 	hd.lock.Lock()
 	defer hd.lock.Unlock()
 	if skeleton {
@@ -473,26 +467,21 @@
 			}
 		}
 	}
+	//log.Debug("Header request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)[:8])
 }
 
 func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.Time, timeout time.Duration) {
 	hd.lock.Lock()
 	defer hd.lock.Unlock()
-	if req.Anchor.idx == -1 {
-		// Anchor has already been deleted
-		return
-	}
 	req.Anchor.timeouts++
 	req.Anchor.nextRetryTime = currentTime.Add(timeout)
-	heap.Fix(hd.anchorQueue, req.Anchor.idx)
 }
 
 func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest {
 	hd.lock.RLock()
 	defer hd.lock.RUnlock()
 	log.Debug("[Downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb)
-	stride := uint64(8 * 192)
+	stride := uint64(192)
 	var length uint64 = 192
 	// Include one header that we have already, to make sure the responses are not empty and do not get penalised when we are at the tip of the chain
 	from := hd.highestInDb
@@ -1017,7 +1006,7 @@ func (hd *HeaderDownload) ProcessHeader(sh ChainSegmentHeader, newBlock bool, pe
 		}
 		anchor.fLink = link
 		hd.anchors[anchor.parentHash] = anchor
-		heap.Push(hd.anchorQueue, anchor)
+		hd.anchorTree.ReplaceOrInsert(anchor)
 		return true
 	}
 	return false
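
For change 3, a back-of-the-envelope illustration. The assumption (consistent with the `Length: 192, Reverse: true` request built in `RequestMoreHeaders` above) is that skeleton headers arrive `stride` blocks apart and each gap is bridged by reverse requests of up to 192 headers, each depending on the previous one succeeding:

package main

import "fmt"

func main() {
	const fillLength = 192 // headers per gap-filling reverse request
	for _, stride := range []uint64{8 * 192, 192} {
		fmt.Printf("stride %4d -> %d chained request(s) per skeleton gap\n",
			stride, stride/fillLength)
	}
	// stride 1536 -> 8 chained request(s) per skeleton gap
	// stride  192 -> 1 chained request(s) per skeleton gap
}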


@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/btree"
 	lru "github.com/hashicorp/golang-lru"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/etl"
@@ -109,55 +110,6 @@ type Anchor struct {
 	blockHeight   uint64
 	nextRetryTime time.Time // Zero when anchor has just been created, otherwise time when anchor needs to be checked to see if retry is needed
 	timeouts      int       // Number of timeouts that this anchor has experienced - after a certain threshold, it gets invalidated
-	idx           int       // Index of the anchor in the queue to be able to modify specific items
 }
 
-// AnchorQueue is a priority queue of anchors that prioritises by the time when
-// another retry on extending the anchor needs to be attempted
-// Every time an anchor's extension is requested, its `nextRetryTime` is reset
-// to 5 seconds in the future, and when that expires, if the anchor is still
-// there, another retry is made
-// It implements heap.Interface to be usable by the standard library `heap`
-// as a priority queue (implemented as a binary heap)
-// As anchors are moved around in the binary heap, they internally track their
-// position in the heap (using the `idx` field). This feature allows updating
-// the heap (using the `Fix` function) in situations when an anchor is accessed
-// not through the priority queue, but through the map `anchors` in the
-// HeaderDownload type.
-type AnchorQueue []*Anchor
-
-func (aq AnchorQueue) Len() int {
-	return len(aq)
-}
-
-func (aq AnchorQueue) Less(i, j int) bool {
-	if aq[i].nextRetryTime == aq[j].nextRetryTime {
-		// When next retry times are the same, we prioritise low block height anchors
-		return aq[i].blockHeight < aq[j].blockHeight
-	}
-	return aq[i].nextRetryTime.Before(aq[j].nextRetryTime)
-}
-
-func (aq AnchorQueue) Swap(i, j int) {
-	aq[i], aq[j] = aq[j], aq[i]
-	aq[i].idx, aq[j].idx = i, j // Restore indices after the swap
-}
-
-func (aq *AnchorQueue) Push(x interface{}) {
-	// Push and Pop use pointer receivers because they modify the slice's length,
-	// not just its contents.
-	x.(*Anchor).idx = len(*aq)
-	*aq = append(*aq, x.(*Anchor))
-}
-
-func (aq *AnchorQueue) Pop() interface{} {
-	old := *aq
-	n := len(old)
-	x := old[n-1]
-	old[n-1] = nil
-	*aq = old[0 : n-1]
-	x.idx = -1
-	return x
-}
-
 type ChainSegmentHeader struct {
@@ -279,11 +231,11 @@ type HeaderDownload struct {
 	anchors map[libcommon.Hash]*Anchor // Mapping from parentHash to collection of anchors
 	links   map[libcommon.Hash]*Link   // Links by header hash
 	engine  consensus.Engine
-	insertQueue        InsertQueue    // Priority queue of non-persisted links that need to be verified and can be inserted
-	seenAnnounces      *SeenAnnounces // External announcement hashes, after header verification if hash is in this set - will broadcast it further
-	persistedLinkQueue LinkQueue      // Priority queue of persisted links used to limit their number
-	linkQueue          LinkQueue      // Priority queue of non-persisted links used to limit their number
-	anchorQueue        *AnchorQueue   // Priority queue of anchors used to sequence the header requests
+	insertQueue        InsertQueue            // Priority queue of non-persisted links that need to be verified and can be inserted
+	seenAnnounces      *SeenAnnounces         // External announcement hashes, after header verification if hash is in this set - will broadcast it further
+	persistedLinkQueue LinkQueue              // Priority queue of persisted links used to limit their number
+	linkQueue          LinkQueue              // Priority queue of non-persisted links used to limit their number
+	anchorTree         *btree.BTreeG[*Anchor] // anchors sorted by block height
 	DeliveryNotify chan struct{}
 	toAnnounce     []Announce
 	lock           sync.RWMutex
@@ -342,7 +294,7 @@ func NewHeaderDownload(
 		anchorLimit:    anchorLimit,
 		engine:         engine,
 		links:          make(map[libcommon.Hash]*Link),
-		anchorQueue:    &AnchorQueue{},
+		anchorTree:     btree.NewG[*Anchor](32, func(a, b *Anchor) bool { return a.blockHeight < b.blockHeight }),
 		seenAnnounces:  NewSeenAnnounces(),
 		DeliveryNotify: make(chan struct{}, 1),
 		QuitPoWMining:  make(chan struct{}),
@@ -354,7 +306,6 @@ func NewHeaderDownload(
 	}
 	heap.Init(&hd.persistedLinkQueue)
 	heap.Init(&hd.linkQueue)
-	heap.Init(hd.anchorQueue)
 	heap.Init(&hd.insertQueue)
 	return hd
 }
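
Finally, a sketch of the complete `AnchorQueue` -> `BTreeG` surface this commit relies on (`ReplaceOrInsert`/`Delete`/`Len`/`Ascend`; the anchor type is simplified, not the Erigon one). The design point: the tree is keyed by `blockHeight`, which never changes for a given anchor, so updating `nextRetryTime` needs no re-ordering, unlike the old heap, which was keyed by `nextRetryTime` and therefore needed the `idx` bookkeeping and `heap.Fix` on every update.

package main

import (
	"fmt"
	"time"

	"github.com/google/btree"
)

// anchor is a simplified stand-in for Erigon's Anchor type.
type anchor struct {
	blockHeight   uint64
	nextRetryTime time.Time
	timeouts      int
}

func main() {
	tree := btree.NewG[*anchor](32, func(a, b *anchor) bool { return a.blockHeight < b.blockHeight })

	a := &anchor{blockHeight: 4242}
	tree.ReplaceOrInsert(a) // replaces heap.Push(hd.anchorQueue, anchor)

	// Mutating the retry state leaves the tree untouched: the item's position
	// depends only on blockHeight, so there is no heap.Fix analogue to call.
	a.timeouts++
	a.nextRetryTime = time.Now().Add(5 * time.Second)

	tree.Delete(a) // replaces heap.Remove(hd.anchorQueue, anchor.idx)
	fmt.Println("anchors tracked:", tree.Len()) // 0
}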