Consensus separation for Engine API (Working on Sepolia) (#7945)

This makes the experimental consensus separation functional on Sepolia.
Giulio Rebuffo 2023-07-30 23:35:55 +02:00 committed by GitHub
parent 5693439521
commit 443757edbd
18 changed files with 448 additions and 454 deletions


@ -262,8 +262,9 @@ var CheckpointSyncEndpoints = map[NetworkType][]string{
"https://prater-checkpoint-sync.stakely.io/eth/v2/debug/beacon/states/finalized",
},
SepoliaNetwork: {
"https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized",
"https://checkpoint-sync.sepolia.ethpandaops.io/eth/v2/debug/beacon/states/finalized",
"https://beaconstate-sepolia.chainsafe.io/eth/v2/debug/beacon/states/finalized",
// "https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized",
// "https://checkpoint-sync.sepolia.ethpandaops.io/eth/v2/debug/beacon/states/finalized",
},
GnosisNetwork: {
"https://checkpoint.gnosis.gateway.fm/eth/v2/debug/beacon/states/finalized",


@ -158,7 +158,7 @@ func checkPayloadStatus(payloadStatus *engine_types.PayloadStatus) error {
return validationError.Error()
}
- if payloadStatus.Status != engine_types.ValidStatus && payloadStatus.Status != engine_types.AcceptedStatus {
+ if payloadStatus.Status == engine_types.InvalidStatus {
return fmt.Errorf("status: %s", payloadStatus.Status)
}
return nil
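After this change only INVALID from the execution layer is treated as a hard error; VALID, SYNCING, and ACCEPTED all fall through as benign. A self-contained sketch of the same shape (the status constants are stand-ins mirroring engine_types):

```go
package main

import "fmt"

// Stand-ins for engine_types statuses; names mirror the Engine API spec.
type PayloadStatus string

const (
	ValidStatus    PayloadStatus = "VALID"
	InvalidStatus  PayloadStatus = "INVALID"
	SyncingStatus  PayloadStatus = "SYNCING"
	AcceptedStatus PayloadStatus = "ACCEPTED"
)

// checkPayloadStatus errors only on INVALID; anything else lets the caller
// proceed, matching the relaxed check above.
func checkPayloadStatus(status PayloadStatus) error {
	if status == InvalidStatus {
		return fmt.Errorf("status: %s", status)
	}
	return nil
}

func main() {
	fmt.Println(checkPayloadStatus(SyncingStatus)) // <nil>
	fmt.Println(checkPayloadStatus(InvalidStatus)) // status: INVALID
}
```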


@ -42,7 +42,9 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload,
default:
return fmt.Errorf("replay block, code: %+v", status)
}
+ if block.Block.Body.ExecutionPayload != nil {
+ f.eth2Roots.Add(blockRoot, block.Block.Body.ExecutionPayload.BlockHash)
+ }
var invalidBlock bool
if newPayload && f.engine != nil {
if invalidBlock, err = f.engine.NewPayload(block.Block.Body.ExecutionPayload); err != nil {
@ -53,9 +55,7 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload,
if invalidBlock {
f.forkGraph.MarkHeaderAsInvalid(blockRoot)
}
- if block.Block.Body.ExecutionPayload != nil {
- f.eth2Roots.Add(blockRoot, block.Block.Body.ExecutionPayload.BlockHash)
- }
if block.Block.Slot > f.highestSeen {
f.highestSeen = block.Block.Slot
}
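The hunk above moves the eth2Roots bookkeeping ahead of the NewPayload call, so the beacon-root to execution-hash mapping exists even when the payload is later judged invalid. A toy sketch of that ordering, with a plain map standing in for the LRU cache (all names illustrative):

```go
package sketch

type hash [32]byte

// forkChoiceStore is a toy stand-in; eth2Roots maps a beacon block root to
// the hash of the block's execution payload.
type forkChoiceStore struct {
	eth2Roots map[hash]hash // assume initialized by a constructor
}

// onBlock records the root->payload-hash mapping *before* the payload is sent
// to the execution layer, mirroring the reordering above: even a payload that
// fails validation stays resolvable by its beacon root.
func (f *forkChoiceStore) onBlock(blockRoot, payloadHash hash,
	newPayload func(hash) (invalid bool), markInvalid func(hash)) {
	f.eth2Roots[blockRoot] = payloadHash
	if newPayload(payloadHash) {
		markInvalid(blockRoot)
	}
}
```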


@ -163,10 +163,11 @@ type Ethereum struct {
sentriesClient *sentry.MultiClient
sentryServers []*sentry.GrpcServer
- stagedSync *stagedsync.Sync
- syncStages []*stagedsync.Stage
- syncUnwindOrder stagedsync.UnwindOrder
- syncPruneOrder stagedsync.PruneOrder
+ stagedSync *stagedsync.Sync
+ pipelineStagedSync *stagedsync.Sync
+ syncStages []*stagedsync.Stage
+ syncUnwindOrder stagedsync.UnwindOrder
+ syncPruneOrder stagedsync.PruneOrder
downloaderClient proto_downloader.DownloaderClient
@ -572,9 +573,9 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, backend.notifications.Events, logger)
- pipelineStages := stages2.NewPipelineStages(ctx, chainKv, &ethconfig.Defaults, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.forkValidator, logger)
- pipelineStagedSync := stagedsync.New(pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
- executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, logger, config.HistoryV3))
+ pipelineStages := stages2.NewPipelineStages(ctx, chainKv, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.forkValidator, logger)
+ backend.pipelineStagedSync = stagedsync.New(pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
+ executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, config.HistoryV3))
if config.ExperimentalConsensusSeparation {
log.Info("Using experimental Engine API")
engineBackendRPC := engineapi.NewEngineServerExperimental(
@ -587,7 +588,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.sentriesClient.Hd,
engine_block_downloader.NewEngineBlockDownloader(ctx, logger, executionRpc, backend.sentriesClient.Hd,
backend.sentriesClient.Bd, backend.sentriesClient.BroadcastNewBlock, blockReader, backend.sentriesClient.SendBodyRequest,
- tmpdir, config.Sync.BodyDownloadTimeoutSeconds),
+ chainKv, chainConfig, tmpdir, config.Sync.BodyDownloadTimeoutSeconds),
false,
config.Miner.EnabledPOS)
backend.engineBackendRPC = engineBackendRPC
@ -810,9 +811,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
return
}
}()
- if !config.InternalCL {
- go s.engineBackendRPC.Start(httpRpcCfg, ff, stateCache, s.agg, s.engine, ethRpcClient, txPoolRpcClient, miningRpcClient)
- }
+ go s.engineBackendRPC.Start(httpRpcCfg, ff, stateCache, s.agg, s.engine, ethRpcClient, txPoolRpcClient, miningRpcClient)
// Register the backend on the node
stack.RegisterLifecycle(s)
@ -1139,7 +1138,11 @@ func (s *Ethereum) Start() error {
time.Sleep(10 * time.Millisecond) // just to reduce logs order confusion
hook := stages2.NewHook(s.sentryCtx, s.notifications, s.stagedSync, s.blockReader, s.chainConfig, s.logger, s.sentriesClient.UpdateHead)
- go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.waitForStageLoopStop, s.config.Sync.LoopThrottle, s.logger, s.blockReader, hook)
+ if s.config.ExperimentalConsensusSeparation {
+ s.pipelineStagedSync.Run(s.chainDB, nil, true)
+ } else {
+ go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.waitForStageLoopStop, s.config.Sync.LoopThrottle, s.logger, s.blockReader, hook)
+ }
return nil
}
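Start() now picks one of two run modes: with ExperimentalConsensusSeparation the pipeline staged-sync is driven directly (and runs synchronously here), otherwise the legacy stage loop keeps spinning in a goroutine. A reduced sketch of the branching (the types are simplified stand-ins):

```go
package main

import "fmt"

type pipelineSync struct{} // stand-in for the Engine-API-driven pipeline

func (pipelineSync) Run() { fmt.Println("pipeline staged sync: driven by Engine API") }

type legacyStageLoop struct{} // stand-in for stages2.StageLoop

func (legacyStageLoop) Run() { fmt.Println("legacy stage loop: driven by p2p sync") }

// start mirrors the branching in Ethereum.Start above: consensus separation
// drives the pipeline directly; the legacy path runs the loop in the background.
func start(experimentalConsensusSeparation bool) {
	if experimentalConsensusSeparation {
		pipelineSync{}.Run() // synchronous: advances only on Engine API calls
		return
	}
	go legacyStageLoop{}.Run() // fire-and-forget background loop
}

func main() {
	start(true)
}
```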

go.mod

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.19
require (
- github.com/ledgerwatch/erigon-lib v0.0.0-20230728175300-9bc3cb1e0073
+ github.com/ledgerwatch/erigon-lib v0.0.0-20230730131916-e95b162b4a45
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2
github.com/ledgerwatch/log/v3 v3.8.0
github.com/ledgerwatch/secp256k1 v1.0.0

go.sum

@ -499,8 +499,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
- github.com/ledgerwatch/erigon-lib v0.0.0-20230728175300-9bc3cb1e0073 h1:LEaA9FPbQrN3SgN4nN+lkNid9SM0vsy0iqFPkm1Zg8M=
- github.com/ledgerwatch/erigon-lib v0.0.0-20230728175300-9bc3cb1e0073/go.mod h1:KcHpuyVEafIuOM7SZrcIwHfR2EMq2dYPUce5xOm0xk4=
+ github.com/ledgerwatch/erigon-lib v0.0.0-20230730131916-e95b162b4a45 h1:nb1yrJoXtxW2aEt87VMjKJrur+mZNKRxWL/Y2GA4H1A=
+ github.com/ledgerwatch/erigon-lib v0.0.0-20230730131916-e95b162b4a45/go.mod h1:KcHpuyVEafIuOM7SZrcIwHfR2EMq2dYPUce5xOm0xk4=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2 h1:Ls2itRGHMOr2PbHRDA4g1HH8HQdwfJhRVfMPEaLQe94=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU=


@ -12,16 +12,18 @@ import (
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/adapter"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
@ -44,8 +46,7 @@ type EngineBlockDownloader struct {
bodyReqSend RequestBodyFunction
// current status of the downloading process, aka: is it doing anything?
- status atomic.Value // it is a headerdownload.SyncStatus
- startDownloadCh chan downloadRequest
+ status atomic.Value // it is a headerdownload.SyncStatus
// data reader
blockPropagator adapter.BlockPropagator
@ -58,6 +59,7 @@ type EngineBlockDownloader struct {
// Misc
tmpdir string
timeout int
config *chain.Config
// lock
lock sync.Mutex
@ -68,20 +70,22 @@ type EngineBlockDownloader struct {
func NewEngineBlockDownloader(ctx context.Context, logger log.Logger, executionModule execution.ExecutionClient,
hd *headerdownload.HeaderDownload, bd *bodydownload.BodyDownload, blockPropagator adapter.BlockPropagator,
- blockReader services.FullBlockReader, bodyReqSend RequestBodyFunction, tmpdir string, timeout int) *EngineBlockDownloader {
+ blockReader services.FullBlockReader, bodyReqSend RequestBodyFunction, db kv.RoDB, config *chain.Config,
+ tmpdir string, timeout int) *EngineBlockDownloader {
var s atomic.Value
s.Store(headerdownload.Idle)
return &EngineBlockDownloader{
ctx: ctx,
hd: hd,
bd: bd,
+ db: db,
status: s,
+ config: config,
tmpdir: tmpdir,
logger: logger,
blockPropagator: blockPropagator,
timeout: timeout,
blockReader: blockReader,
- startDownloadCh: make(chan downloadRequest),
bodyReqSend: bodyReqSend,
executionModule: executionModule,
}
@ -93,8 +97,6 @@ func (e *EngineBlockDownloader) scheduleHeadersDownload(
heightToDownload uint64,
downloaderTip libcommon.Hash,
) bool {
- e.hd.BeaconRequestList.SetStatus(requestId, engine_helpers.DataWasMissing)
if e.hd.PosStatus() != headerdownload.Idle {
e.logger.Info("[EngineBlockDownloader] Postponing PoS download since another one is in progress", "height", heightToDownload, "hash", hashToDownload)
return false
@ -111,8 +113,6 @@ func (e *EngineBlockDownloader) scheduleHeadersDownload(
e.hd.SetHeaderToDownloadPoS(hashToDownload, heightToDownload)
e.hd.SetPOSSync(true) // This needs to be called after SetHeaderToDownloadPOS because SetHeaderToDownloadPOS sets `posAnchor` member field which is used by ProcessHeadersPOS
// headerCollector is closed in saveDownloadedPoSHeaders, thus nolint
//nolint
e.hd.SetHeadersCollector(etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger))
@ -125,9 +125,7 @@ func (e *EngineBlockDownloader) scheduleHeadersDownload(
func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.SyncStatus {
for e.hd.PosStatus() == headerdownload.Syncing {
time.Sleep(10 * time.Millisecond)
}
return e.hd.PosStatus()
}
@ -151,7 +149,7 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin
return nil
}
lastValidHash = h.ParentHash
- if err := e.hd.VerifyHeader(&h); err != nil {
+ if err := e.hd.Engine().VerifyHeader(consensus.ChainReaderImpl{BlockReader: e.blockReader, Db: tx, Cfg: *e.config}, &h, false); err != nil {
e.logger.Warn("Verification failed for header", "hash", h.Hash(), "height", h.Number.Uint64(), "err", err)
badChainError = err
e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
@ -231,6 +229,7 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint6
if err != nil {
return err
}
log.Info("Beginning downloaded headers insertion")
// Start by seeking headers
for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() {
if err != nil {
@ -252,8 +251,10 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint6
if err := eth1_utils.InsertHeadersAndWait(e.ctx, e.executionModule, headersBatch); err != nil {
return err
}
log.Info("Beginning downloaded bodies insertion")
// then seek bodies
- for k, v, err := bodiesCursors.Seek(dbutils.BlockBodyKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() {
+ for k, _, err := bodiesCursors.Seek(dbutils.BlockBodyKey(fromBlock, fromHash)); k != nil; k, _, err = bodiesCursors.Next() {
if err != nil {
return err
}
@ -265,12 +266,12 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint6
blockNumbersBatch = blockNumbersBatch[:0]
blockHashesBatch = blockHashesBatch[:0]
}
- if len(v) != 40 {
+ if len(k) != 40 {
continue
}
- blockNumber := binary.BigEndian.Uint64(v[:8])
+ blockNumber := binary.BigEndian.Uint64(k[:length.BlockNum])
var blockHash libcommon.Hash
- copy(blockHash[:], v[8:])
+ copy(blockHash[:], k[length.BlockNum:])
blockBody, err := rawdb.ReadBodyWithTransactions(tx, blockHash, blockNumber)
if err != nil {
return err
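The corrected loop reads the block number and hash from the cursor *key*, not the value: a body-table key built by dbutils.BlockBodyKey is 40 bytes, an 8-byte big-endian block number followed by the 32-byte block hash, which is exactly what the len(v) to len(k) fix addresses. A standalone sketch of that key layout (helper names are mine, not Erigon's):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const blockNumLen = 8 // matches erigon-lib's length.BlockNum

// blockBodyKey mirrors the dbutils.BlockBodyKey layout:
// 8-byte big-endian block number || 32-byte block hash.
func blockBodyKey(number uint64, hash [32]byte) []byte {
	k := make([]byte, blockNumLen+32)
	binary.BigEndian.PutUint64(k[:blockNumLen], number)
	copy(k[blockNumLen:], hash[:])
	return k
}

func decodeBlockBodyKey(k []byte) (uint64, [32]byte, error) {
	var hash [32]byte
	if len(k) != blockNumLen+32 {
		return 0, hash, fmt.Errorf("unexpected key length %d", len(k))
	}
	number := binary.BigEndian.Uint64(k[:blockNumLen])
	copy(hash[:], k[blockNumLen:])
	return number, hash, nil
}

func main() {
	k := blockBodyKey(17_034_870, [32]byte{0xde, 0xad})
	n, h, _ := decodeBlockBodyKey(k)
	fmt.Println(n, h[0]) // 17034870 222
}
```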


@ -18,7 +18,7 @@ import (
// downloadBodies executes bodies download.
func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fromBlock, toBlock uint64) (err error) {
headerProgress := toBlock
- bodyProgress := fromBlock
+ bodyProgress := fromBlock - 1
if err := stages.SaveStageProgress(tx, stages.Bodies, bodyProgress); err != nil {
return err
@ -101,7 +101,6 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fr
toProcess := e.bd.NextProcessingCount()
- write := true
for i := uint64(0); i < toProcess; i++ {
select {
case <-logEvery.C:
@ -109,14 +108,12 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fr
default:
}
nextBlock := requestedLow + i
- rawBody := e.bd.GetBodyFromCache(nextBlock, write /* delete */)
+ rawBody := e.bd.GetBodyFromCache(nextBlock, true)
if rawBody == nil {
e.bd.NotDelivered(nextBlock)
write = false
}
if !write {
continue
}
e.bd.NotDelivered(nextBlock)
header, _, err := e.bd.GetHeader(nextBlock, e.blockReader, tx)
if err != nil {
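The simplified loop above consumes bodies from the download cache one block at a time: a hit is deleted from the cache and processed, a miss is handed back to the body-download machinery via NotDelivered so it gets requested again. A toy version of that consume-or-requeue shape (the cache and callback are stand-ins for bodydownload's internals):

```go
package sketch

type rawBody struct{ txs [][]byte }

type bodyCache map[uint64]*rawBody

// getAndDelete mimics GetBodyFromCache(n, true): a hit is removed from the
// cache so each body is processed exactly once.
func (c bodyCache) getAndDelete(n uint64) *rawBody {
	b, ok := c[n]
	if !ok {
		return nil
	}
	delete(c, n)
	return b
}

func process(c bodyCache, requestedLow, toProcess uint64, notDelivered func(uint64)) {
	for i := uint64(0); i < toProcess; i++ {
		next := requestedLow + i
		body := c.getAndDelete(next)
		if body == nil {
			notDelivered(next) // re-queue: this body must be requested again
			continue
		}
		// ... write the header/body pair for `next` ...
	}
}
```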


@ -2,89 +2,101 @@ package engine_block_downloader
import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
- type downloadRequest struct {
- hashToDownload libcommon.Hash
- downloaderTip libcommon.Hash
- requestId int
- }
- func (e *EngineBlockDownloader) Loop() {
- for {
- select {
- case req := <-e.startDownloadCh:
- /* Start download process*/
- // First we schedule the headers download process
- if !e.scheduleHeadersDownload(req.requestId, req.hashToDownload, 0, req.downloaderTip) {
- e.logger.Warn("[EngineBlockDownloader] could not begin header download")
- // could it be scheduled? if not nevermind.
- e.status.Store(headerdownload.Idle)
- continue
- }
- // see the outcome of header download
- headersStatus := e.waitForEndOfHeadersDownload()
- if headersStatus != engine_helpers.Synced {
- // Could not sync. Set to idle
- e.status.Store(headerdownload.Idle)
- e.logger.Warn("[EngineBlockDownloader] Header download did not yield success")
- continue
- }
- tx, err := e.db.BeginRo(e.ctx)
- if err != nil {
- e.logger.Warn("[EngineBlockDownloader] Could not begin tx: %s", err)
- e.status.Store(headerdownload.Idle)
- continue
- }
- memoryMutation := memdb.NewMemoryBatch(tx, e.tmpdir)
- defer memoryMutation.Rollback()
- startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation)
- if err != nil {
- e.status.Store(headerdownload.Idle)
- continue
- }
- if err := e.downloadAndLoadBodiesSyncronously(memoryMutation, startBlock, endBlock); err != nil {
- e.logger.Warn("[EngineBlockDownloader] Could not download bodies: %s", err)
- e.status.Store(headerdownload.Idle)
- continue
- }
- tx.Rollback() // Discard the original db tx
- if err := e.insertHeadersAndBodies(memoryMutation.MemTx(), startBlock, startHash); err != nil {
- e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies: %s", err)
- e.status.Store(headerdownload.Idle)
- continue
- }
- e.status.Store(headerdownload.Idle)
- case <-e.ctx.Done():
- return
- }
+ // download is the process that reverse-downloads a specific block hash.
+ func (e *EngineBlockDownloader) download(hashToDownload libcommon.Hash, downloaderTip libcommon.Hash, requestId int, block *types.Block) {
+ /* Start download process*/
+ // First we schedule the headers download process
+ if !e.scheduleHeadersDownload(requestId, hashToDownload, 0, downloaderTip) {
+ e.logger.Warn("[EngineBlockDownloader] could not begin header download")
+ // could it be scheduled? if not nevermind.
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ // see the outcome of header download
+ headersStatus := e.waitForEndOfHeadersDownload()
+ if headersStatus != headerdownload.Synced {
+ // Could not sync. Set to idle
+ e.logger.Warn("[EngineBlockDownloader] Header download did not yield success")
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ e.hd.SetPosStatus(headerdownload.Idle)
+ tx, err := e.db.BeginRo(e.ctx)
+ if err != nil {
+ e.logger.Warn("[EngineBlockDownloader] Could not begin tx", "err", err)
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ defer tx.Rollback()
+ tmpDb, err := mdbx.NewTemporaryMdbx()
+ if err != nil {
+ e.logger.Warn("[EngineBlockDownloader] Could not create temporary mdbx", "err", err)
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ defer tmpDb.Close()
+ tmpTx, err := tmpDb.BeginRw(e.ctx)
+ if err != nil {
+ e.logger.Warn("[EngineBlockDownloader] Could not create temporary mdbx", "err", err)
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ defer tmpTx.Rollback()
+ memoryMutation := memdb.NewMemoryBatchWithCustomDB(tx, tmpDb, tmpTx, e.tmpdir)
+ defer memoryMutation.Rollback()
+ startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation)
+ if err != nil {
+ e.logger.Warn("[EngineBlockDownloader] Could not load headers", "err", err)
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ // bodiesCollector := etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger)
+ if err := e.downloadAndLoadBodiesSyncronously(memoryMutation, startBlock, endBlock); err != nil {
+ e.logger.Warn("[EngineBlockDownloader] Could not download bodies", "err", err)
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ tx.Rollback() // Discard the original db tx
+ if err := e.insertHeadersAndBodies(tmpTx, startBlock, startHash); err != nil {
+ e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies", "err", err)
+ e.status.Store(headerdownload.Idle)
+ return
+ }
+ if block != nil {
+ // Can fail, not an issue in this case.
+ eth1_utils.InsertHeaderAndBodyAndWait(e.ctx, e.executionModule, block.Header(), block.RawBody())
+ }
+ e.status.Store(headerdownload.Synced)
+ e.logger.Info("[EngineBlockDownloader] Finished downloading blocks", "from", startBlock-1, "to", endBlock)
+ }
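download() stages everything in an in-memory overlay on top of a read-only transaction (NewMemoryBatchWithCustomDB over a throwaway MDBX), so nothing touches the main database until the execution module ingests the result. A minimal illustration of the overlay idea, with maps standing in for the base transaction and the temporary database:

```go
package sketch

// overlay is a read-through staging area: reads fall back to the base
// (read-only) store, writes land only in the temporary layer. Dropping the
// overlay discards all staged data, which is how a failed download leaves
// the main database untouched.
type overlay struct {
	base   map[string][]byte // stands in for the read-only main DB tx
	staged map[string][]byte // stands in for the temporary MDBX
}

func newOverlay(base map[string][]byte) *overlay {
	return &overlay{base: base, staged: make(map[string][]byte)}
}

func (o *overlay) Put(k string, v []byte) { o.staged[k] = v }

func (o *overlay) Get(k string) ([]byte, bool) {
	if v, ok := o.staged[k]; ok {
		return v, true
	}
	v, ok := o.base[k]
	return v, ok
}

// Rollback drops staged writes; the base store was never modified.
func (o *overlay) Rollback() { o.staged = make(map[string][]byte) }
```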
// StartDownloading triggers the download process and returns true if the process started or false if it could not.
- func (e *EngineBlockDownloader) StartDownloading(requestId int, hashToDownload libcommon.Hash, downloaderTip libcommon.Hash) bool {
+ // blockTip is optional and should be the block tip of the download request, which will be inserted at the end of the procedure if specified.
+ func (e *EngineBlockDownloader) StartDownloading(requestId int, hashToDownload libcommon.Hash, downloaderTip libcommon.Hash, blockTip *types.Block) bool {
e.lock.Lock()
defer e.lock.Unlock()
- if e.status.Load() != headerdownload.Idle {
+ if e.status.Load() == headerdownload.Syncing {
return false
}
e.status.Store(headerdownload.Syncing)
- e.startDownloadCh <- downloadRequest{
- requestId: requestId,
- hashToDownload: hashToDownload,
- downloaderTip: downloaderTip,
- }
+ go e.download(hashToDownload, downloaderTip, requestId, blockTip)
return true
}
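StartDownloading replaces the old request channel with a guard-then-spawn pattern: take the lock, refuse if a download is already in flight, flip the status to Syncing, and launch the work in its own goroutine. The same shape in isolation (field and status names are illustrative):

```go
package sketch

import (
	"sync"
	"sync/atomic"
)

type syncStatus int

const (
	idle syncStatus = iota
	syncing
	synced
)

type downloader struct {
	lock   sync.Mutex
	status atomic.Value // holds a syncStatus; zero value is nil until first Store
}

// start refuses concurrent downloads: the mutex makes the check-and-set
// atomic, and the spawned goroutine resets the status when it finishes.
func (d *downloader) start(work func()) bool {
	d.lock.Lock()
	defer d.lock.Unlock()
	if d.status.Load() == syncing {
		return false
	}
	d.status.Store(syncing)
	go func() {
		work()
		d.status.Store(synced)
	}()
	return true
}
```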
func (e *EngineBlockDownloader) Status() headerdownload.SyncStatus {
e.lock.Lock()
defer e.lock.Unlock()
- return e.status.Load().(headerdownload.SyncStatus)
+ return headerdownload.SyncStatus(e.status.Load().(int))
}


@ -8,105 +8,17 @@ import (
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_types"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
func (e *EngineServerExperimental) StartEngineMessageHandler() {
go func() {
for {
interrupted, err := e.engineMessageHandler()
if interrupted {
return
}
if err != nil {
e.logger.Error("[EngineServer] error received", "err", err)
}
}
}()
}
// This loop is responsible for engine POS replacement.
func (e *EngineServerExperimental) engineMessageHandler() (bool, error) {
logPrefix := "EngineApi"
e.hd.SetPOSSync(true)
syncing := e.blockDownloader.Status() != headerdownload.Idle
if !syncing {
e.logger.Info(fmt.Sprintf("[%s] Waiting for Consensus Layer...", logPrefix))
}
interrupt, requestId, requestWithStatus := e.hd.BeaconRequestList.WaitForRequest(syncing, e.test)
chainReader := eth1_chain_reader.NewChainReaderEth1(e.ctx, e.config, e.executionService)
e.hd.SetHeaderReader(chainReader)
interrupted, err := e.handleInterrupt(interrupt)
if err != nil {
return false, err
}
if interrupted {
return true, nil
}
if requestWithStatus == nil {
e.logger.Warn(fmt.Sprintf("[%s] Nil beacon request. Should only happen in tests", logPrefix))
return false, nil
}
request := requestWithStatus.Message
requestStatus := requestWithStatus.Status
// Decide what kind of action we need to take place
forkChoiceMessage, forkChoiceInsteadOfNewPayload := request.(*engine_types.ForkChoiceState)
e.hd.ClearPendingPayloadHash()
e.hd.SetPendingPayloadStatus(nil)
var payloadStatus *engine_types.PayloadStatus
if forkChoiceInsteadOfNewPayload {
payloadStatus, err = e.handlesForkChoice("ForkChoiceUpdated", chainReader, forkChoiceMessage, requestId)
} else {
payloadMessage := request.(*types.Block)
payloadStatus, err = e.handleNewPayload("NewPayload", payloadMessage, requestStatus, requestId, chainReader)
}
if err != nil {
if requestStatus == engine_helpers.New {
e.hd.PayloadStatusCh <- engine_types.PayloadStatus{CriticalError: err}
}
return false, err
}
if requestStatus == engine_helpers.New && payloadStatus != nil {
if payloadStatus.Status == engine_types.SyncingStatus || payloadStatus.Status == engine_types.AcceptedStatus {
e.hd.PayloadStatusCh <- *payloadStatus
} else {
// Let the stage loop run to the end so that the transaction is committed prior to replying to CL
e.hd.SetPendingPayloadStatus(payloadStatus)
}
}
return false, nil
}
func (e *EngineServerExperimental) handleInterrupt(interrupt engine_helpers.Interrupt) (bool, error) {
if interrupt != engine_helpers.None {
if interrupt == engine_helpers.Stopping {
close(e.hd.ShutdownCh)
return false, fmt.Errorf("server is stopping")
}
return true, nil
}
return false, nil
}
const fcuTimeout = 1000 // according to mathematics: 1000 milliseconds = 1 second
func (e *EngineServerExperimental) handleNewPayload(
logPrefix string,
block *types.Block,
requestStatus engine_helpers.RequestStatus,
requestId int,
chainReader consensus.ChainHeaderReader,
) (*engine_types.PayloadStatus, error) {
header := block.Header()
@ -115,28 +27,28 @@ func (e *EngineServerExperimental) handleNewPayload(
e.logger.Info(fmt.Sprintf("[%s] Handling new payload", logPrefix), "height", headerNumber, "hash", headerHash)
- parent := chainReader.GetHeaderByHash(header.ParentHash)
+ currentHeader := chainReader.CurrentHeader()
+ var currentHeadNumber *uint64
+ if currentHeader != nil {
+ currentHeadNumber = new(uint64)
+ *currentHeadNumber = currentHeader.Number.Uint64()
+ }
+ parent := chainReader.GetHeader(header.ParentHash, headerNumber-1)
if parent == nil {
e.logger.Debug(fmt.Sprintf("[%s] New payload: need to download parent", logPrefix), "height", headerNumber, "hash", headerHash, "parentHash", header.ParentHash)
if e.test {
e.hd.BeaconRequestList.Remove(requestId)
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
- if !e.blockDownloader.StartDownloading(requestId, header.ParentHash, headerHash) {
+ if !e.blockDownloader.StartDownloading(0, header.ParentHash, headerHash, block) {
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
- currentHeader := chainReader.CurrentHeader()
- var currentHeadNumber *uint64
- if currentHeader != nil {
- currentHeadNumber = new(uint64)
- *currentHeadNumber = currentHeader.Number.Uint64()
- }
- if currentHeadNumber != nil && math.AbsoluteDifference(*currentHeadNumber, headerNumber) < 32 {
+ if currentHeadNumber != nil {
// We try waiting until we finish downloading the PoS blocks if the distance from the head is enough,
// so that we will perform full validation.
success := false
- for i := 0; i < 10; i++ {
+ for i := 0; i < 100; i++ {
time.Sleep(10 * time.Millisecond)
if e.blockDownloader.Status() == headerdownload.Synced {
success = true
@ -150,13 +62,14 @@ func (e *EngineServerExperimental) handleNewPayload(
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
}
- e.hd.BeaconRequestList.Remove(requestId)
// Save header and body
if err := eth1_utils.InsertHeaderAndBodyAndWait(e.ctx, e.executionService, header, block.RawBody()); err != nil {
return nil, err
}
+ if math.AbsoluteDifference(*currentHeadNumber, headerNumber) >= 32 {
+ return &engine_types.PayloadStatus{Status: engine_types.AcceptedStatus}, nil
+ }
e.logger.Debug(fmt.Sprintf("[%s] New payload begin verification", logPrefix))
status, latestValidHash, err := eth1_utils.ValidateChain(e.ctx, e.executionService, headerHash, headerNumber)
e.logger.Debug(fmt.Sprintf("[%s] New payload verification ended", logPrefix), "status", status.String(), "err", err)
@ -197,12 +110,13 @@ func (e *EngineServerExperimental) handlesForkChoice(
return nil, err
}
// We do not have header, download.
if headerNumber == nil {
e.logger.Debug(fmt.Sprintf("[%s] Fork choice: need to download header with hash %x", logPrefix, headerHash))
if e.test {
e.hd.BeaconRequestList.Remove(requestId)
} else {
- e.blockDownloader.StartDownloading(requestId, headerHash, headerHash)
+ e.blockDownloader.StartDownloading(requestId, headerHash, headerHash, nil)
}
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
@ -214,14 +128,14 @@ func (e *EngineServerExperimental) handlesForkChoice(
if e.test {
e.hd.BeaconRequestList.Remove(requestId)
} else {
- e.blockDownloader.StartDownloading(requestId, headerHash, headerHash)
+ e.blockDownloader.StartDownloading(requestId, headerHash, headerHash, nil)
}
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
e.hd.BeaconRequestList.Remove(requestId)
// Call forkchoice here
- status, latestValidHash, err := eth1_utils.UpdateForkChoice(e.ctx, e.executionService, forkChoice.HeadHash, forkChoice.SafeBlockHash, forkChoice.FinalizedBlockHash, 100)
+ status, latestValidHash, err := eth1_utils.UpdateForkChoice(e.ctx, e.executionService, forkChoice.HeadHash, forkChoice.SafeBlockHash, forkChoice.FinalizedBlockHash, fcuTimeout)
if err != nil {
return nil, err
}
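handleNewPayload now follows a fixed decision tree: known parent, validate inline; unknown parent, kick off (or join) a download, wait briefly for it, insert the tip, and answer SYNCING or ACCEPTED as appropriate. Sketched as plain control flow, with all helpers as stand-ins; the 32-block distance and the 100 x 10 ms wait mirror the numbers above:

```go
package sketch

import "time"

type status string

const (
	valid    status = "VALID"
	syncingS status = "SYNCING"
	accepted status = "ACCEPTED"
)

func absDiff(a, b uint64) uint64 {
	if a > b {
		return a - b
	}
	return b - a
}

// handleNewPayload sketches the decision tree above. hasParent, startDownload,
// downloadDone, insertBlock and validate stand in for the real calls.
func handleNewPayload(headNum, payloadNum uint64, hasParent bool,
	startDownload func() bool, downloadDone func() bool,
	insertBlock func() error, validate func() status) status {

	if !hasParent {
		if !startDownload() { // another download is already running
			return syncingS
		}
		for i := 0; i < 100; i++ { // wait up to ~1s for the gap to close
			time.Sleep(10 * time.Millisecond)
			if downloadDone() {
				break
			}
		}
		if !downloadDone() {
			return syncingS
		}
	}
	if err := insertBlock(); err != nil {
		return syncingS
	}
	if absDiff(headNum, payloadNum) >= 32 {
		return accepted // too far from head for full validation right now
	}
	return validate()
}
```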


@ -36,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_block_downloader"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_types"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go"
"github.com/ledgerwatch/erigon/turbo/jsonrpc"
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"github.com/ledgerwatch/erigon/turbo/services"
@ -81,8 +82,9 @@ func (e *EngineServerExperimental) Start(httpConfig httpcfg.HttpCfg,
base := jsonrpc.NewBaseApi(filters, stateCache, e.blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)
ethImpl := jsonrpc.NewEthAPI(base, e.db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, e.logger)
- // engineImpl := NewEngineAPI(base, db, engineBackend)
+ // engineImpl := NewEngineAPI(base, db, engineBackend)
+ // e.startEngineMessageHandler()
apiList := []rpc.API{
{
Namespace: "eth",
@ -240,14 +242,18 @@ func (s *EngineServerExperimental) newPayload(ctx context.Context, req *engine_t
block := types.NewBlockFromStorage(blockHash, &header, transactions, nil /* uncles */, withdrawals)
s.hd.BeaconRequestList.AddPayloadRequest(block)
- payloadStatus := <-s.hd.PayloadStatusCh
+ chainReader := eth1_chain_reader.NewChainReaderEth1(s.ctx, s.config, s.executionService)
+ payloadStatus, err := s.handleNewPayload("NewPayload", block, chainReader)
+ if err != nil {
+ return nil, err
+ }
s.logger.Debug("[NewPayload] got reply", "payloadStatus", payloadStatus)
if payloadStatus.CriticalError != nil {
return nil, payloadStatus.CriticalError
}
- return &payloadStatus, nil
+ return payloadStatus, nil
}
// Check if we can quickly determine the status of a newPayload or forkchoiceUpdated.
@ -351,12 +357,12 @@ func (s *EngineServerExperimental) getQuickPayloadStatusIfPossible(blockHash lib
return &engine_types.PayloadStatus{Status: engine_types.ValidStatus, LatestValidHash: &blockHash}, nil
}
- if parent == nil && s.hd.PosStatus() != headerdownload.Idle {
+ if parent == nil && s.hd.PosStatus() == headerdownload.Syncing {
s.logger.Debug(fmt.Sprintf("[%s] Downloading some other PoS blocks", prefix), "hash", blockHash)
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
} else {
- if header == nil && s.hd.PosStatus() != headerdownload.Idle {
+ if header == nil && s.hd.PosStatus() == headerdownload.Syncing {
s.logger.Debug(fmt.Sprintf("[%s] Downloading some other PoS stuff", prefix), "hash", blockHash)
return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
}
@ -374,12 +380,6 @@ func (s *EngineServerExperimental) getQuickPayloadStatusIfPossible(blockHash lib
}
}
- // If another payload is already commissioned then we just reply with syncing
- if s.stageLoopIsBusy() {
- s.logger.Debug(fmt.Sprintf("[%s] stage loop is busy", prefix))
- return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil
- }
return nil, nil
}
@ -425,26 +425,22 @@ func (s *EngineServerExperimental) getPayload(ctx context.Context, payloadId uin
// engineForkChoiceUpdated either states new block head or request the assembling of a new block
func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkchoiceState *engine_types.ForkChoiceState, payloadAttributes *engine_types.PayloadAttributes, version clparams.StateVersion,
) (*engine_types.ForkChoiceUpdatedResponse, error) {
- forkChoice := &engine_types.ForkChoiceState{
- HeadHash: forkchoiceState.HeadHash,
- SafeBlockHash: forkchoiceState.SafeBlockHash,
- FinalizedBlockHash: forkchoiceState.FinalizedBlockHash,
- }
- status, err := s.getQuickPayloadStatusIfPossible(forkChoice.HeadHash, 0, libcommon.Hash{}, forkChoice, false)
+ status, err := s.getQuickPayloadStatusIfPossible(forkchoiceState.HeadHash, 0, libcommon.Hash{}, forkchoiceState, false)
if err != nil {
return nil, err
}
s.lock.Lock()
defer s.lock.Unlock()
- if status == nil {
- s.logger.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadHash)
- s.hd.BeaconRequestList.AddForkChoiceRequest(forkChoice)
+ chainReader := eth1_chain_reader.NewChainReaderEth1(s.ctx, s.config, s.executionService)
- statusDeref := <-s.hd.PayloadStatusCh
- status = &statusDeref
+ if status == nil {
+ s.logger.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkchoiceState.HeadHash)
+ status, err = s.handlesForkChoice("ForkChoiceUpdated", chainReader, forkchoiceState, 0)
+ if err != nil {
+ return nil, err
+ }
s.logger.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status)
if status.CriticalError != nil {
@ -471,7 +467,7 @@ func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkch
headHeader := rawdb.ReadHeader(tx2, headHash, *headNumber)
tx2.Rollback()
- if headHeader.Hash() != forkChoice.HeadHash {
+ if headHeader.Hash() != forkchoiceState.HeadHash {
// Per Item 2 of https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.9/src/engine/specification.md#specification-1:
// Client software MAY skip an update of the forkchoice state and
// MUST NOT begin a payload build process if forkchoiceState.headBlockHash doesn't reference a leaf of the block tree.
@ -480,7 +476,7 @@ func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkch
// {payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash, validationError: null}, payloadId: null}.
s.logger.Warn("Skipping payload building because forkchoiceState.headBlockHash is not the head of the canonical chain",
"forkChoice.HeadBlockHash", forkChoice.HeadHash, "headHeader.Hash", headHeader.Hash())
"forkChoice.HeadBlockHash", forkchoiceState.HeadHash, "headHeader.Hash", headHeader.Hash())
return &engine_types.ForkChoiceUpdatedResponse{PayloadStatus: status}, nil
}
@ -489,7 +485,7 @@ func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkch
}
req := &execution.AssembleBlockRequest{
- ParentHash: gointerfaces.ConvertHashToH256(forkChoice.HeadHash),
+ ParentHash: gointerfaces.ConvertHashToH256(forkchoiceState.HeadHash),
Timestamp: uint64(payloadAttributes.Timestamp),
MixDigest: gointerfaces.ConvertHashToH256(payloadAttributes.PrevRandao),
SuggestedFeeRecipent: gointerfaces.ConvertAddressToH160(payloadAttributes.SuggestedFeeRecipient),


@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
)
type ChainReaderEth1 struct {
@ -32,7 +33,20 @@ func (c ChainReaderEth1) Config() *chain.Config {
}
func (c ChainReaderEth1) CurrentHeader() *types.Header {
panic("ChainReaderEth1.CurrentHeader not implemented")
resp, err := c.executionModule.CurrentHeader(c.ctx, &emptypb.Empty{})
if err != nil {
log.Error("GetHeader failed", "err", err)
return nil
}
if resp == nil || resp.Header == nil {
return nil
}
ret, err := eth1_utils.HeaderRpcToHeader(resp.Header)
if err != nil {
log.Error("GetHeader decoding", "err", err)
return nil
}
return ret
}
func (ChainReaderEth1) FrozenBlocks() uint64 {


@ -1,62 +0,0 @@
package eth1_test
import (
"context"
"math/big"
"testing"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/execution/eth1"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
"github.com/stretchr/testify/require"
)
func TestInsertGetterHeader(t *testing.T) {
bn := uint64(2)
header := &types.Header{
Difficulty: big.NewInt(0),
Number: big.NewInt(int64(bn)),
}
db := memdb.NewTestDB(t)
tx, _ := db.BeginRw(context.TODO())
rawdb.WriteTd(tx, libcommon.Hash{}, 1, libcommon.Big0)
tx.Commit()
e := eth1.NewEthereumExecutionModule(nil, db, nil, nil, nil, nil, nil, false)
_, err := e.InsertHeaders(context.TODO(), &execution.InsertHeadersRequest{
Headers: []*execution.Header{
eth1_utils.HeaderToHeaderRPC(header),
}})
require.NoError(t, err)
resp, err := e.GetHeader(context.TODO(), &execution.GetSegmentRequest{
BlockHash: gointerfaces.ConvertHashToH256(header.Hash()),
BlockNumber: &bn,
})
require.NoError(t, err)
require.Equal(t, resp.Header.BlockNumber, bn)
}
func TestInsertGetterBody(t *testing.T) {
bn := uint64(2)
bhash := libcommon.Hash{1}
txs := [][]byte{{1}}
body := &types.RawBody{
Transactions: txs,
}
e := eth1.NewEthereumExecutionModule(nil, memdb.NewTestDB(t), nil, nil, nil, nil, nil, false)
_, err := e.InsertBodies(context.TODO(), &execution.InsertBodiesRequest{
Bodies: []*execution.BlockBody{
eth1_utils.ConvertRawBlockBodyToRpc(body, bn, bhash),
}})
require.NoError(t, err)
resp, err := e.GetBody(context.TODO(), &execution.GetSegmentRequest{
BlockHash: gointerfaces.ConvertHashToH256(bhash),
BlockNumber: &bn,
})
require.NoError(t, err)
require.Equal(t, resp.Body.BlockHash, gointerfaces.ConvertHashToH256(bhash))
}


@ -8,8 +8,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
@ -18,11 +16,11 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_types"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
)
// EthereumExecutionModule describes ethereum execution logic and indexing.
@ -43,6 +41,10 @@ type EthereumExecutionModule struct {
builderFunc builder.BlockBuilderFunc
builders map[uint64]*builder.BlockBuilder
+ // Changes accumulator
+ accumulator *shards.Accumulator
+ stateChangeConsumer shards.StateChangeConsumer
// configuration
config *chain.Config
historyV3 bool
@ -51,17 +53,19 @@ type EthereumExecutionModule struct {
}
func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB, executionPipeline *stagedsync.Sync, forkValidator *engine_helpers.ForkValidator,
- config *chain.Config, builderFunc builder.BlockBuilderFunc, logger log.Logger, historyV3 bool) *EthereumExecutionModule {
+ config *chain.Config, builderFunc builder.BlockBuilderFunc, accumulator *shards.Accumulator, stateChangeConsumer shards.StateChangeConsumer, logger log.Logger, historyV3 bool) *EthereumExecutionModule {
return &EthereumExecutionModule{
- blockReader: blockReader,
- db: db,
- executionPipeline: executionPipeline,
- logger: logger,
- forkValidator: forkValidator,
- builders: make(map[uint64]*builder.BlockBuilder),
- builderFunc: builderFunc,
- config: config,
- semaphore: semaphore.NewWeighted(1),
+ blockReader: blockReader,
+ db: db,
+ executionPipeline: executionPipeline,
+ logger: logger,
+ forkValidator: forkValidator,
+ builders: make(map[uint64]*builder.BlockBuilder),
+ builderFunc: builderFunc,
+ config: config,
+ semaphore: semaphore.NewWeighted(1),
+ accumulator: accumulator,
+ stateChangeConsumer: stateChangeConsumer,
}
}
@ -94,126 +98,6 @@ func (e *EthereumExecutionModule) canonicalHash(ctx context.Context, tx kv.Tx, b
// Remaining
func (e *EthereumExecutionModule) UpdateForkChoice(ctx context.Context, req *execution.ForkChoice) (*execution.ForkChoiceReceipt, error) {
type canonicalEntry struct {
hash libcommon.Hash
number uint64
}
if !e.semaphore.TryAcquire(1) {
return &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Status: execution.ValidationStatus_Busy,
}, nil
}
defer e.semaphore.Release(1)
tx, err := e.db.BeginRw(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
// defer e.forkValidator.ClearWithUnwind(tx, e.notifications.Accumulator, e.notifications.StateChangesConsumer)
blockHash := gointerfaces.ConvertH256ToHash(req.HeadBlockHash)
// Step one, find reconnection point, and mark all of those headers as canonical.
fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, blockHash)
if err != nil {
return nil, err
}
// If we dont have it, too bad
if fcuHeader == nil {
return &execution.ForkChoiceReceipt{
Status: execution.ValidationStatus_MissingSegment,
LatestValidHash: &types2.H256{},
}, nil
}
currentParentHash := fcuHeader.ParentHash
currentParentNumber := fcuHeader.Number.Uint64() - 1
isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber)
if err != nil {
return nil, err
}
// Find such point, and collect all hashes
newCanonicals := make([]*canonicalEntry, 0, 2048)
newCanonicals = append(newCanonicals, &canonicalEntry{
hash: fcuHeader.Hash(),
number: fcuHeader.Number.Uint64(),
})
for !isCanonicalHash {
newCanonicals = append(newCanonicals, &canonicalEntry{
hash: currentParentHash,
number: currentParentNumber,
})
currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber)
if err != nil {
return nil, err
}
if currentHeader == nil {
return &execution.ForkChoiceReceipt{
Status: execution.ValidationStatus_MissingSegment,
LatestValidHash: &types2.H256{},
}, nil
}
currentParentHash = currentHeader.ParentHash
currentParentNumber = currentHeader.Number.Uint64() - 1
isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber)
if err != nil {
return nil, err
}
}
if currentParentNumber != fcuHeader.Number.Uint64()-1 {
e.executionPipeline.UnwindTo(currentParentNumber, libcommon.Hash{})
if e.historyV3 {
if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil {
return nil, err
}
}
}
// Run the unwind
if err := e.executionPipeline.RunUnwind(e.db, tx); err != nil {
return nil, err
}
// Mark all new canonicals as canonicals
for _, canonicalSegment := range newCanonicals {
if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil {
return nil, err
}
if e.historyV3 {
if err := rawdb.AppendCanonicalTxNums(tx, canonicalSegment.number); err != nil {
return nil, err
}
}
}
// Set Progress for headers and bodies accordingly.
if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil {
return nil, err
}
if err := stages.SaveStageProgress(tx, stages.Bodies, fcuHeader.Number.Uint64()); err != nil {
return nil, err
}
if err = rawdb.WriteHeadHeaderHash(tx, blockHash); err != nil {
return nil, err
}
// Run the forkchoice
if err := e.executionPipeline.Run(e.db, tx, false); err != nil {
return nil, err
}
// if head hash was set then success otherwise no
headHash := rawdb.ReadHeadBlockHash(tx)
headNumber := rawdb.ReadHeaderNumber(tx, headHash)
if headNumber != nil && e.logger != nil {
e.logger.Info("Current forkchoice", "hash", headHash, "number", *headNumber)
}
status := execution.ValidationStatus_Success
if headHash != blockHash {
status = execution.ValidationStatus_BadBlock
}
return &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(headHash),
Status: status,
}, tx.Commit()
}
func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execution.ValidationRequest) (*execution.ValidationReceipt, error) {
if !e.semaphore.TryAcquire(1) {
return &execution.ValidationReceipt{
@ -227,6 +111,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut
return nil, err
}
defer tx.Rollback()
+ e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer)
blockHash := gointerfaces.ConvertH256ToHash(req.Hash)
header, err := e.blockReader.Header(ctx, tx, blockHash, req.Number)
if err != nil {
@ -237,6 +122,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut
if err != nil {
return nil, err
}
if header == nil || body == nil {
return &execution.ValidationReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
@ -244,6 +130,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut
ValidationStatus: execution.ValidationStatus_MissingSegment,
}, nil
}
status, lvh, validationError, criticalError := e.forkValidator.ValidatePayload(tx, header, body.RawBody(), false)
if criticalError != nil {
return nil, criticalError


@ -0,0 +1,216 @@
package eth1
import (
"context"
"fmt"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
)
type forkchoiceOutcome struct {
receipt *execution.ForkChoiceReceipt
err error
}
func sendForkchoiceReceiptWithoutWaiting(ch chan forkchoiceOutcome, receipt *execution.ForkChoiceReceipt) {
select {
case ch <- forkchoiceOutcome{receipt: receipt}:
default:
}
}
func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) {
select {
case ch <- forkchoiceOutcome{err: err}:
default:
}
}
func (e *EthereumExecutionModule) UpdateForkChoice(ctx context.Context, req *execution.ForkChoice) (*execution.ForkChoiceReceipt, error) {
blockHash := gointerfaces.ConvertH256ToHash(req.HeadBlockHash)
safeHash := gointerfaces.ConvertH256ToHash(req.SafeBlockHash)
finalizedHash := gointerfaces.ConvertH256ToHash(req.FinalizedBlockHash)
outcomeCh := make(chan forkchoiceOutcome)
// Wait at most req.Timeout milliseconds for the result; after that the update is treated as asynchronous and the caller gets Busy.
go e.updateForkChoice(ctx, blockHash, safeHash, finalizedHash, outcomeCh)
fcuTimer := time.NewTimer(time.Duration(req.Timeout) * time.Millisecond)
select {
case <-fcuTimer.C:
e.logger.Debug("treating forkChoiceUpdated as asyncronous as it is taking too long")
return &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Status: execution.ValidationStatus_Busy,
}, nil
case outcome := <-outcomeCh:
fmt.Println(outcome.receipt)
return outcome.receipt, outcome.err
}
}
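UpdateForkChoice bounds the synchronous wait: the work runs in a goroutine, and if the timer beats it, the caller gets Busy while the update completes in the background. The outcome channel must then be sent to without blocking, or the worker would leak once the waiter is gone. The core of that pattern, reduced to a single helper:

```go
package sketch

import (
	"errors"
	"time"
)

var errBusy = errors.New("busy: update continues asynchronously")

// runWithTimeout starts work in the background and waits at most d for its
// result. On timeout the caller moves on; the worker's final non-blocking
// send (select/default) simply goes nowhere, so the goroutine still exits.
// Note: with an unbuffered channel, a result produced before the receiver is
// ready is dropped too; a buffered channel of size 1 would keep it.
func runWithTimeout(d time.Duration, work func() error) error {
	outcome := make(chan error) // unbuffered, mirroring the code above
	go func() {
		err := work()
		select {
		case outcome <- err:
		default: // receiver timed out and left; drop the result
		}
	}()
	select {
	case err := <-outcome:
		return err
	case <-time.After(d):
		return errBusy
	}
}
```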
func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHash, safeHash, finalizedHash libcommon.Hash, outcomeCh chan forkchoiceOutcome) {
if !e.semaphore.TryAcquire(1) {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Status: execution.ValidationStatus_Busy,
})
return
}
defer e.semaphore.Release(1)
type canonicalEntry struct {
hash libcommon.Hash
number uint64
}
tx, err := e.db.BeginRw(ctx)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
defer tx.Rollback()
defer e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer)
// Step one, find reconnection point, and mark all of those headers as canonical.
fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, blockHash)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
// If we don't have it, too bad
if fcuHeader == nil {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Status: execution.ValidationStatus_MissingSegment,
})
return
}
currentParentHash := fcuHeader.ParentHash
currentParentNumber := fcuHeader.Number.Uint64() - 1
isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
// Find such point, and collect all hashes
newCanonicals := make([]*canonicalEntry, 0, 2048)
newCanonicals = append(newCanonicals, &canonicalEntry{
hash: fcuHeader.Hash(),
number: fcuHeader.Number.Uint64(),
})
for !isCanonicalHash {
newCanonicals = append(newCanonicals, &canonicalEntry{
hash: currentParentHash,
number: currentParentNumber,
})
currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if currentHeader == nil {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Status: execution.ValidationStatus_MissingSegment,
})
return
}
currentParentHash = currentHeader.ParentHash
currentParentNumber = currentHeader.Number.Uint64() - 1
isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
}
if currentParentNumber != fcuHeader.Number.Uint64()-1 {
e.executionPipeline.UnwindTo(currentParentNumber, libcommon.Hash{})
if e.historyV3 {
if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
}
}
// Run the unwind
if err := e.executionPipeline.RunUnwind(e.db, tx); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if e.historyV3 {
if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
}
// Mark all new canonicals as canonicals
for _, canonicalSegment := range newCanonicals {
if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if e.historyV3 {
if err := rawdb.AppendCanonicalTxNums(tx, canonicalSegment.number); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
}
}
// Set Progress for headers and bodies accordingly.
if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if err := stages.SaveStageProgress(tx, stages.Bodies, fcuHeader.Number.Uint64()); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if err = rawdb.WriteHeadHeaderHash(tx, blockHash); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
// Run the forkchoice
if err := e.executionPipeline.Run(e.db, tx, false); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
// if head hash was set then success otherwise no
headHash := rawdb.ReadHeadBlockHash(tx)
headNumber := rawdb.ReadHeaderNumber(tx, headHash)
log := headNumber != nil && e.logger != nil
// Update forks...
rawdb.WriteForkchoiceFinalized(tx, finalizedHash)
rawdb.WriteForkchoiceSafe(tx, safeHash)
rawdb.WriteHeadBlockHash(tx, blockHash)
status := execution.ValidationStatus_Success
if headHash != blockHash {
status = execution.ValidationStatus_BadBlock
if log {
e.logger.Warn("bad forkchoice", "hash", headHash)
}
} else {
if err := tx.Commit(); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if log {
e.logger.Info("head updated", "hash", headHash, "number", *headNumber)
}
}
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(headHash),
Status: status,
})
}
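The heart of updateForkChoice is the reconnection walk: climb parent links from the new head until a block already on the canonical chain is found, collecting every hash on the way; those hashes become the new canonical segment after the unwind. The walk in isolation (lookup callbacks stand in for rawdb/blockReader):

```go
package sketch

import "errors"

type hash [32]byte

type header struct {
	h      hash
	parent hash
	number uint64
}

type entry struct {
	h      hash
	number uint64
}

// findCanonicalSegment walks parent links from fcuHead until isCanonical
// reports an ancestor that is already canonical, returning the new-head-first
// list of blocks that must be marked canonical.
func findCanonicalSegment(fcuHead header,
	getHeader func(hash, uint64) *header,
	isCanonical func(hash, uint64) bool) ([]entry, error) {

	segment := []entry{{fcuHead.h, fcuHead.number}}
	parentHash, parentNum := fcuHead.parent, fcuHead.number-1
	for !isCanonical(parentHash, parentNum) {
		segment = append(segment, entry{parentHash, parentNum})
		h := getHeader(parentHash, parentNum)
		if h == nil {
			return nil, errors.New("missing segment") // MissingSegment above
		}
		parentHash, parentNum = h.parent, h.number-1
	}
	return segment, nil
}
```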


@ -17,6 +17,8 @@ import (
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
)
var errNotFound = errors.New("notfound")
func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv.Tx, req *execution.GetSegmentRequest) (blockHash libcommon.Hash, blockNumber uint64, err error) {
switch {
// Case 1: Only hash is given.
@ -24,7 +26,7 @@ func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv
blockHash = gointerfaces.ConvertH256ToHash(req.BlockHash)
blockNumberPtr := rawdb.ReadHeaderNumber(tx, blockHash)
if blockNumberPtr == nil {
err = fmt.Errorf("ethereumExecutionModule.parseSegmentRequest: could not read block: non existent index")
err = errNotFound
return
}
blockNumber = *blockNumberPtr
@ -32,7 +34,7 @@ func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv
blockNumber = *req.BlockNumber
blockHash, err = e.canonicalHash(ctx, tx, blockNumber)
if err != nil {
err = fmt.Errorf("ethereumExecutionModule.parseSegmentRequest: could not read block %d: %s", blockNumber, err)
err = errNotFound
return
}
case req.BlockHash != nil && req.BlockNumber != nil:
@ -54,6 +56,9 @@ func (e *EthereumExecutionModule) GetBody(ctx context.Context, req *execution.Ge
defer tx.Rollback()
blockHash, blockNumber, err := e.parseSegmentRequest(ctx, tx, req)
+ if err == errNotFound {
+ return &execution.GetBodyResponse{Body: nil}, nil
+ }
if err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.GetBody: %s", err)
}
@ -81,6 +86,9 @@ func (e *EthereumExecutionModule) GetHeader(ctx context.Context, req *execution.
defer tx.Rollback()
blockHash, blockNumber, err := e.parseSegmentRequest(ctx, tx, req)
+ if err == errNotFound {
+ return &execution.GetHeaderResponse{Header: nil}, nil
+ }
if err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.GetHeader: %s", err)
}
@ -103,7 +111,7 @@ func (e *EthereumExecutionModule) GetHeaderHashNumber(ctx context.Context, req *
defer tx.Rollback()
blockNumber := rawdb.ReadHeaderNumber(tx, gointerfaces.ConvertH256ToHash(req))
if blockNumber == nil {
return nil, fmt.Errorf("ethereumExecutionModule.parseSegmentRequest: could not read block: non existent index")
return &execution.GetHeaderHashNumberResponse{BlockNumber: nil}, nil
}
return &execution.GetHeaderHashNumberResponse{BlockNumber: blockNumber}, nil
}
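The getters now distinguish "not found" from real failures with a sentinel error: parseSegmentRequest returns errNotFound, and callers answer with an empty response instead of a gRPC error. A compact version of the pattern (errors.Is is the more robust comparison if the sentinel ever gets wrapped):

```go
package sketch

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("notfound")

func lookupBlockNumber(hash string, index map[string]uint64) (uint64, error) {
	n, ok := index[hash]
	if !ok {
		return 0, errNotFound // absence is not a failure
	}
	return n, nil
}

// getHeader maps the sentinel to an empty response and reserves real errors
// for genuine failures, mirroring GetHeader/GetBody/GetTD above.
func getHeader(hash string, index map[string]uint64) (*uint64, error) {
	n, err := lookupBlockNumber(hash, index)
	if errors.Is(err, errNotFound) {
		return nil, nil // "header: nil" response, no error
	}
	if err != nil {
		return nil, fmt.Errorf("getHeader: %w", err)
	}
	return &n, nil
}
```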
@ -117,7 +125,7 @@ func (e *EthereumExecutionModule) CanonicalHash(ctx context.Context, req *types2
blockHash := gointerfaces.ConvertH256ToHash(req)
blockNumber := rawdb.ReadHeaderNumber(tx, blockHash)
if blockNumber == nil {
return nil, fmt.Errorf("ethereumExecutionModule.CanonicalHash: could not read block: non existent index")
return &execution.IsCanonicalResponse{Canonical: false}, nil
}
expectedHash, err := e.canonicalHash(ctx, tx, *blockNumber)
if err != nil {
@ -131,6 +139,7 @@ func (e *EthereumExecutionModule) CurrentHeader(ctx context.Context, _ *emptypb.
if err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.CurrentHeader: could not open database: %s", err)
}
+ defer tx.Rollback()
hash := rawdb.ReadHeadHeaderHash(tx)
number := rawdb.ReadHeaderNumber(tx, hash)
h, _ := e.blockReader.Header(context.Background(), tx, hash, *number)
@ -151,6 +160,9 @@ func (e *EthereumExecutionModule) GetTD(ctx context.Context, req *execution.GetS
defer tx.Rollback()
blockHash, blockNumber, err := e.parseSegmentRequest(ctx, tx, req)
+ if err == errNotFound {
+ return &execution.GetTDResponse{Td: nil}, nil
+ }
if err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.GetHeader: %s", err)
}


@ -22,17 +22,12 @@ func (e *EthereumExecutionModule) InsertBodies(ctx context.Context, req *executi
return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not begin transaction: %s", err)
}
defer tx.Rollback()
+ e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer)
for _, grpcBody := range req.Bodies {
- var ok bool
- if ok, err = rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(grpcBody.BlockHash), grpcBody.BlockNumber, eth1_utils.ConvertRawBlockBodyFromRpc(grpcBody)); err != nil {
+ if _, err = rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(grpcBody.BlockHash), grpcBody.BlockNumber, eth1_utils.ConvertRawBlockBodyFromRpc(grpcBody)); err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not insert: %s", err)
}
- // TODO: Replace.
- if e.historyV3 && ok {
- if err := rawdb.AppendCanonicalTxNums(tx, grpcBody.BlockNumber); err != nil {
- return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not insert: %s", err)
- }
- }
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not commit: %s", err)
@ -55,6 +50,8 @@ func (e *EthereumExecutionModule) InsertHeaders(ctx context.Context, req *execut
return nil, fmt.Errorf("ethereumExecutionModule.InsertHeaders: could not begin transaction: %s", err)
}
defer tx.Rollback()
+ e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer)
for _, grpcHeader := range req.Headers {
header, err := eth1_utils.HeaderRpcToHeader(grpcHeader)
if err != nil {


@ -240,6 +240,12 @@ func (hd *HeaderDownload) LogAnchorState() {
hd.logAnchorState()
}
+ func (hd *HeaderDownload) Engine() consensus.Engine {
+ hd.lock.RLock()
+ defer hd.lock.RUnlock()
+ return hd.engine
+ }
func (hd *HeaderDownload) logAnchorState() {
//nolint:prealloc
var ss []string