diff --git a/cl/clparams/config.go b/cl/clparams/config.go index b82d65f6b..67b64ee4c 100644 --- a/cl/clparams/config.go +++ b/cl/clparams/config.go @@ -262,8 +262,9 @@ var CheckpointSyncEndpoints = map[NetworkType][]string{ "https://prater-checkpoint-sync.stakely.io/eth/v2/debug/beacon/states/finalized", }, SepoliaNetwork: { - "https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized", - "https://checkpoint-sync.sepolia.ethpandaops.io/eth/v2/debug/beacon/states/finalized", + "https://beaconstate-sepolia.chainsafe.io/eth/v2/debug/beacon/states/finalized", + // "https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized", + // "https://checkpoint-sync.sepolia.ethpandaops.io/eth/v2/debug/beacon/states/finalized", }, GnosisNetwork: { "https://checkpoint.gnosis.gateway.fm/eth/v2/debug/beacon/states/finalized", diff --git a/cl/phase1/execution_client/execution_client_rpc.go b/cl/phase1/execution_client/execution_client_rpc.go index 6658920ad..0e1d11231 100644 --- a/cl/phase1/execution_client/execution_client_rpc.go +++ b/cl/phase1/execution_client/execution_client_rpc.go @@ -158,7 +158,7 @@ func checkPayloadStatus(payloadStatus *engine_types.PayloadStatus) error { return validationError.Error() } - if payloadStatus.Status != engine_types.ValidStatus && payloadStatus.Status != engine_types.AcceptedStatus { + if payloadStatus.Status == engine_types.InvalidStatus { return fmt.Errorf("status: %s", payloadStatus.Status) } return nil diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index 5162f5009..08946f16d 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -42,7 +42,9 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload, default: return fmt.Errorf("replay block, code: %+v", status) } - + if block.Block.Body.ExecutionPayload != nil { + f.eth2Roots.Add(blockRoot, block.Block.Body.ExecutionPayload.BlockHash) + } var invalidBlock bool if newPayload && f.engine != nil { if invalidBlock, err = f.engine.NewPayload(block.Block.Body.ExecutionPayload); err != nil { @@ -53,9 +55,7 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload, if invalidBlock { f.forkGraph.MarkHeaderAsInvalid(blockRoot) } - if block.Block.Body.ExecutionPayload != nil { - f.eth2Roots.Add(blockRoot, block.Block.Body.ExecutionPayload.BlockHash) - } + if block.Block.Slot > f.highestSeen { f.highestSeen = block.Block.Slot } diff --git a/eth/backend.go b/eth/backend.go index 10ad308af..2da9dd70b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -163,10 +163,11 @@ type Ethereum struct { sentriesClient *sentry.MultiClient sentryServers []*sentry.GrpcServer - stagedSync *stagedsync.Sync - syncStages []*stagedsync.Stage - syncUnwindOrder stagedsync.UnwindOrder - syncPruneOrder stagedsync.PruneOrder + stagedSync *stagedsync.Sync + pipelineStagedSync *stagedsync.Sync + syncStages []*stagedsync.Stage + syncUnwindOrder stagedsync.UnwindOrder + syncPruneOrder stagedsync.PruneOrder downloaderClient proto_downloader.DownloaderClient @@ -572,9 +573,9 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, backend.notifications.Events, logger) - pipelineStages := stages2.NewPipelineStages(ctx, chainKv, ðconfig.Defaults, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.forkValidator, logger) - pipelineStagedSync := stagedsync.New(pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) - executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, logger, config.HistoryV3)) + pipelineStages := stages2.NewPipelineStages(ctx, chainKv, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.forkValidator, logger) + backend.pipelineStagedSync = stagedsync.New(pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) + executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, config.HistoryV3)) if config.ExperimentalConsensusSeparation { log.Info("Using experimental Engine API") engineBackendRPC := engineapi.NewEngineServerExperimental( @@ -587,7 +588,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere backend.sentriesClient.Hd, engine_block_downloader.NewEngineBlockDownloader(ctx, logger, executionRpc, backend.sentriesClient.Hd, backend.sentriesClient.Bd, backend.sentriesClient.BroadcastNewBlock, blockReader, backend.sentriesClient.SendBodyRequest, - tmpdir, config.Sync.BodyDownloadTimeoutSeconds), + chainKv, chainConfig, tmpdir, config.Sync.BodyDownloadTimeoutSeconds), false, config.Miner.EnabledPOS) backend.engineBackendRPC = engineBackendRPC @@ -810,9 +811,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error { return } }() - if !config.InternalCL { - go s.engineBackendRPC.Start(httpRpcCfg, ff, stateCache, s.agg, s.engine, ethRpcClient, txPoolRpcClient, miningRpcClient) - } + go s.engineBackendRPC.Start(httpRpcCfg, ff, stateCache, s.agg, s.engine, ethRpcClient, txPoolRpcClient, miningRpcClient) // Register the backend on the node stack.RegisterLifecycle(s) @@ -1139,7 +1138,11 @@ func (s *Ethereum) Start() error { time.Sleep(10 * time.Millisecond) // just to reduce logs order confusion hook := stages2.NewHook(s.sentryCtx, s.notifications, s.stagedSync, s.blockReader, s.chainConfig, s.logger, s.sentriesClient.UpdateHead) - go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.waitForStageLoopStop, s.config.Sync.LoopThrottle, s.logger, s.blockReader, hook) + if s.config.ExperimentalConsensusSeparation { + s.pipelineStagedSync.Run(s.chainDB, nil, true) + } else { + go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.waitForStageLoopStop, s.config.Sync.LoopThrottle, s.logger, s.blockReader, hook) + } return nil } diff --git a/go.mod b/go.mod index b01fe1472..f29d5599c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.19 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20230728175300-9bc3cb1e0073 + github.com/ledgerwatch/erigon-lib v0.0.0-20230730131916-e95b162b4a45 github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2 github.com/ledgerwatch/log/v3 v3.8.0 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index 29a9cd775..0f3659228 100644 --- a/go.sum +++ b/go.sum @@ -499,8 +499,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20230728175300-9bc3cb1e0073 h1:LEaA9FPbQrN3SgN4nN+lkNid9SM0vsy0iqFPkm1Zg8M= -github.com/ledgerwatch/erigon-lib v0.0.0-20230728175300-9bc3cb1e0073/go.mod h1:KcHpuyVEafIuOM7SZrcIwHfR2EMq2dYPUce5xOm0xk4= +github.com/ledgerwatch/erigon-lib v0.0.0-20230730131916-e95b162b4a45 h1:nb1yrJoXtxW2aEt87VMjKJrur+mZNKRxWL/Y2GA4H1A= +github.com/ledgerwatch/erigon-lib v0.0.0-20230730131916-e95b162b4a45/go.mod h1:KcHpuyVEafIuOM7SZrcIwHfR2EMq2dYPUce5xOm0xk4= github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2 h1:Ls2itRGHMOr2PbHRDA4g1HH8HQdwfJhRVfMPEaLQe94= github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU= diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index b89688f37..69ffc4027 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -12,16 +12,18 @@ import ( "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common/dbutils" + "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/adapter" - "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" @@ -44,8 +46,7 @@ type EngineBlockDownloader struct { bodyReqSend RequestBodyFunction // current status of the downloading process, aka: is it doing anything? - status atomic.Value // it is a headerdownload.SyncStatus - startDownloadCh chan downloadRequest + status atomic.Value // it is a headerdownload.SyncStatus // data reader blockPropagator adapter.BlockPropagator @@ -58,6 +59,7 @@ type EngineBlockDownloader struct { // Misc tmpdir string timeout int + config *chain.Config // lock lock sync.Mutex @@ -68,20 +70,22 @@ type EngineBlockDownloader struct { func NewEngineBlockDownloader(ctx context.Context, logger log.Logger, executionModule execution.ExecutionClient, hd *headerdownload.HeaderDownload, bd *bodydownload.BodyDownload, blockPropagator adapter.BlockPropagator, - blockReader services.FullBlockReader, bodyReqSend RequestBodyFunction, tmpdir string, timeout int) *EngineBlockDownloader { + blockReader services.FullBlockReader, bodyReqSend RequestBodyFunction, db kv.RoDB, config *chain.Config, + tmpdir string, timeout int) *EngineBlockDownloader { var s atomic.Value s.Store(headerdownload.Idle) return &EngineBlockDownloader{ ctx: ctx, hd: hd, bd: bd, + db: db, status: s, + config: config, tmpdir: tmpdir, logger: logger, blockPropagator: blockPropagator, timeout: timeout, blockReader: blockReader, - startDownloadCh: make(chan downloadRequest), bodyReqSend: bodyReqSend, executionModule: executionModule, } @@ -93,8 +97,6 @@ func (e *EngineBlockDownloader) scheduleHeadersDownload( heightToDownload uint64, downloaderTip libcommon.Hash, ) bool { - e.hd.BeaconRequestList.SetStatus(requestId, engine_helpers.DataWasMissing) - if e.hd.PosStatus() != headerdownload.Idle { e.logger.Info("[EngineBlockDownloader] Postponing PoS download since another one is in progress", "height", heightToDownload, "hash", hashToDownload) return false @@ -111,8 +113,6 @@ func (e *EngineBlockDownloader) scheduleHeadersDownload( e.hd.SetHeaderToDownloadPoS(hashToDownload, heightToDownload) e.hd.SetPOSSync(true) // This needs to be called after SetHeaderToDownloadPOS because SetHeaderToDownloadPOS sets `posAnchor` member field which is used by ProcessHeadersPOS - // headerCollector is closed in saveDownloadedPoSHeaders, thus nolint - //nolint e.hd.SetHeadersCollector(etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger)) @@ -125,9 +125,7 @@ func (e *EngineBlockDownloader) scheduleHeadersDownload( func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.SyncStatus { for e.hd.PosStatus() == headerdownload.Syncing { time.Sleep(10 * time.Millisecond) - } - return e.hd.PosStatus() } @@ -151,7 +149,7 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin return nil } lastValidHash = h.ParentHash - if err := e.hd.VerifyHeader(&h); err != nil { + if err := e.hd.Engine().VerifyHeader(consensus.ChainReaderImpl{BlockReader: e.blockReader, Db: tx, Cfg: *e.config}, &h, false); err != nil { e.logger.Warn("Verification failed for header", "hash", h.Hash(), "height", h.Number.Uint64(), "err", err) badChainError = err e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash) @@ -231,6 +229,7 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint6 if err != nil { return err } + log.Info("Beginning downloaded headers insertion") // Start by seeking headers for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() { if err != nil { @@ -252,8 +251,10 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint6 if err := eth1_utils.InsertHeadersAndWait(e.ctx, e.executionModule, headersBatch); err != nil { return err } + log.Info("Beginning downloaded bodies insertion") + // then seek bodies - for k, v, err := bodiesCursors.Seek(dbutils.BlockBodyKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() { + for k, _, err := bodiesCursors.Seek(dbutils.BlockBodyKey(fromBlock, fromHash)); k != nil; k, _, err = bodiesCursors.Next() { if err != nil { return err } @@ -265,12 +266,12 @@ func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint6 blockNumbersBatch = blockNumbersBatch[:0] blockHashesBatch = blockHashesBatch[:0] } - if len(v) != 40 { + if len(k) != 40 { continue } - blockNumber := binary.BigEndian.Uint64(v[:8]) + blockNumber := binary.BigEndian.Uint64(k[:length.BlockNum]) var blockHash libcommon.Hash - copy(blockHash[:], v[8:]) + copy(blockHash[:], k[length.BlockNum:]) blockBody, err := rawdb.ReadBodyWithTransactions(tx, blockHash, blockNumber) if err != nil { return err diff --git a/turbo/engineapi/engine_block_downloader/body.go b/turbo/engineapi/engine_block_downloader/body.go index 1105b102d..312d48d52 100644 --- a/turbo/engineapi/engine_block_downloader/body.go +++ b/turbo/engineapi/engine_block_downloader/body.go @@ -18,7 +18,7 @@ import ( // downloadBodies executes bodies download. func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fromBlock, toBlock uint64) (err error) { headerProgress := toBlock - bodyProgress := fromBlock + bodyProgress := fromBlock - 1 if err := stages.SaveStageProgress(tx, stages.Bodies, bodyProgress); err != nil { return err @@ -101,7 +101,6 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fr toProcess := e.bd.NextProcessingCount() - write := true for i := uint64(0); i < toProcess; i++ { select { case <-logEvery.C: @@ -109,14 +108,12 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fr default: } nextBlock := requestedLow + i - rawBody := e.bd.GetBodyFromCache(nextBlock, write /* delete */) + rawBody := e.bd.GetBodyFromCache(nextBlock, true) if rawBody == nil { e.bd.NotDelivered(nextBlock) - write = false - } - if !write { continue } + e.bd.NotDelivered(nextBlock) header, _, err := e.bd.GetHeader(nextBlock, e.blockReader, tx) if err != nil { diff --git a/turbo/engineapi/engine_block_downloader/core.go b/turbo/engineapi/engine_block_downloader/core.go index 86a3428dd..5aacd7615 100644 --- a/turbo/engineapi/engine_block_downloader/core.go +++ b/turbo/engineapi/engine_block_downloader/core.go @@ -2,89 +2,101 @@ package engine_block_downloader import ( libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) -type downloadRequest struct { - hashToDownload libcommon.Hash - downloaderTip libcommon.Hash - requestId int -} - -func (e *EngineBlockDownloader) Loop() { - for { - select { - case req := <-e.startDownloadCh: - /* Start download process*/ - // First we schedule the headers download process - if !e.scheduleHeadersDownload(req.requestId, req.hashToDownload, 0, req.downloaderTip) { - e.logger.Warn("[EngineBlockDownloader] could not begin header download") - // could it be scheduled? if not nevermind. - e.status.Store(headerdownload.Idle) - continue - } - // see the outcome of header download - headersStatus := e.waitForEndOfHeadersDownload() - if headersStatus != engine_helpers.Synced { - // Could not sync. Set to idle - e.status.Store(headerdownload.Idle) - e.logger.Warn("[EngineBlockDownloader] Header download did not yield success") - continue - } - tx, err := e.db.BeginRo(e.ctx) - if err != nil { - e.logger.Warn("[EngineBlockDownloader] Could not begin tx: %s", err) - e.status.Store(headerdownload.Idle) - continue - } - - memoryMutation := memdb.NewMemoryBatch(tx, e.tmpdir) - defer memoryMutation.Rollback() - - startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation) - if err != nil { - e.status.Store(headerdownload.Idle) - continue - } - if err := e.downloadAndLoadBodiesSyncronously(memoryMutation, startBlock, endBlock); err != nil { - e.logger.Warn("[EngineBlockDownloader] Could not download bodies: %s", err) - e.status.Store(headerdownload.Idle) - continue - } - tx.Rollback() // Discard the original db tx - if err := e.insertHeadersAndBodies(memoryMutation.MemTx(), startBlock, startHash); err != nil { - e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies: %s", err) - e.status.Store(headerdownload.Idle) - continue - } - e.status.Store(headerdownload.Idle) - case <-e.ctx.Done(): - return - } - +// download is the process that reverse download a specific block hash. +func (e *EngineBlockDownloader) download(hashToDownload libcommon.Hash, downloaderTip libcommon.Hash, requestId int, block *types.Block) { + /* Start download process*/ + // First we schedule the headers download process + if !e.scheduleHeadersDownload(requestId, hashToDownload, 0, downloaderTip) { + e.logger.Warn("[EngineBlockDownloader] could not begin header download") + // could it be scheduled? if not nevermind. + e.status.Store(headerdownload.Idle) + return } + // see the outcome of header download + headersStatus := e.waitForEndOfHeadersDownload() + + if headersStatus != headerdownload.Synced { + // Could not sync. Set to idle + e.logger.Warn("[EngineBlockDownloader] Header download did not yield success") + e.status.Store(headerdownload.Idle) + return + } + e.hd.SetPosStatus(headerdownload.Idle) + + tx, err := e.db.BeginRo(e.ctx) + if err != nil { + e.logger.Warn("[EngineBlockDownloader] Could not begin tx: %s", err) + e.status.Store(headerdownload.Idle) + return + } + defer tx.Rollback() + + tmpDb, err := mdbx.NewTemporaryMdbx() + if err != nil { + e.logger.Warn("[EngineBlockDownloader] Could create temporary mdbx", "err", err) + e.status.Store(headerdownload.Idle) + return + } + defer tmpDb.Close() + tmpTx, err := tmpDb.BeginRw(e.ctx) + if err != nil { + e.logger.Warn("[EngineBlockDownloader] Could create temporary mdbx", "err", err) + e.status.Store(headerdownload.Idle) + return + } + defer tmpTx.Rollback() + + memoryMutation := memdb.NewMemoryBatchWithCustomDB(tx, tmpDb, tmpTx, e.tmpdir) + defer memoryMutation.Rollback() + + startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation) + if err != nil { + e.logger.Warn("[EngineBlockDownloader] Could load headers", "err", err) + e.status.Store(headerdownload.Idle) + return + } + + // bodiesCollector := etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger) + if err := e.downloadAndLoadBodiesSyncronously(memoryMutation, startBlock, endBlock); err != nil { + e.logger.Warn("[EngineBlockDownloader] Could not download bodies", "err", err) + e.status.Store(headerdownload.Idle) + return + } + tx.Rollback() // Discard the original db tx + if err := e.insertHeadersAndBodies(tmpTx, startBlock, startHash); err != nil { + e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies", "err", err) + e.status.Store(headerdownload.Idle) + return + } + if block != nil { + // Can fail, not an issue in this case. + eth1_utils.InsertHeaderAndBodyAndWait(e.ctx, e.executionModule, block.Header(), block.RawBody()) + } + e.status.Store(headerdownload.Synced) + e.logger.Info("[EngineBlockDownloader] Finished downloading blocks", "from", startBlock-1, "to", endBlock) + } // StartDownloading triggers the download process and returns true if the process started or false if it could not. -func (e *EngineBlockDownloader) StartDownloading(requestId int, hashToDownload libcommon.Hash, downloaderTip libcommon.Hash) bool { +// blockTip is optional and should be the block tip of the download request. which will be inserted at the end of the procedure if specified. +func (e *EngineBlockDownloader) StartDownloading(requestId int, hashToDownload libcommon.Hash, downloaderTip libcommon.Hash, blockTip *types.Block) bool { e.lock.Lock() defer e.lock.Unlock() - if e.status.Load() != headerdownload.Idle { + if e.status.Load() == headerdownload.Syncing { return false } e.status.Store(headerdownload.Syncing) - e.startDownloadCh <- downloadRequest{ - requestId: requestId, - hashToDownload: hashToDownload, - downloaderTip: downloaderTip, - } + go e.download(hashToDownload, downloaderTip, requestId, blockTip) return true } func (e *EngineBlockDownloader) Status() headerdownload.SyncStatus { - e.lock.Lock() - defer e.lock.Unlock() - return e.status.Load().(headerdownload.SyncStatus) + return headerdownload.SyncStatus(e.status.Load().(int)) } diff --git a/turbo/engineapi/engine_process.go b/turbo/engineapi/engine_process.go index 8db7e1e54..26a5fe394 100644 --- a/turbo/engineapi/engine_process.go +++ b/turbo/engineapi/engine_process.go @@ -8,105 +8,17 @@ import ( "github.com/ledgerwatch/erigon/common/math" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_types" "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go" "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) -func (e *EngineServerExperimental) StartEngineMessageHandler() { - go func() { - for { - interrupted, err := e.engineMessageHandler() - if interrupted { - return - } - if err != nil { - e.logger.Error("[EngineServer] error received", "err", err) - } - } - }() -} - -// This loop is responsible for engine POS replacement. -func (e *EngineServerExperimental) engineMessageHandler() (bool, error) { - logPrefix := "EngineApi" - e.hd.SetPOSSync(true) - syncing := e.blockDownloader.Status() != headerdownload.Idle - if !syncing { - e.logger.Info(fmt.Sprintf("[%s] Waiting for Consensus Layer...", logPrefix)) - } - interrupt, requestId, requestWithStatus := e.hd.BeaconRequestList.WaitForRequest(syncing, e.test) - - chainReader := eth1_chain_reader.NewChainReaderEth1(e.ctx, e.config, e.executionService) - e.hd.SetHeaderReader(chainReader) - - interrupted, err := e.handleInterrupt(interrupt) - if err != nil { - return false, err - } - - if interrupted { - return true, nil - } - - if requestWithStatus == nil { - e.logger.Warn(fmt.Sprintf("[%s] Nil beacon request. Should only happen in tests", logPrefix)) - return false, nil - } - - request := requestWithStatus.Message - requestStatus := requestWithStatus.Status - - // Decide what kind of action we need to take place - forkChoiceMessage, forkChoiceInsteadOfNewPayload := request.(*engine_types.ForkChoiceState) - e.hd.ClearPendingPayloadHash() - e.hd.SetPendingPayloadStatus(nil) - - var payloadStatus *engine_types.PayloadStatus - if forkChoiceInsteadOfNewPayload { - payloadStatus, err = e.handlesForkChoice("ForkChoiceUpdated", chainReader, forkChoiceMessage, requestId) - } else { - payloadMessage := request.(*types.Block) - payloadStatus, err = e.handleNewPayload("NewPayload", payloadMessage, requestStatus, requestId, chainReader) - } - - if err != nil { - if requestStatus == engine_helpers.New { - e.hd.PayloadStatusCh <- engine_types.PayloadStatus{CriticalError: err} - } - return false, err - } - - if requestStatus == engine_helpers.New && payloadStatus != nil { - if payloadStatus.Status == engine_types.SyncingStatus || payloadStatus.Status == engine_types.AcceptedStatus { - e.hd.PayloadStatusCh <- *payloadStatus - } else { - // Let the stage loop run to the end so that the transaction is committed prior to replying to CL - e.hd.SetPendingPayloadStatus(payloadStatus) - } - } - - return false, nil -} - -func (e *EngineServerExperimental) handleInterrupt(interrupt engine_helpers.Interrupt) (bool, error) { - if interrupt != engine_helpers.None { - if interrupt == engine_helpers.Stopping { - close(e.hd.ShutdownCh) - return false, fmt.Errorf("server is stopping") - } - return true, nil - } - return false, nil -} +const fcuTimeout = 1000 // according to mathematics: 1000 millisecods = 1 second func (e *EngineServerExperimental) handleNewPayload( logPrefix string, block *types.Block, - requestStatus engine_helpers.RequestStatus, - requestId int, chainReader consensus.ChainHeaderReader, ) (*engine_types.PayloadStatus, error) { header := block.Header() @@ -115,28 +27,28 @@ func (e *EngineServerExperimental) handleNewPayload( e.logger.Info(fmt.Sprintf("[%s] Handling new payload", logPrefix), "height", headerNumber, "hash", headerHash) - parent := chainReader.GetHeaderByHash(header.ParentHash) + currentHeader := chainReader.CurrentHeader() + var currentHeadNumber *uint64 + if currentHeader != nil { + currentHeadNumber = new(uint64) + *currentHeadNumber = currentHeader.Number.Uint64() + } + parent := chainReader.GetHeader(header.ParentHash, headerNumber-1) if parent == nil { e.logger.Debug(fmt.Sprintf("[%s] New payload: need to download parent", logPrefix), "height", headerNumber, "hash", headerHash, "parentHash", header.ParentHash) if e.test { - e.hd.BeaconRequestList.Remove(requestId) return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } - if !e.blockDownloader.StartDownloading(requestId, header.ParentHash, headerHash) { + if !e.blockDownloader.StartDownloading(0, header.ParentHash, headerHash, block) { return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } - currentHeader := chainReader.CurrentHeader() - var currentHeadNumber *uint64 - if currentHeader != nil { - currentHeadNumber = new(uint64) - *currentHeadNumber = currentHeader.Number.Uint64() - } - if currentHeadNumber != nil && math.AbsoluteDifference(*currentHeadNumber, headerNumber) < 32 { + + if currentHeadNumber != nil { // We try waiting until we finish downloading the PoS blocks if the distance from the head is enough, // so that we will perform full validation. success := false - for i := 0; i < 10; i++ { + for i := 0; i < 100; i++ { time.Sleep(10 * time.Millisecond) if e.blockDownloader.Status() == headerdownload.Synced { success = true @@ -150,13 +62,14 @@ func (e *EngineServerExperimental) handleNewPayload( return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } } - - e.hd.BeaconRequestList.Remove(requestId) - // Save header and body if err := eth1_utils.InsertHeaderAndBodyAndWait(e.ctx, e.executionService, header, block.RawBody()); err != nil { return nil, err } + if math.AbsoluteDifference(*currentHeadNumber, headerNumber) >= 32 { + return &engine_types.PayloadStatus{Status: engine_types.AcceptedStatus}, nil + } + e.logger.Debug(fmt.Sprintf("[%s] New payload begin verification", logPrefix)) status, latestValidHash, err := eth1_utils.ValidateChain(e.ctx, e.executionService, headerHash, headerNumber) e.logger.Debug(fmt.Sprintf("[%s] New payload verification ended", logPrefix), "status", status.String(), "err", err) @@ -197,12 +110,13 @@ func (e *EngineServerExperimental) handlesForkChoice( return nil, err } + // We do not have header, download. if headerNumber == nil { e.logger.Debug(fmt.Sprintf("[%s] Fork choice: need to download header with hash %x", logPrefix, headerHash)) if e.test { e.hd.BeaconRequestList.Remove(requestId) } else { - e.blockDownloader.StartDownloading(requestId, headerHash, headerHash) + e.blockDownloader.StartDownloading(requestId, headerHash, headerHash, nil) } return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } @@ -214,14 +128,14 @@ func (e *EngineServerExperimental) handlesForkChoice( if e.test { e.hd.BeaconRequestList.Remove(requestId) } else { - e.blockDownloader.StartDownloading(requestId, headerHash, headerHash) + e.blockDownloader.StartDownloading(requestId, headerHash, headerHash, nil) } + return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } - e.hd.BeaconRequestList.Remove(requestId) // Call forkchoice here - status, latestValidHash, err := eth1_utils.UpdateForkChoice(e.ctx, e.executionService, forkChoice.HeadHash, forkChoice.SafeBlockHash, forkChoice.FinalizedBlockHash, 100) + status, latestValidHash, err := eth1_utils.UpdateForkChoice(e.ctx, e.executionService, forkChoice.HeadHash, forkChoice.SafeBlockHash, forkChoice.FinalizedBlockHash, fcuTimeout) if err != nil { return nil, err } diff --git a/turbo/engineapi/engine_server_experimental.go b/turbo/engineapi/engine_server_experimental.go index c7202dc13..3ec09ee67 100644 --- a/turbo/engineapi/engine_server_experimental.go +++ b/turbo/engineapi/engine_server_experimental.go @@ -36,6 +36,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/engineapi/engine_block_downloader" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_types" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_chain_reader.go" "github.com/ledgerwatch/erigon/turbo/jsonrpc" "github.com/ledgerwatch/erigon/turbo/rpchelper" "github.com/ledgerwatch/erigon/turbo/services" @@ -81,8 +82,9 @@ func (e *EngineServerExperimental) Start(httpConfig httpcfg.HttpCfg, base := jsonrpc.NewBaseApi(filters, stateCache, e.blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs) ethImpl := jsonrpc.NewEthAPI(base, e.db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, e.logger) - // engineImpl := NewEngineAPI(base, db, engineBackend) + // engineImpl := NewEngineAPI(base, db, engineBackend) + // e.startEngineMessageHandler() apiList := []rpc.API{ { Namespace: "eth", @@ -240,14 +242,18 @@ func (s *EngineServerExperimental) newPayload(ctx context.Context, req *engine_t block := types.NewBlockFromStorage(blockHash, &header, transactions, nil /* uncles */, withdrawals) s.hd.BeaconRequestList.AddPayloadRequest(block) - payloadStatus := <-s.hd.PayloadStatusCh + chainReader := eth1_chain_reader.NewChainReaderEth1(s.ctx, s.config, s.executionService) + payloadStatus, err := s.handleNewPayload("NewPayload", block, chainReader) + if err != nil { + return nil, err + } s.logger.Debug("[NewPayload] got reply", "payloadStatus", payloadStatus) if payloadStatus.CriticalError != nil { return nil, payloadStatus.CriticalError } - return &payloadStatus, nil + return payloadStatus, nil } // Check if we can quickly determine the status of a newPayload or forkchoiceUpdated. @@ -351,12 +357,12 @@ func (s *EngineServerExperimental) getQuickPayloadStatusIfPossible(blockHash lib return &engine_types.PayloadStatus{Status: engine_types.ValidStatus, LatestValidHash: &blockHash}, nil } - if parent == nil && s.hd.PosStatus() != headerdownload.Idle { + if parent == nil && s.hd.PosStatus() == headerdownload.Syncing { s.logger.Debug(fmt.Sprintf("[%s] Downloading some other PoS blocks", prefix), "hash", blockHash) return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } } else { - if header == nil && s.hd.PosStatus() != headerdownload.Idle { + if header == nil && s.hd.PosStatus() == headerdownload.Syncing { s.logger.Debug(fmt.Sprintf("[%s] Downloading some other PoS stuff", prefix), "hash", blockHash) return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil } @@ -374,12 +380,6 @@ func (s *EngineServerExperimental) getQuickPayloadStatusIfPossible(blockHash lib } } - // If another payload is already commissioned then we just reply with syncing - if s.stageLoopIsBusy() { - s.logger.Debug(fmt.Sprintf("[%s] stage loop is busy", prefix)) - return &engine_types.PayloadStatus{Status: engine_types.SyncingStatus}, nil - } - return nil, nil } @@ -425,26 +425,22 @@ func (s *EngineServerExperimental) getPayload(ctx context.Context, payloadId uin // engineForkChoiceUpdated either states new block head or request the assembling of a new block func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkchoiceState *engine_types.ForkChoiceState, payloadAttributes *engine_types.PayloadAttributes, version clparams.StateVersion, ) (*engine_types.ForkChoiceUpdatedResponse, error) { - forkChoice := &engine_types.ForkChoiceState{ - HeadHash: forkchoiceState.HeadHash, - SafeBlockHash: forkchoiceState.SafeBlockHash, - FinalizedBlockHash: forkchoiceState.FinalizedBlockHash, - } - - status, err := s.getQuickPayloadStatusIfPossible(forkChoice.HeadHash, 0, libcommon.Hash{}, forkChoice, false) + status, err := s.getQuickPayloadStatusIfPossible(forkchoiceState.HeadHash, 0, libcommon.Hash{}, forkchoiceState, false) if err != nil { return nil, err } - s.lock.Lock() defer s.lock.Unlock() - if status == nil { - s.logger.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadHash) - s.hd.BeaconRequestList.AddForkChoiceRequest(forkChoice) + chainReader := eth1_chain_reader.NewChainReaderEth1(s.ctx, s.config, s.executionService) - statusDeref := <-s.hd.PayloadStatusCh - status = &statusDeref + if status == nil { + s.logger.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkchoiceState.HeadHash) + + status, err = s.handlesForkChoice("ForkChoiceUpdated", chainReader, forkchoiceState, 0) + if err != nil { + return nil, err + } s.logger.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status) if status.CriticalError != nil { @@ -471,7 +467,7 @@ func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkch headHeader := rawdb.ReadHeader(tx2, headHash, *headNumber) tx2.Rollback() - if headHeader.Hash() != forkChoice.HeadHash { + if headHeader.Hash() != forkchoiceState.HeadHash { // Per Item 2 of https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.9/src/engine/specification.md#specification-1: // Client software MAY skip an update of the forkchoice state and // MUST NOT begin a payload build process if forkchoiceState.headBlockHash doesn't reference a leaf of the block tree. @@ -480,7 +476,7 @@ func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkch // {payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash, validationError: null}, payloadId: null}. s.logger.Warn("Skipping payload building because forkchoiceState.headBlockHash is not the head of the canonical chain", - "forkChoice.HeadBlockHash", forkChoice.HeadHash, "headHeader.Hash", headHeader.Hash()) + "forkChoice.HeadBlockHash", forkchoiceState.HeadHash, "headHeader.Hash", headHeader.Hash()) return &engine_types.ForkChoiceUpdatedResponse{PayloadStatus: status}, nil } @@ -489,7 +485,7 @@ func (s *EngineServerExperimental) forkchoiceUpdated(ctx context.Context, forkch } req := &execution.AssembleBlockRequest{ - ParentHash: gointerfaces.ConvertHashToH256(forkChoice.HeadHash), + ParentHash: gointerfaces.ConvertHashToH256(forkchoiceState.HeadHash), Timestamp: uint64(payloadAttributes.Timestamp), MixDigest: gointerfaces.ConvertHashToH256(payloadAttributes.PrevRandao), SuggestedFeeRecipent: gointerfaces.ConvertAddressToH160(payloadAttributes.SuggestedFeeRecipient), diff --git a/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go b/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go index 26d8a669d..d183090fc 100644 --- a/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go +++ b/turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go @@ -11,6 +11,7 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" "github.com/ledgerwatch/log/v3" + "google.golang.org/protobuf/types/known/emptypb" ) type ChainReaderEth1 struct { @@ -32,7 +33,20 @@ func (c ChainReaderEth1) Config() *chain.Config { } func (c ChainReaderEth1) CurrentHeader() *types.Header { - panic("ChainReaderEth1.CurrentHeader not implemented") + resp, err := c.executionModule.CurrentHeader(c.ctx, &emptypb.Empty{}) + if err != nil { + log.Error("GetHeader failed", "err", err) + return nil + } + if resp == nil || resp.Header == nil { + return nil + } + ret, err := eth1_utils.HeaderRpcToHeader(resp.Header) + if err != nil { + log.Error("GetHeader decoding", "err", err) + return nil + } + return ret } func (ChainReaderEth1) FrozenBlocks() uint64 { diff --git a/turbo/execution/eth1/eth1_test.go b/turbo/execution/eth1/eth1_test.go deleted file mode 100644 index 2326effc5..000000000 --- a/turbo/execution/eth1/eth1_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package eth1_test - -import ( - "context" - "math/big" - "testing" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/gointerfaces" - "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" - "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/turbo/execution/eth1" - "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" - "github.com/stretchr/testify/require" -) - -func TestInsertGetterHeader(t *testing.T) { - bn := uint64(2) - header := &types.Header{ - Difficulty: big.NewInt(0), - Number: big.NewInt(int64(bn)), - } - db := memdb.NewTestDB(t) - tx, _ := db.BeginRw(context.TODO()) - rawdb.WriteTd(tx, libcommon.Hash{}, 1, libcommon.Big0) - tx.Commit() - e := eth1.NewEthereumExecutionModule(nil, db, nil, nil, nil, nil, nil, false) - _, err := e.InsertHeaders(context.TODO(), &execution.InsertHeadersRequest{ - Headers: []*execution.Header{ - eth1_utils.HeaderToHeaderRPC(header), - }}) - require.NoError(t, err) - resp, err := e.GetHeader(context.TODO(), &execution.GetSegmentRequest{ - BlockHash: gointerfaces.ConvertHashToH256(header.Hash()), - BlockNumber: &bn, - }) - require.NoError(t, err) - require.Equal(t, resp.Header.BlockNumber, bn) -} - -func TestInsertGetterBody(t *testing.T) { - bn := uint64(2) - bhash := libcommon.Hash{1} - txs := [][]byte{{1}} - body := &types.RawBody{ - Transactions: txs, - } - e := eth1.NewEthereumExecutionModule(nil, memdb.NewTestDB(t), nil, nil, nil, nil, nil, false) - _, err := e.InsertBodies(context.TODO(), &execution.InsertBodiesRequest{ - Bodies: []*execution.BlockBody{ - eth1_utils.ConvertRawBlockBodyToRpc(body, bn, bhash), - }}) - require.NoError(t, err) - resp, err := e.GetBody(context.TODO(), &execution.GetSegmentRequest{ - BlockHash: gointerfaces.ConvertHashToH256(bhash), - BlockNumber: &bn, - }) - require.NoError(t, err) - require.Equal(t, resp.Body.BlockHash, gointerfaces.ConvertHashToH256(bhash)) -} diff --git a/turbo/execution/eth1/ethereum_execution.go b/turbo/execution/eth1/ethereum_execution.go index 15a12913e..1574ada0d 100644 --- a/turbo/execution/eth1/ethereum_execution.go +++ b/turbo/execution/eth1/ethereum_execution.go @@ -8,8 +8,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" - types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" - "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/semaphore" @@ -18,11 +16,11 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/turbo/builder" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_types" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/shards" ) // EthereumExecutionModule describes ethereum execution logic and indexing. @@ -43,6 +41,10 @@ type EthereumExecutionModule struct { builderFunc builder.BlockBuilderFunc builders map[uint64]*builder.BlockBuilder + // Changes accumulator + accumulator *shards.Accumulator + stateChangeConsumer shards.StateChangeConsumer + // configuration config *chain.Config historyV3 bool @@ -51,17 +53,19 @@ type EthereumExecutionModule struct { } func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB, executionPipeline *stagedsync.Sync, forkValidator *engine_helpers.ForkValidator, - config *chain.Config, builderFunc builder.BlockBuilderFunc, logger log.Logger, historyV3 bool) *EthereumExecutionModule { + config *chain.Config, builderFunc builder.BlockBuilderFunc, accumulator *shards.Accumulator, stateChangeConsumer shards.StateChangeConsumer, logger log.Logger, historyV3 bool) *EthereumExecutionModule { return &EthereumExecutionModule{ - blockReader: blockReader, - db: db, - executionPipeline: executionPipeline, - logger: logger, - forkValidator: forkValidator, - builders: make(map[uint64]*builder.BlockBuilder), - builderFunc: builderFunc, - config: config, - semaphore: semaphore.NewWeighted(1), + blockReader: blockReader, + db: db, + executionPipeline: executionPipeline, + logger: logger, + forkValidator: forkValidator, + builders: make(map[uint64]*builder.BlockBuilder), + builderFunc: builderFunc, + config: config, + semaphore: semaphore.NewWeighted(1), + accumulator: accumulator, + stateChangeConsumer: stateChangeConsumer, } } @@ -94,126 +98,6 @@ func (e *EthereumExecutionModule) canonicalHash(ctx context.Context, tx kv.Tx, b // Remaining -func (e *EthereumExecutionModule) UpdateForkChoice(ctx context.Context, req *execution.ForkChoice) (*execution.ForkChoiceReceipt, error) { - type canonicalEntry struct { - hash libcommon.Hash - number uint64 - } - if !e.semaphore.TryAcquire(1) { - return &execution.ForkChoiceReceipt{ - LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), - Status: execution.ValidationStatus_Busy, - }, nil - } - defer e.semaphore.Release(1) - - tx, err := e.db.BeginRw(ctx) - if err != nil { - return nil, err - } - defer tx.Rollback() - // defer e.forkValidator.ClearWithUnwind(tx, e.notifications.Accumulator, e.notifications.StateChangesConsumer) - - blockHash := gointerfaces.ConvertH256ToHash(req.HeadBlockHash) - // Step one, find reconnection point, and mark all of those headers as canonical. - fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, blockHash) - if err != nil { - return nil, err - } - // If we dont have it, too bad - if fcuHeader == nil { - return &execution.ForkChoiceReceipt{ - Status: execution.ValidationStatus_MissingSegment, - LatestValidHash: &types2.H256{}, - }, nil - } - currentParentHash := fcuHeader.ParentHash - currentParentNumber := fcuHeader.Number.Uint64() - 1 - isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) - if err != nil { - return nil, err - } - // Find such point, and collect all hashes - newCanonicals := make([]*canonicalEntry, 0, 2048) - newCanonicals = append(newCanonicals, &canonicalEntry{ - hash: fcuHeader.Hash(), - number: fcuHeader.Number.Uint64(), - }) - for !isCanonicalHash { - newCanonicals = append(newCanonicals, &canonicalEntry{ - hash: currentParentHash, - number: currentParentNumber, - }) - currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber) - if err != nil { - return nil, err - } - if currentHeader == nil { - return &execution.ForkChoiceReceipt{ - Status: execution.ValidationStatus_MissingSegment, - LatestValidHash: &types2.H256{}, - }, nil - } - currentParentHash = currentHeader.ParentHash - currentParentNumber = currentHeader.Number.Uint64() - 1 - isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) - if err != nil { - return nil, err - } - } - if currentParentNumber != fcuHeader.Number.Uint64()-1 { - e.executionPipeline.UnwindTo(currentParentNumber, libcommon.Hash{}) - if e.historyV3 { - if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil { - return nil, err - } - } - } - // Run the unwind - if err := e.executionPipeline.RunUnwind(e.db, tx); err != nil { - return nil, err - } - // Mark all new canonicals as canonicals - for _, canonicalSegment := range newCanonicals { - if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil { - return nil, err - } - if e.historyV3 { - if err := rawdb.AppendCanonicalTxNums(tx, canonicalSegment.number); err != nil { - return nil, err - } - } - } - // Set Progress for headers and bodies accordingly. - if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil { - return nil, err - } - if err := stages.SaveStageProgress(tx, stages.Bodies, fcuHeader.Number.Uint64()); err != nil { - return nil, err - } - if err = rawdb.WriteHeadHeaderHash(tx, blockHash); err != nil { - return nil, err - } - // Run the forkchoice - if err := e.executionPipeline.Run(e.db, tx, false); err != nil { - return nil, err - } - // if head hash was set then success otherwise no - headHash := rawdb.ReadHeadBlockHash(tx) - headNumber := rawdb.ReadHeaderNumber(tx, headHash) - if headNumber != nil && e.logger != nil { - e.logger.Info("Current forkchoice", "hash", headHash, "number", *headNumber) - } - status := execution.ValidationStatus_Success - if headHash != blockHash { - status = execution.ValidationStatus_BadBlock - } - return &execution.ForkChoiceReceipt{ - LatestValidHash: gointerfaces.ConvertHashToH256(headHash), - Status: status, - }, tx.Commit() -} - func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execution.ValidationRequest) (*execution.ValidationReceipt, error) { if !e.semaphore.TryAcquire(1) { return &execution.ValidationReceipt{ @@ -227,6 +111,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut return nil, err } defer tx.Rollback() + e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer) blockHash := gointerfaces.ConvertH256ToHash(req.Hash) header, err := e.blockReader.Header(ctx, tx, blockHash, req.Number) if err != nil { @@ -237,6 +122,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut if err != nil { return nil, err } + if header == nil || body == nil { return &execution.ValidationReceipt{ LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), @@ -244,6 +130,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut ValidationStatus: execution.ValidationStatus_MissingSegment, }, nil } + status, lvh, validationError, criticalError := e.forkValidator.ValidatePayload(tx, header, body.RawBody(), false) if criticalError != nil { return nil, criticalError diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go new file mode 100644 index 000000000..831eaa36e --- /dev/null +++ b/turbo/execution/eth1/forkchoice.go @@ -0,0 +1,216 @@ +package eth1 + +import ( + "context" + "fmt" + "time" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/gointerfaces" + "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" + "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" +) + +type forkchoiceOutcome struct { + receipt *execution.ForkChoiceReceipt + err error +} + +func sendForkchoiceReceiptWithoutWaiting(ch chan forkchoiceOutcome, receipt *execution.ForkChoiceReceipt) { + select { + case ch <- forkchoiceOutcome{receipt: receipt}: + default: + } +} + +func sendForkchoiceErrorWithoutWaiting(ch chan forkchoiceOutcome, err error) { + select { + case ch <- forkchoiceOutcome{err: err}: + default: + } +} + +func (e *EthereumExecutionModule) UpdateForkChoice(ctx context.Context, req *execution.ForkChoice) (*execution.ForkChoiceReceipt, error) { + blockHash := gointerfaces.ConvertH256ToHash(req.HeadBlockHash) + safeHash := gointerfaces.ConvertH256ToHash(req.SafeBlockHash) + finalizedHash := gointerfaces.ConvertH256ToHash(req.FinalizedBlockHash) + + outcomeCh := make(chan forkchoiceOutcome) + + // So we wait at most the amount specified by req.Timeout before just sending out + go e.updateForkChoice(ctx, blockHash, safeHash, finalizedHash, outcomeCh) + fcuTimer := time.NewTimer(time.Duration(req.Timeout) * time.Millisecond) + + select { + case <-fcuTimer.C: + e.logger.Debug("treating forkChoiceUpdated as asyncronous as it is taking too long") + return &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + Status: execution.ValidationStatus_Busy, + }, nil + case outcome := <-outcomeCh: + fmt.Println(outcome.receipt) + return outcome.receipt, outcome.err + } + +} + +func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHash, safeHash, finalizedHash libcommon.Hash, outcomeCh chan forkchoiceOutcome) { + if !e.semaphore.TryAcquire(1) { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + Status: execution.ValidationStatus_Busy, + }) + return + } + defer e.semaphore.Release(1) + + type canonicalEntry struct { + hash libcommon.Hash + number uint64 + } + tx, err := e.db.BeginRw(ctx) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + defer tx.Rollback() + + defer e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer) + + // Step one, find reconnection point, and mark all of those headers as canonical. + fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, blockHash) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + // If we dont have it, too bad + if fcuHeader == nil { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + Status: execution.ValidationStatus_MissingSegment, + }) + return + } + currentParentHash := fcuHeader.ParentHash + currentParentNumber := fcuHeader.Number.Uint64() - 1 + isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + // Find such point, and collect all hashes + newCanonicals := make([]*canonicalEntry, 0, 2048) + newCanonicals = append(newCanonicals, &canonicalEntry{ + hash: fcuHeader.Hash(), + number: fcuHeader.Number.Uint64(), + }) + for !isCanonicalHash { + newCanonicals = append(newCanonicals, &canonicalEntry{ + hash: currentParentHash, + number: currentParentNumber, + }) + currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if currentHeader == nil { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + Status: execution.ValidationStatus_MissingSegment, + }) + return + } + currentParentHash = currentHeader.ParentHash + currentParentNumber = currentHeader.Number.Uint64() - 1 + isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + } + if currentParentNumber != fcuHeader.Number.Uint64()-1 { + e.executionPipeline.UnwindTo(currentParentNumber, libcommon.Hash{}) + if e.historyV3 { + if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + } + } + + // Run the unwind + if err := e.executionPipeline.RunUnwind(e.db, tx); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if e.historyV3 { + if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + } + // Mark all new canonicals as canonicals + for _, canonicalSegment := range newCanonicals { + if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if e.historyV3 { + if err := rawdb.AppendCanonicalTxNums(tx, canonicalSegment.number); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + } + } + // Set Progress for headers and bodies accordingly. + if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if err := stages.SaveStageProgress(tx, stages.Bodies, fcuHeader.Number.Uint64()); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if err = rawdb.WriteHeadHeaderHash(tx, blockHash); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + // Run the forkchoice + if err := e.executionPipeline.Run(e.db, tx, false); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + // if head hash was set then success otherwise no + headHash := rawdb.ReadHeadBlockHash(tx) + headNumber := rawdb.ReadHeaderNumber(tx, headHash) + log := headNumber != nil && e.logger != nil + // Update forks... + rawdb.WriteForkchoiceFinalized(tx, finalizedHash) + rawdb.WriteForkchoiceSafe(tx, safeHash) + rawdb.WriteHeadBlockHash(tx, blockHash) + + status := execution.ValidationStatus_Success + if headHash != blockHash { + status = execution.ValidationStatus_BadBlock + if log { + e.logger.Warn("bad forkchoice", "hash", headHash) + } + } else { + if err := tx.Commit(); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if log { + e.logger.Info("head updated", "hash", headHash, "number", *headNumber) + } + } + + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(headHash), + Status: status, + }) +} diff --git a/turbo/execution/eth1/getters.go b/turbo/execution/eth1/getters.go index a83765205..e181bab67 100644 --- a/turbo/execution/eth1/getters.go +++ b/turbo/execution/eth1/getters.go @@ -17,6 +17,8 @@ import ( "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" ) +var errNotFound = errors.New("notfound") + func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv.Tx, req *execution.GetSegmentRequest) (blockHash libcommon.Hash, blockNumber uint64, err error) { switch { // Case 1: Only hash is given. @@ -24,7 +26,7 @@ func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv blockHash = gointerfaces.ConvertH256ToHash(req.BlockHash) blockNumberPtr := rawdb.ReadHeaderNumber(tx, blockHash) if blockNumberPtr == nil { - err = fmt.Errorf("ethereumExecutionModule.parseSegmentRequest: could not read block: non existent index") + err = errNotFound return } blockNumber = *blockNumberPtr @@ -32,7 +34,7 @@ func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv blockNumber = *req.BlockNumber blockHash, err = e.canonicalHash(ctx, tx, blockNumber) if err != nil { - err = fmt.Errorf("ethereumExecutionModule.parseSegmentRequest: could not read block %d: %s", blockNumber, err) + err = errNotFound return } case req.BlockHash != nil && req.BlockNumber != nil: @@ -54,6 +56,9 @@ func (e *EthereumExecutionModule) GetBody(ctx context.Context, req *execution.Ge defer tx.Rollback() blockHash, blockNumber, err := e.parseSegmentRequest(ctx, tx, req) + if err == errNotFound { + return &execution.GetBodyResponse{Body: nil}, nil + } if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.GetBody: %s", err) } @@ -81,6 +86,9 @@ func (e *EthereumExecutionModule) GetHeader(ctx context.Context, req *execution. defer tx.Rollback() blockHash, blockNumber, err := e.parseSegmentRequest(ctx, tx, req) + if err == errNotFound { + return &execution.GetHeaderResponse{Header: nil}, nil + } if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.GetHeader: %s", err) } @@ -103,7 +111,7 @@ func (e *EthereumExecutionModule) GetHeaderHashNumber(ctx context.Context, req * defer tx.Rollback() blockNumber := rawdb.ReadHeaderNumber(tx, gointerfaces.ConvertH256ToHash(req)) if blockNumber == nil { - return nil, fmt.Errorf("ethereumExecutionModule.parseSegmentRequest: could not read block: non existent index") + return &execution.GetHeaderHashNumberResponse{BlockNumber: nil}, nil } return &execution.GetHeaderHashNumberResponse{BlockNumber: blockNumber}, nil } @@ -117,7 +125,7 @@ func (e *EthereumExecutionModule) CanonicalHash(ctx context.Context, req *types2 blockHash := gointerfaces.ConvertH256ToHash(req) blockNumber := rawdb.ReadHeaderNumber(tx, blockHash) if blockNumber == nil { - return nil, fmt.Errorf("ethereumExecutionModule.CanonicalHash: could not read block: non existent index") + return &execution.IsCanonicalResponse{Canonical: false}, nil } expectedHash, err := e.canonicalHash(ctx, tx, *blockNumber) if err != nil { @@ -131,6 +139,7 @@ func (e *EthereumExecutionModule) CurrentHeader(ctx context.Context, _ *emptypb. if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.CurrentHeader: could not open database: %s", err) } + defer tx.Rollback() hash := rawdb.ReadHeadHeaderHash(tx) number := rawdb.ReadHeaderNumber(tx, hash) h, _ := e.blockReader.Header(context.Background(), tx, hash, *number) @@ -151,6 +160,9 @@ func (e *EthereumExecutionModule) GetTD(ctx context.Context, req *execution.GetS defer tx.Rollback() blockHash, blockNumber, err := e.parseSegmentRequest(ctx, tx, req) + if err == errNotFound { + return &execution.GetTDResponse{Td: nil}, nil + } if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.GetHeader: %s", err) } diff --git a/turbo/execution/eth1/inserters.go b/turbo/execution/eth1/inserters.go index 40fc0cfda..d7ec0f5ff 100644 --- a/turbo/execution/eth1/inserters.go +++ b/turbo/execution/eth1/inserters.go @@ -22,17 +22,12 @@ func (e *EthereumExecutionModule) InsertBodies(ctx context.Context, req *executi return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not begin transaction: %s", err) } defer tx.Rollback() + e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer) + for _, grpcBody := range req.Bodies { - var ok bool - if ok, err = rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(grpcBody.BlockHash), grpcBody.BlockNumber, eth1_utils.ConvertRawBlockBodyFromRpc(grpcBody)); err != nil { + if _, err = rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(grpcBody.BlockHash), grpcBody.BlockNumber, eth1_utils.ConvertRawBlockBodyFromRpc(grpcBody)); err != nil { return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not insert: %s", err) } - // TODO: Replace. - if e.historyV3 && ok { - if err := rawdb.AppendCanonicalTxNums(tx, grpcBody.BlockNumber); err != nil { - return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not insert: %s", err) - } - } } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not commit: %s", err) @@ -55,6 +50,8 @@ func (e *EthereumExecutionModule) InsertHeaders(ctx context.Context, req *execut return nil, fmt.Errorf("ethereumExecutionModule.InsertHeaders: could not begin transaction: %s", err) } defer tx.Rollback() + e.forkValidator.ClearWithUnwind(tx, e.accumulator, e.stateChangeConsumer) + for _, grpcHeader := range req.Headers { header, err := eth1_utils.HeaderRpcToHeader(grpcHeader) if err != nil { diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index b2510f626..5b2d45e84 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -240,6 +240,12 @@ func (hd *HeaderDownload) LogAnchorState() { hd.logAnchorState() } +func (hd *HeaderDownload) Engine() consensus.Engine { + hd.lock.RLock() + defer hd.lock.RUnlock() + return hd.engine +} + func (hd *HeaderDownload) logAnchorState() { //nolint:prealloc var ss []string