mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-20 09:21:11 +00:00
230 lines
7.1 KiB
Go
230 lines
7.1 KiB
Go
package engine_block_downloader
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
"time"
|
|
|
|
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
|
"github.com/ledgerwatch/erigon-lib/common/dbg"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon/core/rawdb"
|
|
"github.com/ledgerwatch/erigon/dataflow"
|
|
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
|
|
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
|
|
"github.com/ledgerwatch/log/v3"
|
|
)
|
|
|
|
// downloadBodies executes bodies download.
|
|
func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fromBlock, toBlock uint64) (err error) {
|
|
headerProgress := toBlock
|
|
bodyProgress := fromBlock - 1
|
|
|
|
if err := stages.SaveStageProgress(tx, stages.Bodies, bodyProgress); err != nil {
|
|
return err
|
|
}
|
|
if err := stages.SaveStageProgress(tx, stages.Headers, headerProgress); err != nil {
|
|
return err
|
|
}
|
|
|
|
var d1, d2, d3, d4, d5, d6 time.Duration
|
|
|
|
timeout := e.timeout
|
|
|
|
// This will update bd.maxProgress
|
|
if _, _, _, _, err = e.bd.UpdateFromDb(tx); err != nil {
|
|
return
|
|
}
|
|
defer e.bd.ClearBodyCache()
|
|
|
|
if bodyProgress >= headerProgress {
|
|
return nil
|
|
}
|
|
|
|
logPrefix := "EngineBlockDownloader"
|
|
if headerProgress <= bodyProgress+16 {
|
|
// When processing small number of blocks, we can afford wasting more bandwidth but get blocks quicker
|
|
timeout = 1
|
|
} else {
|
|
// Do not print logs for short periods
|
|
e.logger.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress)
|
|
}
|
|
logEvery := time.NewTicker(logInterval)
|
|
defer logEvery.Stop()
|
|
|
|
var prevDeliveredCount float64 = 0
|
|
var prevWastedCount float64 = 0
|
|
timer := time.NewTimer(1 * time.Second) // Check periodically even in the abseence of incoming messages
|
|
var req *bodydownload.BodyRequest
|
|
var peer [64]byte
|
|
var sentToPeer bool
|
|
stopped := false
|
|
prevProgress := bodyProgress
|
|
var noProgressCount uint = 0 // How many time the progress was printed without actual progress
|
|
var totalDelivered uint64 = 0
|
|
|
|
loopBody := func() (bool, error) {
|
|
// loopCount is used here to ensure we don't get caught in a constant loop of making requests
|
|
// having some time out so requesting again and cycling like that forever. We'll cap it
|
|
// and break the loop so we can see if there are any records to actually process further down
|
|
// then come back here again in the next cycle
|
|
for loopCount := 0; loopCount == 0 || (req != nil && sentToPeer && loopCount < requestLoopCutOff); loopCount++ {
|
|
start := time.Now()
|
|
currentTime := uint64(time.Now().Unix())
|
|
req, err = e.bd.RequestMoreBodies(tx, e.blockReader, currentTime, e.blockPropagator)
|
|
if err != nil {
|
|
return false, fmt.Errorf("request more bodies: %w", err)
|
|
}
|
|
d1 += time.Since(start)
|
|
peer = [64]byte{}
|
|
sentToPeer = false
|
|
if req != nil {
|
|
start = time.Now()
|
|
peer, sentToPeer = e.bodyReqSend(e.ctx, req)
|
|
d2 += time.Since(start)
|
|
}
|
|
if req != nil && sentToPeer {
|
|
start = time.Now()
|
|
e.bd.RequestSent(req, currentTime+uint64(timeout), peer)
|
|
d3 += time.Since(start)
|
|
}
|
|
}
|
|
|
|
start := time.Now()
|
|
requestedLow, delivered, err := e.bd.GetDeliveries(tx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
totalDelivered += delivered
|
|
d4 += time.Since(start)
|
|
start = time.Now()
|
|
|
|
toProcess := e.bd.NextProcessingCount()
|
|
|
|
write := true
|
|
for i := uint64(0); i < toProcess; i++ {
|
|
select {
|
|
case <-logEvery.C:
|
|
logWritingBodies(logPrefix, bodyProgress, headerProgress, e.logger)
|
|
default:
|
|
}
|
|
nextBlock := requestedLow + i
|
|
rawBody := e.bd.GetBodyFromCache(nextBlock, write)
|
|
if rawBody == nil {
|
|
e.bd.NotDelivered(nextBlock)
|
|
write = false
|
|
}
|
|
if !write {
|
|
continue
|
|
}
|
|
|
|
e.bd.NotDelivered(nextBlock)
|
|
header, _, err := e.bd.GetHeader(nextBlock, e.blockReader, tx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
blockHeight := header.Number.Uint64()
|
|
if blockHeight != nextBlock {
|
|
return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock)
|
|
}
|
|
|
|
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
|
|
ok, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
|
|
if err != nil {
|
|
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
|
|
}
|
|
if ok {
|
|
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)
|
|
}
|
|
|
|
if blockHeight > bodyProgress {
|
|
bodyProgress = blockHeight
|
|
}
|
|
e.bd.AdvanceLow()
|
|
}
|
|
|
|
d5 += time.Since(start)
|
|
start = time.Now()
|
|
if bodyProgress == headerProgress {
|
|
return true, nil
|
|
}
|
|
|
|
timer.Stop()
|
|
timer = time.NewTimer(1 * time.Second)
|
|
select {
|
|
case <-e.ctx.Done():
|
|
stopped = true
|
|
case <-logEvery.C:
|
|
deliveredCount, wastedCount := e.bd.DeliveryCounts()
|
|
if prevProgress == bodyProgress {
|
|
noProgressCount++
|
|
} else {
|
|
noProgressCount = 0 // Reset, there was progress
|
|
}
|
|
logDownloadingBodies(logPrefix, bodyProgress, headerProgress-requestedLow, totalDelivered, prevDeliveredCount, deliveredCount,
|
|
prevWastedCount, wastedCount, e.bd.BodyCacheSize(), e.logger)
|
|
prevProgress = bodyProgress
|
|
prevDeliveredCount = deliveredCount
|
|
prevWastedCount = wastedCount
|
|
//logger.Info("Timings", "d1", d1, "d2", d2, "d3", d3, "d4", d4, "d5", d5, "d6", d6)
|
|
case <-timer.C:
|
|
e.logger.Trace("RequestQueueTime (bodies) ticked")
|
|
case <-e.bd.DeliveryNotify:
|
|
e.logger.Trace("bodyLoop woken up by the incoming request")
|
|
}
|
|
d6 += time.Since(start)
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// kick off the loop and check for any reason to stop and break early
|
|
var shouldBreak bool
|
|
for !stopped && !shouldBreak {
|
|
if shouldBreak, err = loopBody(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if stopped {
|
|
return libcommon.ErrStopped
|
|
}
|
|
e.logger.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", bodyProgress)
|
|
|
|
return nil
|
|
}
|
|
|
|
func logDownloadingBodies(logPrefix string, committed, remaining uint64, totalDelivered uint64, prevDeliveredCount, deliveredCount,
|
|
prevWastedCount, wastedCount float64, bodyCacheSize int, logger log.Logger) {
|
|
speed := (deliveredCount - prevDeliveredCount) / float64(logInterval/time.Second)
|
|
wastedSpeed := (wastedCount - prevWastedCount) / float64(logInterval/time.Second)
|
|
if speed == 0 && wastedSpeed == 0 {
|
|
logger.Info(fmt.Sprintf("[%s] No block bodies to write in this log period", logPrefix), "block number", committed)
|
|
return
|
|
}
|
|
|
|
var m runtime.MemStats
|
|
dbg.ReadMemStats(&m)
|
|
logger.Info(fmt.Sprintf("[%s] Downloading block bodies", logPrefix),
|
|
"block_num", committed,
|
|
"delivery/sec", libcommon.ByteCount(uint64(speed)),
|
|
"wasted/sec", libcommon.ByteCount(uint64(wastedSpeed)),
|
|
"remaining", remaining,
|
|
"delivered", totalDelivered,
|
|
"cache", libcommon.ByteCount(uint64(bodyCacheSize)),
|
|
"alloc", libcommon.ByteCount(m.Alloc),
|
|
"sys", libcommon.ByteCount(m.Sys),
|
|
)
|
|
}
|
|
|
|
func logWritingBodies(logPrefix string, committed, headerProgress uint64, logger log.Logger) {
|
|
var m runtime.MemStats
|
|
dbg.ReadMemStats(&m)
|
|
remaining := headerProgress - committed
|
|
logger.Info(fmt.Sprintf("[%s] Writing block bodies", logPrefix),
|
|
"block_num", committed,
|
|
"remaining", remaining,
|
|
"alloc", libcommon.ByteCount(m.Alloc),
|
|
"sys", libcommon.ByteCount(m.Sys),
|
|
)
|
|
}
|