Don't mark blocks as bad on transient errors (#8197)

For example, erigon on devnet8 marked a block as bad due to
"mdbx_cursor_open: cannot allocate memory":
```
[INFO] [09-12|04:57:36.041] [NewPayload] Handling new payload        height=171035 hash=0x321dea00c4853ee354bebaf8aef3e63fbe06c4508271c0db4c92b0f087aedc3b
171034
[WARN] [09-12|04:57:36.069] Could not validate block                 err="[3/7 BlockHashes] table: Header, mdbx_cursor_open: cannot allocate memory, stack: [kv_mdbx.go:1057 kv_mdbx.go:1069 kv_mdbx.go:1077 memory_mutation.go:473 memory_mutation.go:502 etl.go:123 etl.go:96 block_writer.go:40 stage_blockhashes.go:49 default_stages.go:457 sync.go:425 sync.go:258 stageloop.go:414 backend.go:476 fork_validator.go:250 fork_validator.go:156 ethereum_execution.go:151 execution_client.go:51 chain_reader.go:252 engine_server.go:741 engine_server.go:235 engine_server.go:600 value.go:586 value.go:370 service.go:224 handler.go:494 handler.go:444 handler.go:392 handler.go:223 handler.go:316 asm_amd64.s:1598]"
[WARN] [09-12|04:57:36.069] ethereumExecutionModule.ValidateChain: chain is invalid hash=0x321dea00c4853ee354bebaf8aef3e63fbe06c4508271c0db4c92b0f087aedc3b
```
With this PR, blocks are marked as bad only on genuine protocol errors (wrapped as the new consensus.ErrInvalidBlock), not on transient failures like the out-of-memory error above.
Author: Andrew Ashikhmin, 2023-09-17 11:14:36 +02:00 (committed by GitHub)
Commit: 17d6f86218, parent: fa4cf492d4
10 changed files with 34 additions and 20 deletions

```diff
@@ -19,6 +19,11 @@ package consensus
 import "errors"
 
 var (
+	// ErrInvalidBlock is a generic error to wrap all non-transient genuine protocol validation errors.
+	// For example, ErrUnexpectedWithdrawals should be wrapped as ErrInvalidBlock,
+	// while an out-of-memory error should not.
+	ErrInvalidBlock = errors.New("invalid block")
+
 	// ErrUnknownAncestor is returned when validating a block requires an ancestor
 	// that is unknown.
 	ErrUnknownAncestor = errors.New("unknown ancestor")
```
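The convention this sentinel enables is plain Go error chaining: stages wrap protocol failures with `%w` so `errors.Is` can detect them upstream, while transient failures stay unwrapped. A minimal, self-contained sketch of the pattern (the `validate` helper and its messages are illustrative, not erigon code):

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalidBlock mirrors the sentinel added in this commit: it marks
// genuine protocol violations, as opposed to transient infrastructure errors.
var ErrInvalidBlock = errors.New("invalid block")

// validate is a stand-in for a sync stage: it wraps protocol failures with %w
// so the sentinel stays detectable, and returns transient errors as-is.
func validate(protocolViolation bool) error {
	if protocolViolation {
		return fmt.Errorf("%w: wrong trie root", ErrInvalidBlock)
	}
	return errors.New("mdbx_cursor_open: cannot allocate memory") // transient
}

func main() {
	for _, bad := range []bool{true, false} {
		err := validate(bad)
		if errors.Is(err, ErrInvalidBlock) {
			fmt.Println("mark block as bad:", err)
		} else {
			fmt.Println("transient, retry later:", err)
		}
	}
}
```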

```diff
@@ -468,11 +468,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
 	inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
 		notifications *shards.Notifications) error {
 		// Needs its own notifications to not update RPC daemon and txpool about pending blocks
-		stateSync, err := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient,
+		stateSync := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient,
 			dirs, notifications, blockReader, blockWriter, backend.agg, log.New() /* logging will be discarded */)
-		if err != nil {
-			return err
-		}
 		chainReader := stagedsync.NewChainReaderImpl(chainConfig, batch, blockReader, logger)
 		// We start the mining step
 		if err := stages2.StateStep(ctx, chainReader, backend.engine, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
```

```diff
@@ -637,7 +637,7 @@ Loop:
 			}
 			return nil
 		}(); err != nil {
-			if errors.Is(err, context.Canceled) || errors.Is(err, common.ErrStopped) {
+			if !errors.Is(err, consensus.ErrInvalidBlock) {
 				return err
 			} else {
 				logger.Warn(fmt.Sprintf("[%s] Execution failed", logPrefix), "block", blockNum, "hash", header.Hash().String(), "err", err)
```

```diff
@@ -172,7 +172,7 @@ func executeBlock(
 	execRs, err = core.ExecuteBlockEphemerally(cfg.chainConfig, &vmConfig, getHashFn, cfg.engine, block, stateReader, stateWriter, NewChainReaderImpl(cfg.chainConfig, tx, cfg.blockReader, logger), getTracer, logger)
 	if err != nil {
-		return err
+		return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err)
 	}
 	receipts = execRs.Receipts
 	stateSyncReceipt = execRs.StateSyncReceipt
@@ -462,12 +462,14 @@ Loop:
 		if err = executeBlock(block, tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, initialCycle, stateStream, logger); err != nil {
 			if !errors.Is(err, context.Canceled) {
 				logger.Warn(fmt.Sprintf("[%s] Execution failed", logPrefix), "block", blockNum, "hash", blockHash.String(), "err", err)
-				if cfg.hd != nil {
+				if cfg.hd != nil && errors.Is(err, consensus.ErrInvalidBlock) {
 					cfg.hd.ReportBadHeaderPoS(blockHash, block.ParentHash() /* lastValidAncestor */)
 				}
 				if cfg.badBlockHalt {
 					return err
 				}
+			}
+			if errors.Is(err, consensus.ErrInvalidBlock) {
 				u.UnwindTo(blockNum-1, blockHash /* badBlock */)
 			} else {
 				u.UnwindTo(blockNum-1, libcommon.Hash{} /* badBlock */)
```
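The second hunk decouples the two unwind flavors from cancellation: only an error chain containing consensus.ErrInvalidBlock records the offending hash as a bad block, while cancellations and transient errors unwind with a zero hash. A simplified, hypothetical sketch of that decision (`Hash` and `unwind` are stand-ins, not erigon's Unwinder API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var ErrInvalidBlock = errors.New("invalid block")

// Hash is a stand-in for erigon's 32-byte block hash type.
type Hash [32]byte

// unwind mimics the new branching: record the offending hash only for
// protocol-invalid blocks, so transient errors never blacklist a block.
func unwind(blockNum uint64, blockHash Hash, err error) (to uint64, badBlock Hash) {
	if errors.Is(err, ErrInvalidBlock) {
		return blockNum - 1, blockHash // remember this hash as bad
	}
	return blockNum - 1, Hash{} // zero hash: nothing gets blacklisted
}

func main() {
	h := Hash{0x32, 0x1d, 0xea}
	_, bad := unwind(171035, h, fmt.Errorf("%w: wrong trie root", ErrInvalidBlock))
	fmt.Printf("invalid block -> badBlock=%x...\n", bad[:3])
	_, bad = unwind(171035, h, context.Canceled)
	fmt.Printf("canceled      -> badBlock=%x...\n", bad[:3])
}
```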

```diff
@@ -23,6 +23,7 @@ import (
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/common/math"
+	"github.com/ledgerwatch/erigon/consensus"
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/core/types/accounts"
 	"github.com/ledgerwatch/erigon/turbo/services"
@@ -126,7 +127,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx kv.RwTx, cfg Tri
 	if cfg.checkRoot && root != expectedRootHash {
 		logger.Error(fmt.Sprintf("[%s] Wrong trie root of block %d: %x, expected (from header): %x. Block hash: %x", logPrefix, to, root, expectedRootHash, headerHash))
 		if cfg.badBlockHalt {
-			return trie.EmptyRoot, fmt.Errorf("wrong trie root")
+			return trie.EmptyRoot, fmt.Errorf("%w: wrong trie root", consensus.ErrInvalidBlock)
 		}
 		if cfg.hd != nil {
 			cfg.hd.ReportBadHeaderPoS(headerHash, syncHeadHeader.ParentHash)
```

```diff
@@ -16,6 +16,7 @@ import (
 	"github.com/ledgerwatch/erigon-lib/common/length"
 	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/consensus"
 	"github.com/ledgerwatch/erigon/turbo/services"
 	"github.com/ledgerwatch/log/v3"
 	"github.com/ledgerwatch/secp256k1"
@@ -258,7 +259,7 @@ Loop:
 			return minBlockErr
 		}
 		minHeader := rawdb.ReadHeader(tx, minBlockHash, minBlockNum)
-		if cfg.hd != nil {
+		if cfg.hd != nil && errors.Is(minBlockErr, consensus.ErrInvalidBlock) {
 			cfg.hd.ReportBadHeaderPoS(minBlockHash, minHeader.ParentHash)
 		}
@@ -328,7 +329,8 @@ func recoverSenders(ctx context.Context, logPrefix string, cryptoContext *secp25
 		for i, tx := range body.Transactions {
 			from, err := signer.SenderWithContext(cryptoContext, tx)
 			if err != nil {
-				job.err = fmt.Errorf("%s: error recovering sender for tx=%x, %w", logPrefix, tx.Hash(), err)
+				job.err = fmt.Errorf("%w: error recovering sender for tx=%x, %v",
+					consensus.ErrInvalidBlock, tx.Hash(), err)
 				break
 			}
 			copy(job.senders[i*length.Addr:], from[:])
```
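Note the swapped format verbs in recoverSenders: fmt.Errorf treats only the `%w` operand as the wrap target, so the old message wrapped the low-level cause while the new one wraps the sentinel and flattens the cause with `%v`. A quick self-contained demonstration of the difference:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidBlock = errors.New("invalid block")

func main() {
	cause := errors.New("recovery failed")

	// Old style: the underlying error is the %w target.
	oldErr := fmt.Errorf("senders: error recovering sender, %w", cause)
	// New style: the sentinel is the %w target; the cause is flattened by %v.
	newErr := fmt.Errorf("%w: error recovering sender, %v", ErrInvalidBlock, cause)

	fmt.Println(errors.Is(oldErr, ErrInvalidBlock)) // false
	fmt.Println(errors.Is(newErr, ErrInvalidBlock)) // true
	fmt.Println(errors.Is(oldErr, cause))           // true
	fmt.Println(errors.Is(newErr, cause))           // false: %v breaks the chain
}
```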

```diff
@@ -130,7 +130,7 @@ func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.Syn
 // waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome.
 func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) {
 	var lastValidHash libcommon.Hash
-	var badChainError error
+	var badChainError error // TODO(yperbasis): this is not set anywhere
 	var foundPow bool
 	headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
```

```diff
@@ -15,6 +15,7 @@ package engine_helpers
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
@@ -22,6 +23,7 @@ import (
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/ledgerwatch/erigon/cl/phase1/core/state/lru"
+	"github.com/ledgerwatch/erigon/consensus"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/turbo/engineapi/engine_types"
 	"github.com/ledgerwatch/erigon/turbo/services"
@@ -252,7 +254,15 @@ func (fv *ForkValidator) ClearWithUnwind(accumulator *shards.Accumulator, c shar
 // validateAndStorePayload validate and store a payload fork chain if such chain results valid.
 func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
 	notifications *shards.Notifications) (status engine_types.EngineStatus, latestValidHash libcommon.Hash, validationError error, criticalError error) {
-	validationError = fv.validatePayload(tx, header, body, unwindPoint, headersChain, bodiesChain, notifications)
+	if err := fv.validatePayload(tx, header, body, unwindPoint, headersChain, bodiesChain, notifications); err != nil {
+		if errors.Is(err, consensus.ErrInvalidBlock) {
+			validationError = err
+		} else {
+			criticalError = err
+			return
+		}
+	}
 	latestValidHash = header.Hash()
 	if validationError != nil {
 		var latestValidNumber uint64
```
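validateAndStorePayload now routes the result of validatePayload into one of two channels: a protocol violation becomes validationError (the payload is reported invalid), while anything else becomes criticalError and aborts the request without branding the block. A heavily simplified, hypothetical sketch of that classification (the `Status` type and `classify` helper are illustrative, not erigon's engine API):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidBlock = errors.New("invalid block")

type Status string

const (
	StatusValid   Status = "VALID"
	StatusInvalid Status = "INVALID"
)

// classify mirrors validateAndStorePayload's new split: protocol violations
// become a validation outcome; everything else is a critical (internal) error.
func classify(err error) (status Status, validationErr, criticalErr error) {
	switch {
	case err == nil:
		return StatusValid, nil, nil
	case errors.Is(err, ErrInvalidBlock):
		return StatusInvalid, err, nil
	default:
		return "", nil, err // transient: fail the request, no bad-block mark
	}
}

func main() {
	for _, err := range []error{
		nil,
		fmt.Errorf("%w: wrong trie root", ErrInvalidBlock),
		errors.New("mdbx_cursor_open: cannot allocate memory"),
	} {
		s, v, c := classify(err)
		fmt.Printf("status=%q validation=%v critical=%v\n", s, v, c)
	}
}
```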

```diff
@@ -340,10 +340,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 	inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
 		notifications *shards.Notifications) error {
 		// Needs its own notifications to not update RPC daemon and txpool about pending blocks
-		stateSync, err := stages2.NewInMemoryExecution(ctx, mock.DB, &ethconfig.Defaults, mock.sentriesClient, dirs, notifications, mock.BlockReader, blockWriter, agg, log.New() /* logging will be discarded */)
-		if err != nil {
-			return err
-		}
+		stateSync := stages2.NewInMemoryExecution(ctx, mock.DB, &ethconfig.Defaults, mock.sentriesClient, dirs, notifications, mock.BlockReader, blockWriter, agg, log.New() /* logging will be discarded */)
 		// We start the mining step
 		if err := stages2.StateStep(ctx, nil, engine, batch, blockWriter, stateSync, mock.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
 			logger.Warn("Could not validate block", "err", err)
```

```diff
@@ -365,7 +365,7 @@ func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, eng
 	if chainReader != nil {
 		if err := engine.VerifyHeader(chainReader, currentHeader, true); err != nil {
 			log.Warn("Header Verification Failed", "number", currentHeight, "hash", currentHash, "reason", err)
-			return err
+			return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err)
 		}
 	}
@@ -398,7 +398,7 @@ func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, eng
 	}
 	if err := engine.VerifyHeader(chainReader, header, true); err != nil {
 		log.Warn("Header Verification Failed", "number", header.Number.Uint64(), "hash", header.Hash(), "reason", err)
-		return err
+		return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err)
 	}
 	// Setup
@@ -539,7 +539,7 @@ func NewPipelineStages(ctx context.Context,
 func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config, controlServer *sentry.MultiClient,
 	dirs datadir.Dirs, notifications *shards.Notifications, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, agg *state.AggregatorV3,
-	logger log.Logger) (*stagedsync.Sync, error) {
+	logger log.Logger) *stagedsync.Sync {
 	return stagedsync.New(
 		stagedsync.StateStages(ctx,
 			stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil, nil),
@@ -570,5 +570,5 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
 		stagedsync.StateUnwindOrder,
 		nil, /* pruneOrder */
 		logger,
-	), nil
+	)
 }
```
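A small side effect of the refactor: stagedsync.New evidently cannot fail here, so NewInMemoryExecution drops its error return, and both call sites above lose their dead `if err != nil` branches. A toy illustration of this common Go cleanup (types and names are made up for the example):

```go
package main

import "fmt"

type Sync struct{ stages []string }

// Before: an error return that could never be non-nil forced every caller
// to write a dead `if err != nil` branch.
func newSyncOld(stages ...string) (*Sync, error) {
	return &Sync{stages: stages}, nil
}

// After: an infallible constructor returns the value directly.
func newSync(stages ...string) *Sync {
	return &Sync{stages: stages}
}

func main() {
	// Old call site: boilerplate error handling for an impossible error.
	s, err := newSyncOld("headers", "bodies", "execution")
	if err != nil {
		panic(err) // unreachable
	}
	fmt.Println(len(s.stages))

	// New call site: one line, nothing to handle.
	fmt.Println(len(newSync("headers", "bodies", "execution").stages))
}
```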