diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 1e6821bbc..2d3d1b09b 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -151,7 +151,6 @@ func BodiesForward( currentTime := uint64(time.Now().Unix()) cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer) d3 += time.Since(start) - log.Debug("body request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)) } // loopCount is used here to ensure we don't get caught in a constant loop of making requests @@ -178,7 +177,6 @@ func BodiesForward( start = time.Now() cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer) d3 += time.Since(start) - log.Debug("body request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer)) } loopCount++ @@ -199,53 +197,53 @@ func BodiesForward( toProcess := cfg.bd.NextProcessingCount() - if toProcess > 0 { - var i uint64 - for i = 0; i < toProcess; i++ { - nextBlock := requestedLow + i + write := true + for i := uint64(0); i < toProcess; i++ { + nextBlock := requestedLow + i + rawBody := cfg.bd.GetBodyFromCache(nextBlock) + if rawBody == nil { + log.Debug("Body was nil when reading from cache", "block", nextBlock) + cfg.bd.NotDelivered(nextBlock) + write = false + } + if !write { + continue + } + header, _, err := cfg.bd.GetHeader(nextBlock, cfg.blockReader, tx) + if err != nil { + return false, err + } + blockHeight := header.Number.Uint64() + if blockHeight != nextBlock { + return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock) + } - header, _, err := cfg.bd.GetHeader(nextBlock, cfg.blockReader, tx) - if err != nil { + // Txn & uncle roots are verified via bd.requestedMap + err = cfg.bd.Engine.VerifyUncles(cr, header, rawBody.Uncles) + if err != nil { + log.Error(fmt.Sprintf("[%s] Uncle verification failed", logPrefix), "number", blockHeight, "hash", header.Hash().String(), "err", err) + u.UnwindTo(blockHeight-1, header.Hash()) + return true, nil + } + + // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) + ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) + if err != nil { + return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err) + } + if cfg.historyV3 && ok { + if err := rawdb.TxNums.Append(tx, blockHeight, lastTxnNum); err != nil { return false, err } - blockHeight := header.Number.Uint64() - if blockHeight != nextBlock { - return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock) - } - - rawBody := cfg.bd.GetBodyFromCache(nextBlock) - if rawBody == nil { - log.Debug("Body was nil when reading from cache", "block", nextBlock) - break - } - - // Txn & uncle roots are verified via bd.requestedMap - err = cfg.bd.Engine.VerifyUncles(cr, header, rawBody.Uncles) - if err != nil { - log.Error(fmt.Sprintf("[%s] Uncle verification failed", logPrefix), "number", blockHeight, "hash", header.Hash().String(), "err", err) - u.UnwindTo(blockHeight-1, header.Hash()) - return true, nil - } - - // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) - ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) - if err != nil { - return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err) - } - if cfg.historyV3 && ok { - if err := rawdb.TxNums.Append(tx, blockHeight, lastTxnNum); err != nil { - return false, err - } - } - - if blockHeight > bodyProgress { - bodyProgress = blockHeight - if err = s.Update(tx, blockHeight); err != nil { - return false, fmt.Errorf("saving Bodies progress: %w", err) - } - } - cfg.bd.AdvanceLow() } + + if blockHeight > bodyProgress { + bodyProgress = blockHeight + if err = s.Update(tx, blockHeight); err != nil { + return false, fmt.Errorf("saving Bodies progress: %w", err) + } + } + cfg.bd.AdvanceLow() } if !quiet && toProcess > 0 { diff --git a/turbo/stages/bodydownload/body_algos.go b/turbo/stages/bodydownload/body_algos.go index 185d28e88..6b859a0e7 100644 --- a/turbo/stages/bodydownload/body_algos.go +++ b/turbo/stages/bodydownload/body_algos.go @@ -75,11 +75,7 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB blockNums := make([]uint64, 0, BlockBufferSize) hashes := make([]libcommon.Hash, 0, BlockBufferSize) - for blockNum := bd.requestedLow; len(blockNums) < BlockBufferSize && bd.requestedLow <= bd.maxProgress; blockNum++ { - // Check if we reached the highest allowed request block number, and turn back - if blockNum >= bd.maxProgress { - break // Avoid tight loop - } + for blockNum := bd.requestedLow; len(blockNums) < BlockBufferSize && blockNum < bd.maxProgress; blockNum++ { if bd.delivered.Contains(blockNum) { // Already delivered, no need to request continue @@ -172,11 +168,7 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB } if len(blockNums) > 0 { bodyReq = &BodyRequest{BlockNums: blockNums, Hashes: hashes} - for _, num := range blockNums { - bd.requests[num] = bodyReq - } } - return bodyReq, nil } @@ -210,6 +202,12 @@ func (bd *BodyDownload) checkPrefetchedBlock(hash libcommon.Hash, tx kv.RwTx, bl } func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64, peer [64]byte) { + if len(bodyReq.BlockNums) > 0 { + log.Debug("Sent Body request", "peer", fmt.Sprintf("%x", peer)[:8], "min", bodyReq.BlockNums[0], "max", bodyReq.BlockNums[len(bodyReq.BlockNums)-1]) + } + for _, num := range bodyReq.BlockNums { + bd.requests[num] = bodyReq + } bodyReq.waitUntil = timeWithTimeout bodyReq.peerID = peer } @@ -343,6 +341,10 @@ func (bd *BodyDownload) DeliveryCounts() (float64, float64) { return bd.deliveredCount, bd.wastedCount } +func (bd *BodyDownload) NotDelivered(blockNum uint64) { + bd.delivered.Remove(blockNum) +} + func (bd *BodyDownload) GetPenaltyPeers() [][64]byte { peers := make([][64]byte, len(bd.peerMap)) i := 0