erigon-pulse/turbo/engineapi/engine_block_downloader/body.go
2023-08-04 02:22:07 +02:00

230 lines
7.1 KiB
Go

package engine_block_downloader
import (
"fmt"
"runtime"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/dataflow"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
"github.com/ledgerwatch/log/v3"
)
// downloadAndLoadBodiesSyncronously downloads the block bodies for blocks
// fromBlock..toBlock and writes them into tx, blocking until the body progress
// reaches toBlock, e.ctx is cancelled, or an error occurs.
//
// Before downloading it saves fromBlock-1 as the Bodies stage progress and
// toBlock as the Headers stage progress in tx. It then repeatedly runs a loop
// body that requests bodies from peers, collects deliveries, writes the
// delivered bodies that directly follow the lowest requested block, and waits
// for the next event.
//
// Returns nil when there is nothing to download or once bodyProgress equals
// toBlock, libcommon.ErrStopped when e.ctx is cancelled, and otherwise the
// first error encountered.
func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fromBlock, toBlock uint64) (err error) {
headerProgress := toBlock
// NOTE(review): unsigned arithmetic — fromBlock == 0 wraps to the maximum
// uint64, which is then saved as the Bodies stage progress below. Confirm
// callers never pass 0.
bodyProgress := fromBlock - 1
// Persist the requested range as stage progress in tx before calling UpdateFromDb.
if err := stages.SaveStageProgress(tx, stages.Bodies, bodyProgress); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.Headers, headerProgress); err != nil {
return err
}
// Cumulative time spent in each phase of the loop. These are only
// accumulated; the single statement that would print them is commented out below.
var d1, d2, d3, d4, d5, d6 time.Duration
timeout := e.timeout
// This will update bd.maxProgress
if _, _, _, _, err = e.bd.UpdateFromDb(tx); err != nil {
return
}
// Registered after the UpdateFromDb check, so the cache is cleared on every
// exit path from here on, including the early return just below.
defer e.bd.ClearBodyCache()
// Nothing to download.
if bodyProgress >= headerProgress {
return nil
}
logPrefix := "EngineBlockDownloader"
if headerProgress <= bodyProgress+16 {
// When processing small number of blocks, we can afford wasting more bandwidth but get blocks quicker
// NOTE(review): timeout is added to a Unix-seconds timestamp when a
// request is recorded below, so this is presumably 1 second — confirm.
timeout = 1
} else {
// Do not print logs for short periods
e.logger.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress)
}
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
// Delivery counters from the previous progress log, passed to
// logDownloadingBodies together with the current ones.
var prevDeliveredCount float64 = 0
var prevWastedCount float64 = 0
timer := time.NewTimer(1 * time.Second) // Check periodically even in the absence of incoming messages
var req *bodydownload.BodyRequest
var peer [64]byte
var sentToPeer bool
// Set when e.ctx is cancelled; checked by the outer loop.
stopped := false
prevProgress := bodyProgress
// NOTE(review): noProgressCount is updated below but never read in this function.
var noProgressCount uint = 0 // How many times the progress was printed without actual progress
var totalDelivered uint64 = 0
// loopBody runs one iteration: send requests, collect deliveries, write the
// available bodies, then wait for the next event. It returns true once
// bodyProgress equals headerProgress.
loopBody := func() (bool, error) {
// loopCount is used here to ensure we don't get caught in a constant loop of making requests
// having some time out so requesting again and cycling like that forever. We'll cap it
// and break the loop so we can see if there are any records to actually process further down
// then come back here again in the next cycle
for loopCount := 0; loopCount == 0 || (req != nil && sentToPeer && loopCount < requestLoopCutOff); loopCount++ {
start := time.Now()
currentTime := uint64(time.Now().Unix())
// Assigns to the enclosing function's named result err, not a closure-local variable.
req, err = e.bd.RequestMoreBodies(tx, e.blockReader, currentTime, e.blockPropagator)
if err != nil {
return false, fmt.Errorf("request more bodies: %w", err)
}
d1 += time.Since(start)
// Reset the per-request send state before attempting to send.
peer = [64]byte{}
sentToPeer = false
if req != nil {
start = time.Now()
peer, sentToPeer = e.bodyReqSend(e.ctx, req)
d2 += time.Since(start)
}
// Record the request, with currentTime+timeout as its second argument,
// only if it was actually sent to a peer.
if req != nil && sentToPeer {
start = time.Now()
e.bd.RequestSent(req, currentTime+uint64(timeout), peer)
d3 += time.Since(start)
}
}
start := time.Now()
// err here (and in the loop below) is a new closure-local variable that
// shadows the named result.
requestedLow, delivered, err := e.bd.GetDeliveries(tx)
if err != nil {
return false, err
}
totalDelivered += delivered
d4 += time.Since(start)
start = time.Now()
toProcess := e.bd.NextProcessingCount()
// write stays true while every block from requestedLow upwards has a cached
// body. After the first missing body, no further bodies are written in
// this pass.
write := true
for i := uint64(0); i < toProcess; i++ {
// Non-blocking check of the log ticker so long write runs still report progress.
select {
case <-logEvery.C:
logWritingBodies(logPrefix, bodyProgress, headerProgress, e.logger)
default:
}
nextBlock := requestedLow + i
rawBody := e.bd.GetBodyFromCache(nextBlock, write)
if rawBody == nil {
e.bd.NotDelivered(nextBlock)
write = false
}
if !write {
continue
}
// NOTE(review): NotDelivered is also called here for a body that was found
// in the cache and is about to be written — confirm this is intended.
e.bd.NotDelivered(nextBlock)
header, _, err := e.bd.GetHeader(nextBlock, e.blockReader, tx)
if err != nil {
return false, err
}
// Check that the header returned is for the block number that was requested.
blockHeight := header.Number.Uint64()
if blockHeight != nextBlock {
return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock)
}
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if ok {
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)
}
if blockHeight > bodyProgress {
bodyProgress = blockHeight
}
e.bd.AdvanceLow()
}
d5 += time.Since(start)
start = time.Now()
// Done: every body up to headerProgress has been handled.
if bodyProgress == headerProgress {
return true, nil
}
// Replace the timer with a fresh 1-second one for this wait.
timer.Stop()
timer = time.NewTimer(1 * time.Second)
// Block until cancellation, a log tick, the timer firing, or a delivery notification.
select {
case <-e.ctx.Done():
stopped = true
case <-logEvery.C:
deliveredCount, wastedCount := e.bd.DeliveryCounts()
if prevProgress == bodyProgress {
noProgressCount++
} else {
noProgressCount = 0 // Reset, there was progress
}
// The "remaining" argument is headerProgress-requestedLow.
logDownloadingBodies(logPrefix, bodyProgress, headerProgress-requestedLow, totalDelivered, prevDeliveredCount, deliveredCount,
prevWastedCount, wastedCount, e.bd.BodyCacheSize(), e.logger)
prevProgress = bodyProgress
prevDeliveredCount = deliveredCount
prevWastedCount = wastedCount
//logger.Info("Timings", "d1", d1, "d2", d2, "d3", d3, "d4", d4, "d5", d5, "d6", d6)
case <-timer.C:
e.logger.Trace("RequestQueueTime (bodies) ticked")
case <-e.bd.DeliveryNotify:
e.logger.Trace("bodyLoop woken up by the incoming request")
}
d6 += time.Since(start)
return false, nil
}
// kick off the loop and check for any reason to stop and break early
var shouldBreak bool
for !stopped && !shouldBreak {
if shouldBreak, err = loopBody(); err != nil {
return err
}
}
// Cancellation takes precedence over reporting completion.
if stopped {
return libcommon.ErrStopped
}
e.logger.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", bodyProgress)
return nil
}
// logDownloadingBodies emits one progress line for the body download covering
// the most recent log period.
//
// The delivery and waste rates are the change in the respective counters since
// the previous report, divided by logInterval expressed in whole seconds. When
// both rates are zero only a short "nothing to write" line is logged and memory
// statistics are not collected; otherwise the full line with rates, remaining
// and delivered counts, cache size and memory statistics is logged.
func logDownloadingBodies(logPrefix string, committed, remaining uint64, totalDelivered uint64, prevDeliveredCount, deliveredCount,
	prevWastedCount, wastedCount float64, bodyCacheSize int, logger log.Logger) {
	// logInterval is truncated to whole seconds before the conversion to float64.
	periodSeconds := float64(logInterval / time.Second)
	deliveryRate := (deliveredCount - prevDeliveredCount) / periodSeconds
	wasteRate := (wastedCount - prevWastedCount) / periodSeconds

	if deliveryRate != 0 || wasteRate != 0 {
		var mem runtime.MemStats
		dbg.ReadMemStats(&mem)

		msg := fmt.Sprintf("[%s] Downloading block bodies", logPrefix)
		logger.Info(msg,
			"block_num", committed,
			"delivery/sec", libcommon.ByteCount(uint64(deliveryRate)),
			"wasted/sec", libcommon.ByteCount(uint64(wasteRate)),
			"remaining", remaining,
			"delivered", totalDelivered,
			"cache", libcommon.ByteCount(uint64(bodyCacheSize)),
			"alloc", libcommon.ByteCount(mem.Alloc),
			"sys", libcommon.ByteCount(mem.Sys),
		)
		return
	}

	// Both rates are zero for this period.
	idleMsg := fmt.Sprintf("[%s] No block bodies to write in this log period", logPrefix)
	logger.Info(idleMsg, "block number", committed)
}
// logWritingBodies emits one progress line while delivered bodies are being
// written: the last committed block, the number of blocks remaining up to
// headerProgress, and current memory statistics.
//
// The remaining count is an unsigned subtraction, so it wraps if committed is
// greater than headerProgress.
func logWritingBodies(logPrefix string, committed, headerProgress uint64, logger log.Logger) {
	var mem runtime.MemStats
	dbg.ReadMemStats(&mem)

	msg := fmt.Sprintf("[%s] Writing block bodies", logPrefix)
	logger.Info(msg,
		"block_num", committed,
		"remaining", headerProgress-committed,
		"alloc", libcommon.ByteCount(mem.Alloc),
		"sys", libcommon.ByteCount(mem.Sys),
	)
}