diff --git a/consensus/errors.go b/consensus/errors.go index c88ae64b4..0659812b7 100644 --- a/consensus/errors.go +++ b/consensus/errors.go @@ -19,6 +19,11 @@ package consensus import "errors" var ( + // ErrInvalidBlock is a generic error to wrap all non-transient genuine protocol validation errors. + // For example, ErrUnexpectedWithdrawals should be wrapped as ErrInvalidBlock, + // while an out-of-memory error should not. + ErrInvalidBlock = errors.New("invalid block") + // ErrUnknownAncestor is returned when validating a block requires an ancestor // that is unknown. ErrUnknownAncestor = errors.New("unknown ancestor") diff --git a/eth/backend.go b/eth/backend.go index 992276096..b49acc113 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -468,11 +468,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, notifications *shards.Notifications) error { // Needs its own notifications to not update RPC daemon and txpool about pending blocks - stateSync, err := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient, + stateSync := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient, dirs, notifications, blockReader, blockWriter, backend.agg, log.New() /* logging will be discarded */) - if err != nil { - return err - } chainReader := stagedsync.NewChainReaderImpl(chainConfig, batch, blockReader, logger) // We start the mining step if err := stages2.StateStep(ctx, chainReader, backend.engine, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil { diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 3d90e484f..86db7d343 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -637,7 +637,7 @@ Loop: } return nil }(); err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, common.ErrStopped) { + if !errors.Is(err, consensus.ErrInvalidBlock) { return err } else { logger.Warn(fmt.Sprintf("[%s] Execution failed", logPrefix), "block", blockNum, "hash", header.Hash().String(), "err", err) diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 790a931cc..c19b0b739 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -172,7 +172,7 @@ func executeBlock( execRs, err = core.ExecuteBlockEphemerally(cfg.chainConfig, &vmConfig, getHashFn, cfg.engine, block, stateReader, stateWriter, NewChainReaderImpl(cfg.chainConfig, tx, cfg.blockReader, logger), getTracer, logger) if err != nil { - return err + return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) } receipts = execRs.Receipts stateSyncReceipt = execRs.StateSyncReceipt @@ -462,12 +462,14 @@ Loop: if err = executeBlock(block, tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, initialCycle, stateStream, logger); err != nil { if !errors.Is(err, context.Canceled) { logger.Warn(fmt.Sprintf("[%s] Execution failed", logPrefix), "block", blockNum, "hash", blockHash.String(), "err", err) - if cfg.hd != nil { + if cfg.hd != nil && errors.Is(err, consensus.ErrInvalidBlock) { cfg.hd.ReportBadHeaderPoS(blockHash, block.ParentHash() /* lastValidAncestor */) } if cfg.badBlockHalt { return err } + } + if errors.Is(err, consensus.ErrInvalidBlock) { u.UnwindTo(blockNum-1, blockHash /* badBlock */) } else { u.UnwindTo(blockNum-1, libcommon.Hash{} /* badBlock */) diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 32297d559..5fdd7f38b 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -23,6 +23,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/common/math" + "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/turbo/services" @@ -126,7 +127,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx kv.RwTx, cfg Tri if cfg.checkRoot && root != expectedRootHash { logger.Error(fmt.Sprintf("[%s] Wrong trie root of block %d: %x, expected (from header): %x. Block hash: %x", logPrefix, to, root, expectedRootHash, headerHash)) if cfg.badBlockHalt { - return trie.EmptyRoot, fmt.Errorf("wrong trie root") + return trie.EmptyRoot, fmt.Errorf("%w: wrong trie root", consensus.ErrInvalidBlock) } if cfg.hd != nil { cfg.hd.ReportBadHeaderPoS(headerHash, syncHeadHeader.ParentHash) diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 04da5612c..b15409eab 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/secp256k1" @@ -258,7 +259,7 @@ Loop: return minBlockErr } minHeader := rawdb.ReadHeader(tx, minBlockHash, minBlockNum) - if cfg.hd != nil { + if cfg.hd != nil && errors.Is(minBlockErr, consensus.ErrInvalidBlock) { cfg.hd.ReportBadHeaderPoS(minBlockHash, minHeader.ParentHash) } @@ -328,7 +329,8 @@ func recoverSenders(ctx context.Context, logPrefix string, cryptoContext *secp25 for i, tx := range body.Transactions { from, err := signer.SenderWithContext(cryptoContext, tx) if err != nil { - job.err = fmt.Errorf("%s: error recovering sender for tx=%x, %w", logPrefix, tx.Hash(), err) + job.err = fmt.Errorf("%w: error recovering sender for tx=%x, %v", + consensus.ErrInvalidBlock, tx.Hash(), err) break } copy(job.senders[i*length.Addr:], from[:]) diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index bd401eb61..b14f52e9a 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -130,7 +130,7 @@ func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.Syn // waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome. func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) { var lastValidHash libcommon.Hash - var badChainError error + var badChainError error // TODO(yperbasis): this is not set anywhere var foundPow bool headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error { diff --git a/turbo/engineapi/engine_helpers/fork_validator.go b/turbo/engineapi/engine_helpers/fork_validator.go index df039d856..c5323327c 100644 --- a/turbo/engineapi/engine_helpers/fork_validator.go +++ b/turbo/engineapi/engine_helpers/fork_validator.go @@ -15,6 +15,7 @@ package engine_helpers import ( "context" + "errors" "fmt" "sync" @@ -22,6 +23,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/cl/phase1/core/state/lru" + "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_types" "github.com/ledgerwatch/erigon/turbo/services" @@ -252,7 +254,15 @@ func (fv *ForkValidator) ClearWithUnwind(accumulator *shards.Accumulator, c shar // validateAndStorePayload validate and store a payload fork chain if such chain results valid. func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, notifications *shards.Notifications) (status engine_types.EngineStatus, latestValidHash libcommon.Hash, validationError error, criticalError error) { - validationError = fv.validatePayload(tx, header, body, unwindPoint, headersChain, bodiesChain, notifications) + if err := fv.validatePayload(tx, header, body, unwindPoint, headersChain, bodiesChain, notifications); err != nil { + if errors.Is(err, consensus.ErrInvalidBlock) { + validationError = err + } else { + criticalError = err + return + } + } + latestValidHash = header.Hash() if validationError != nil { var latestValidNumber uint64 diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index b5f59e2a1..03d818927 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -340,10 +340,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, notifications *shards.Notifications) error { // Needs its own notifications to not update RPC daemon and txpool about pending blocks - stateSync, err := stages2.NewInMemoryExecution(ctx, mock.DB, ðconfig.Defaults, mock.sentriesClient, dirs, notifications, mock.BlockReader, blockWriter, agg, log.New() /* logging will be discarded */) - if err != nil { - return err - } + stateSync := stages2.NewInMemoryExecution(ctx, mock.DB, ðconfig.Defaults, mock.sentriesClient, dirs, notifications, mock.BlockReader, blockWriter, agg, log.New() /* logging will be discarded */) // We start the mining step if err := stages2.StateStep(ctx, nil, engine, batch, blockWriter, stateSync, mock.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil { logger.Warn("Could not validate block", "err", err) diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 7b97608e3..189c6455f 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -365,7 +365,7 @@ func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, eng if chainReader != nil { if err := engine.VerifyHeader(chainReader, currentHeader, true); err != nil { log.Warn("Header Verification Failed", "number", currentHeight, "hash", currentHash, "reason", err) - return err + return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) } } @@ -398,7 +398,7 @@ func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, eng } if err := engine.VerifyHeader(chainReader, header, true); err != nil { log.Warn("Header Verification Failed", "number", header.Number.Uint64(), "hash", header.Hash(), "reason", err) - return err + return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) } // Setup @@ -539,7 +539,7 @@ func NewPipelineStages(ctx context.Context, func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config, controlServer *sentry.MultiClient, dirs datadir.Dirs, notifications *shards.Notifications, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, agg *state.AggregatorV3, - logger log.Logger) (*stagedsync.Sync, error) { + logger log.Logger) *stagedsync.Sync { return stagedsync.New( stagedsync.StateStages(ctx, stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil, nil), @@ -570,5 +570,5 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config stagedsync.StateUnwindOrder, nil, /* pruneOrder */ logger, - ), nil + ) }