Further fixes for body downloader ()

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
ledgerwatch 2023-01-17 22:03:28 +00:00 committed by GitHub
parent 6443279775
commit 44b834e77a
2 changed files with 54 additions and 54 deletions
eth/stagedsync
turbo/stages/bodydownload

eth/stagedsync

@@ -151,7 +151,6 @@ func BodiesForward(
currentTime := uint64(time.Now().Unix())
cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer)
d3 += time.Since(start)
log.Debug("body request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer))
}
// loopCount is used here to ensure we don't get caught in a constant loop of making requests
@@ -178,7 +177,6 @@ func BodiesForward(
start = time.Now()
cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer)
d3 += time.Since(start)
log.Debug("body request sent", "req", fmt.Sprintf("%+v", req), "peer", fmt.Sprintf("%x", peer))
}
loopCount++
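The two log.Debug deletions above drop a per-send dump of the whole request inside BodiesForward. Judging by the additions to RequestSent in the second file below, the same event is now logged once per request as a compact min/max range. A minimal stand-alone version of that kind of logging, using the standard library logger instead of Erigon's:

package main

import "log"

// logBodyRequest mirrors the shape of the new RequestSent logging: one line per
// sent request, identifying a short peer prefix and the block range it covers.
func logBodyRequest(peer [64]byte, blockNums []uint64) {
    if len(blockNums) == 0 {
        return
    }
    log.Printf("Sent Body request peer=%x min=%d max=%d",
        peer[:4], blockNums[0], blockNums[len(blockNums)-1])
}

func main() {
    logBodyRequest([64]byte{0xca, 0xfe}, []uint64{17_000_000, 17_000_001, 17_000_002})
}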
@@ -199,53 +197,53 @@ func BodiesForward(
toProcess := cfg.bd.NextProcessingCount()
if toProcess > 0 {
var i uint64
for i = 0; i < toProcess; i++ {
nextBlock := requestedLow + i
write := true
for i := uint64(0); i < toProcess; i++ {
nextBlock := requestedLow + i
rawBody := cfg.bd.GetBodyFromCache(nextBlock)
if rawBody == nil {
log.Debug("Body was nil when reading from cache", "block", nextBlock)
cfg.bd.NotDelivered(nextBlock)
write = false
}
if !write {
continue
}
header, _, err := cfg.bd.GetHeader(nextBlock, cfg.blockReader, tx)
if err != nil {
return false, err
}
blockHeight := header.Number.Uint64()
if blockHeight != nextBlock {
return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock)
}
header, _, err := cfg.bd.GetHeader(nextBlock, cfg.blockReader, tx)
if err != nil {
// Txn & uncle roots are verified via bd.requestedMap
err = cfg.bd.Engine.VerifyUncles(cr, header, rawBody.Uncles)
if err != nil {
log.Error(fmt.Sprintf("[%s] Uncle verification failed", logPrefix), "number", blockHeight, "hash", header.Hash().String(), "err", err)
u.UnwindTo(blockHeight-1, header.Hash())
return true, nil
}
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if cfg.historyV3 && ok {
if err := rawdb.TxNums.Append(tx, blockHeight, lastTxnNum); err != nil {
return false, err
}
blockHeight := header.Number.Uint64()
if blockHeight != nextBlock {
return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock)
}
rawBody := cfg.bd.GetBodyFromCache(nextBlock)
if rawBody == nil {
log.Debug("Body was nil when reading from cache", "block", nextBlock)
break
}
// Txn & uncle roots are verified via bd.requestedMap
err = cfg.bd.Engine.VerifyUncles(cr, header, rawBody.Uncles)
if err != nil {
log.Error(fmt.Sprintf("[%s] Uncle verification failed", logPrefix), "number", blockHeight, "hash", header.Hash().String(), "err", err)
u.UnwindTo(blockHeight-1, header.Hash())
return true, nil
}
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if cfg.historyV3 && ok {
if err := rawdb.TxNums.Append(tx, blockHeight, lastTxnNum); err != nil {
return false, err
}
}
if blockHeight > bodyProgress {
bodyProgress = blockHeight
if err = s.Update(tx, blockHeight); err != nil {
return false, fmt.Errorf("saving Bodies progress: %w", err)
}
}
cfg.bd.AdvanceLow()
}
if blockHeight > bodyProgress {
bodyProgress = blockHeight
if err = s.Update(tx, blockHeight); err != nil {
return false, fmt.Errorf("saving Bodies progress: %w", err)
}
}
cfg.bd.AdvanceLow()
}
if !quiet && toProcess > 0 {

turbo/stages/bodydownload

@@ -75,11 +75,7 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB
blockNums := make([]uint64, 0, BlockBufferSize)
hashes := make([]libcommon.Hash, 0, BlockBufferSize)
- for blockNum := bd.requestedLow; len(blockNums) < BlockBufferSize && bd.requestedLow <= bd.maxProgress; blockNum++ {
- // Check if we reached the highest allowed request block number, and turn back
- if blockNum >= bd.maxProgress {
- break // Avoid tight loop
- }
+ for blockNum := bd.requestedLow; len(blockNums) < BlockBufferSize && blockNum < bd.maxProgress; blockNum++ {
if bd.delivered.Contains(blockNum) {
// Already delivered, no need to request
continue
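The deleted lines bounded the loop with a field that never changes inside it (bd.requestedLow) plus an explicit break against bd.maxProgress; the added line bounds the loop variable itself. A minimal stand-alone illustration of the corrected loop header, with made-up numbers:

package main

import "fmt"

// collect mimics the fixed loop header: the loop variable, not the constant
// starting point, is compared against the upper bound.
func collect(requestedLow, maxProgress uint64, bufferSize int) []uint64 {
    nums := make([]uint64, 0, bufferSize)
    for blockNum := requestedLow; len(nums) < bufferSize && blockNum < maxProgress; blockNum++ {
        nums = append(nums, blockNum)
    }
    return nums
}

func main() {
    fmt.Println(collect(100, 105, 128)) // [100 101 102 103 104]
}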
@@ -172,11 +168,7 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB
}
if len(blockNums) > 0 {
bodyReq = &BodyRequest{BlockNums: blockNums, Hashes: hashes}
- for _, num := range blockNums {
- bd.requests[num] = bodyReq
- }
}
return bodyReq, nil
}
@@ -210,6 +202,12 @@ func (bd *BodyDownload) checkPrefetchedBlock(hash libcommon.Hash, tx kv.RwTx, bl
}
func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64, peer [64]byte) {
+ if len(bodyReq.BlockNums) > 0 {
+ log.Debug("Sent Body request", "peer", fmt.Sprintf("%x", peer)[:8], "min", bodyReq.BlockNums[0], "max", bodyReq.BlockNums[len(bodyReq.BlockNums)-1])
+ }
+ for _, num := range bodyReq.BlockNums {
+ bd.requests[num] = bodyReq
+ }
bodyReq.waitUntil = timeWithTimeout
bodyReq.peerID = peer
}
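RequestSent now carries the bookkeeping that the previous hunk drops from RequestMoreBodies: each block number covered by the request is mapped to it, and the timeout deadline and peer are recorded, at the moment the request is actually sent, so requests that are built but never handed to a peer leave nothing behind. A self-contained sketch of that pattern; the types are simplified and CheckExpired is a hypothetical helper, not something this diff adds:

package main

import "fmt"

type BodyRequest struct {
    BlockNums []uint64
    waitUntil uint64 // time after which the request is considered lost
    peerID    [64]byte
}

type registry struct {
    requests map[uint64]*BodyRequest
}

// RequestSent registers the request for each covered block at send time.
func (r *registry) RequestSent(req *BodyRequest, timeWithTimeout uint64, peer [64]byte) {
    for _, num := range req.BlockNums {
        r.requests[num] = req
    }
    req.waitUntil = timeWithTimeout
    req.peerID = peer
}

// CheckExpired is a hypothetical helper: blocks whose request timed out can be
// re-requested from another peer.
func (r *registry) CheckExpired(now uint64) (expired []uint64) {
    for num, req := range r.requests {
        if req.waitUntil <= now {
            expired = append(expired, num)
        }
    }
    return expired
}

func main() {
    r := &registry{requests: map[uint64]*BodyRequest{}}
    r.RequestSent(&BodyRequest{BlockNums: []uint64{7, 8, 9}}, 100, [64]byte{1})
    fmt.Println(r.CheckExpired(101)) // [7 8 9] (order may vary)
}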
@@ -343,6 +341,10 @@ func (bd *BodyDownload) DeliveryCounts() (float64, float64) {
return bd.deliveredCount, bd.wastedCount
}
+ func (bd *BodyDownload) NotDelivered(blockNum uint64) {
+ bd.delivered.Remove(blockNum)
+ }
func (bd *BodyDownload) GetPenaltyPeers() [][64]byte {
peers := make([][64]byte, len(bd.peerMap))
i := 0
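The new NotDelivered method pairs with the stage-loop change in the first file: taking a block number back out of the delivered set means RequestMoreBodies, which skips numbers where bd.delivered.Contains(blockNum) returns true, will ask for that body again. A toy round trip of that set, using a map-backed stand-in for the real delivered set:

package main

import "fmt"

// deliveredSet is a map-backed stand-in for bd.delivered.
type deliveredSet map[uint64]struct{}

func (d deliveredSet) Contains(n uint64) bool { _, ok := d[n]; return ok }
func (d deliveredSet) Add(n uint64)           { d[n] = struct{}{} }
func (d deliveredSet) Remove(n uint64)        { delete(d, n) }

// nextToRequest mirrors the skip in RequestMoreBodies: already-delivered
// blocks are not asked for again.
func nextToRequest(d deliveredSet, low, max uint64) (nums []uint64) {
    for n := low; n < max; n++ {
        if d.Contains(n) {
            continue
        }
        nums = append(nums, n)
    }
    return nums
}

func main() {
    d := deliveredSet{}
    d.Add(5)
    d.Add(6)
    fmt.Println(nextToRequest(d, 5, 8)) // [7]
    // NotDelivered: the body for 6 turned out to be unusable, request it again.
    d.Remove(6)
    fmt.Println(nextToRequest(d, 5, 8)) // [6 7]
}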