erigon-pulse/turbo/engineapi/engine_block_downloader/block_downloader.go

255 lines
8.2 KiB
Go
Raw Normal View History

package engine_block_downloader
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/adapter"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
const (
logInterval = 30 * time.Second
requestLoopCutOff int = 1
)
type RequestBodyFunction func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool)
// EngineBlockDownloader is responsible to download blocks in reverse, and then insert them in the database.
type EngineBlockDownloader struct {
ctx context.Context
// downloaders
hd *headerdownload.HeaderDownload
bd *bodydownload.BodyDownload
bodyReqSend RequestBodyFunction
// current status of the downloading process, aka: is it doing anything?
status atomic.Value // it is a headerdownload.SyncStatus
// data reader
blockPropagator adapter.BlockPropagator
blockReader services.FullBlockReader
db kv.RoDB
// Execution module
chainRW eth1_chain_reader.ChainReaderWriterEth1
// Misc
tmpdir string
timeout int
config *chain.Config
// lock
lock sync.Mutex
// logs
logger log.Logger
}
func NewEngineBlockDownloader(ctx context.Context, logger log.Logger, hd *headerdownload.HeaderDownload, executionClient execution.ExecutionClient,
bd *bodydownload.BodyDownload, blockPropagator adapter.BlockPropagator,
bodyReqSend RequestBodyFunction, blockReader services.FullBlockReader, db kv.RoDB, config *chain.Config,
tmpdir string, timeout int) *EngineBlockDownloader {
var s atomic.Value
s.Store(headerdownload.Idle)
return &EngineBlockDownloader{
ctx: ctx,
hd: hd,
bd: bd,
db: db,
status: s,
config: config,
tmpdir: tmpdir,
logger: logger,
blockReader: blockReader,
blockPropagator: blockPropagator,
timeout: timeout,
bodyReqSend: bodyReqSend,
chainRW: eth1_chain_reader.NewChainReaderEth1(ctx, config, executionClient, 1000),
}
}
func (e *EngineBlockDownloader) scheduleHeadersDownload(
requestId int,
hashToDownload libcommon.Hash,
heightToDownload uint64,
downloaderTip libcommon.Hash,
) bool {
if e.hd.PosStatus() != headerdownload.Idle {
e.logger.Info("[EngineBlockDownloader] Postponing PoS download since another one is in progress", "height", heightToDownload, "hash", hashToDownload)
return false
}
if heightToDownload == 0 {
e.logger.Info("[EngineBlockDownloader] Downloading PoS headers...", "height", "unknown", "hash", hashToDownload, "requestId", requestId)
} else {
e.logger.Info("[EngineBlockDownloader] Downloading PoS headers...", "height", heightToDownload, "hash", hashToDownload, "requestId", requestId)
}
e.hd.SetRequestId(requestId)
e.hd.SetPoSDownloaderTip(downloaderTip)
e.hd.SetHeaderToDownloadPoS(hashToDownload, heightToDownload)
e.hd.SetPOSSync(true) // This needs to be called after SetHeaderToDownloadPOS because SetHeaderToDownloadPOS sets `posAnchor` member field which is used by ProcessHeadersPOS
//nolint
e.hd.SetHeadersCollector(etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger))
e.hd.SetPosStatus(headerdownload.Syncing)
return true
}
// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome.
func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.SyncStatus {
for e.hd.PosStatus() == headerdownload.Syncing {
time.Sleep(10 * time.Millisecond)
}
return e.hd.PosStatus()
}
// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome.
func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) {
var lastValidHash libcommon.Hash
var badChainError error
var foundPow bool
headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
var h types.Header
// no header to process
if value == nil {
return nil
}
if err := rlp.DecodeBytes(value, &h); err != nil {
return err
}
if badChainError != nil {
e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
return nil
}
lastValidHash = h.ParentHash
// If we are in PoW range then block validation is not required anymore.
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
return saveHeader(tx, &h, h.Hash())
}
foundPow = h.Difficulty.Cmp(libcommon.Big0) != 0
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
return saveHeader(tx, &h, h.Hash())
}
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
// Validate state if possible (bodies will be retrieved through body download)
return saveHeader(tx, &h, h.Hash())
}
err = e.hd.HeadersCollector().Load(tx, kv.Headers, headerLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
})
return
}
func saveHeader(db kv.RwTx, header *types.Header, hash libcommon.Hash) error {
blockHeight := header.Number.Uint64()
// TODO(yperbasis): do we need to check if the header is already inserted (oldH)?
parentTd, err := rawdb.ReadTd(db, header.ParentHash, blockHeight-1)
if err != nil || parentTd == nil {
return fmt.Errorf("[saveHeader] parent's total difficulty not found with hash %x and height %d for header %x %d: %v", header.ParentHash, blockHeight-1, hash, blockHeight, err)
}
td := new(big.Int).Add(parentTd, header.Difficulty)
if err = rawdb.WriteHeader(db, header); err != nil {
return fmt.Errorf("[saveHeader] failed to WriteHeader: %w", err)
}
if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil {
return fmt.Errorf("[saveHeader] failed to WriteTd: %w", err)
}
if err = rawdb.WriteCanonicalHash(db, hash, blockHeight); err != nil {
return fmt.Errorf("[saveHeader] failed to canonical hash: %w", err)
}
return nil
}
func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint64, fromHash libcommon.Hash, toBlock uint64) error {
blockBatchSize := 500
blockWrittenLogSize := 20_000
// We divide them in batches
blocksBatch := []*types.Block{}
headersCursors, err := tx.Cursor(kv.Headers)
if err != nil {
return err
}
log.Info("Beginning downloaded blocks insertion")
// Start by seeking headers
for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() {
if err != nil {
return err
}
if len(blocksBatch) == blockBatchSize {
if err := e.chainRW.InsertBlocksAndWait(blocksBatch); err != nil {
return err
}
blocksBatch = blocksBatch[:0]
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(v), header); err != nil {
e.logger.Error("Invalid block header RLP", "err", err)
return nil
}
number := header.Number.Uint64()
if number > toBlock {
return e.chainRW.InsertBlocksAndWait(blocksBatch)
}
hash := header.Hash()
body, err := rawdb.ReadBodyWithTransactions(tx, hash, number)
if err != nil {
return err
}
if body == nil {
return fmt.Errorf("missing body at block=%d", number)
}
blocksBatch = append(blocksBatch, types.NewBlockFromStorage(hash, header, body.Transactions, nil, body.Withdrawals))
if number%uint64(blockWrittenLogSize) == 0 {
e.logger.Info("[insertHeadersAndBodies] Written blocks", "progress", number, "to", toBlock)
}
}
return e.chainRW.InsertBlocksAndWait(blocksBatch)
}