From d8b91c4d024ab74d5898e3d057f674123cc3b272 Mon Sep 17 00:00:00 2001
From: Mark Holt <135143369+mh0lt@users.noreply.github.com>
Date: Sat, 13 Jan 2024 10:33:34 +0000
Subject: [PATCH] Fix startup sync for txpool processing for bor block
 production (#9219)

When the sync loop first runs, it suppresses block sync events both on
the initial loop iteration and when more than 1000 blocks are being
processed. This fix removes the first check: without it, the first
block received by the process never reaches the tx pool, which means
no new blocks get produced for Polygon.

In addition to that fix, I have moved the fee initialization into the
txpool's start method rather than prompting it with a 'synthetic block
event'. Since the txpool's start method has access to the core and
txpool DBs, it can find the current block and chain config internally,
so it no longer needs to be activated externally; it can do this
itself on startup. This has the advantage of making the txpool more
self-contained.
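For illustration, a minimal sketch of the fee-resolution order that
TxPool.fromDB follows after this change (the helper name
resolvePendingFees is hypothetical; the real logic is inlined in
fromDB and backed by misc.Eip1559FeeCalculator):

	// resolvePendingFees mirrors the fallback order introduced here:
	// 1. ask the injected FeeCalculator, which reads the head header
	//    from the core DB and computes the current base/blob fees;
	// 2. fall back to the values last persisted in the pool DB
	//    (PoolPendingBaseFeeKey / PoolPendingBlobFeeKey);
	// 3. as a last resort, use the chain's minimum blob gas price.
	func resolvePendingFees(calc FeeCalculator, cc *chain.Config, coreTx kv.Getter,
		storedBaseFee, storedBlobFee uint64) (baseFee, blobFee uint64) {
		var minBlobGasPrice uint64
		if calc != nil && cc != nil {
			baseFee, blobFee, minBlobGasPrice, _ = calc.CurrentFees(cc, coreTx)
		}
		if baseFee == 0 {
			baseFee = storedBaseFee // last value saved in the pool DB
		}
		if blobFee == 0 {
			blobFee = storedBlobFee // last value saved in the pool DB
		}
		if blobFee == 0 {
			blobFee = minBlobGasPrice // MIN_BLOB_GASPRICE fallback (EIP-4844)
		}
		return baseFee, blobFee
	}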
---
 cmd/txpool/main.go                            |  5 ++-
 consensus/misc/eip1559.go                     | 39 +++++++++++++++++++
 erigon-lib/chain/chain_config.go              |  8 ++--
 erigon-lib/txpool/pool.go                     | 31 +++++++++++----
 erigon-lib/txpool/pool_fuzz_test.go           |  4 +-
 erigon-lib/txpool/pool_test.go                | 14 +++----
 .../txpool/txpooluitl/all_components.go       |  4 +-
 eth/backend.go                                | 23 ++---------
 eth/stagedsync/stage_execute.go               | 17 ++++----
 turbo/shards/state_change_accumulator.go      |  1 +
 .../snapshotsync/freezeblocks/block_reader.go |  9 ++++-
 .../freezeblocks/block_snapshots.go           |  8 +++-
 turbo/stages/mock/mock_sentry.go              |  4 +-
 13 files changed, 110 insertions(+), 57 deletions(-)

diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go
index 463be539b..905ba3ef7 100644
--- a/cmd/txpool/main.go
+++ b/cmd/txpool/main.go
@@ -24,6 +24,7 @@ import (
 	"github.com/ledgerwatch/erigon-lib/types"
 	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest"
 	common2 "github.com/ledgerwatch/erigon/common"
+	"github.com/ledgerwatch/erigon/consensus/misc"
 	"github.com/ledgerwatch/erigon/ethdb/privateapi"
 	"github.com/ledgerwatch/log/v3"
 	"github.com/spf13/cobra"
@@ -163,7 +164,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
 	newTxs := make(chan types.Announcements, 1024)
 	defer close(newTxs)
 	txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg,
-		kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, logger)
+		kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, misc.Eip1559FeeCalculator, logger)
 	if err != nil {
 		return err
 	}
@@ -178,7 +179,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
 	}
 	notifyMiner := func() {}
-	txpool.MainLoop(ctx, txPoolDB, coreDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)
+	txpool.MainLoop(ctx, txPoolDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)
 	grpcServer.GracefulStop()
 	return nil
diff --git a/consensus/misc/eip1559.go b/consensus/misc/eip1559.go
index de5b4ec01..b2a53af3f 100644
--- a/consensus/misc/eip1559.go
+++ b/consensus/misc/eip1559.go
@@ -22,9 +22,12 @@ import (
 	"github.com/ledgerwatch/erigon-lib/chain"
 	"github.com/ledgerwatch/erigon-lib/common"
+	libcommon "github.com/ledgerwatch/erigon-lib/common"
+	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon/polygon/bor/borcfg"
 
 	"github.com/ledgerwatch/erigon/common/math"
+	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/params"
 )
@@ -56,6 +59,42 @@ func VerifyEip1559Header(config *chain.Config, parent, header *types.Header, ski
 	return nil
 }
 
+var Eip1559FeeCalculator eip1559Calculator
+
+type eip1559Calculator struct{}
+
+func (f eip1559Calculator) CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice uint64, err error) {
+	hash := rawdb.ReadHeadHeaderHash(db)
+
+	if hash == (libcommon.Hash{}) {
+		return 0, 0, 0, fmt.Errorf("can't get head header hash")
+	}
+
+	currentHeader, err := rawdb.ReadHeaderByHash(db, hash)
+
+	if err != nil {
+		return 0, 0, 0, err
+	}
+
+	if chainConfig != nil {
+		if currentHeader.BaseFee != nil {
+			baseFee = CalcBaseFee(chainConfig, currentHeader).Uint64()
+		}
+
+		if currentHeader.ExcessBlobGas != nil {
+			excessBlobGas := CalcExcessBlobGas(chainConfig, currentHeader)
+			b, err := GetBlobGasPrice(chainConfig, excessBlobGas)
+			if err == nil {
+				blobFee = b.Uint64()
+			}
+		}
+	}
+
+	minBlobGasPrice = chainConfig.GetMinBlobGasPrice()
+
+	return baseFee, blobFee, minBlobGasPrice, nil
+}
+
 // CalcBaseFee calculates the basefee of the header.
 func CalcBaseFee(config *chain.Config, parent *types.Header) *big.Int {
 	// If the current block is the first EIP-1559 block, return the InitialBaseFee.
diff --git a/erigon-lib/chain/chain_config.go b/erigon-lib/chain/chain_config.go
index cc8fe6d94..bb5125660 100644
--- a/erigon-lib/chain/chain_config.go
+++ b/erigon-lib/chain/chain_config.go
@@ -233,28 +233,28 @@ func (c *Config) GetBurntContract(num uint64) *common.Address {
 }
 
 func (c *Config) GetMinBlobGasPrice() uint64 {
-	if c.MinBlobGasPrice != nil {
+	if c != nil && c.MinBlobGasPrice != nil {
 		return *c.MinBlobGasPrice
 	}
 	return 1 // MIN_BLOB_GASPRICE (EIP-4844)
 }
 
 func (c *Config) GetMaxBlobGasPerBlock() uint64 {
-	if c.MaxBlobGasPerBlock != nil {
+	if c != nil && c.MaxBlobGasPerBlock != nil {
 		return *c.MaxBlobGasPerBlock
 	}
 	return 786432 // MAX_BLOB_GAS_PER_BLOCK (EIP-4844)
 }
 
 func (c *Config) GetTargetBlobGasPerBlock() uint64 {
-	if c.TargetBlobGasPerBlock != nil {
+	if c != nil && c.TargetBlobGasPerBlock != nil {
 		return *c.TargetBlobGasPerBlock
 	}
 	return 393216 // TARGET_BLOB_GAS_PER_BLOCK (EIP-4844)
 }
 
 func (c *Config) GetBlobGasPriceUpdateFraction() uint64 {
-	if c.BlobGasPriceUpdateFraction != nil {
+	if c != nil && c.BlobGasPriceUpdateFraction != nil {
 		return *c.BlobGasPriceUpdateFraction
 	}
 	return 3338477 // BLOB_GASPRICE_UPDATE_FRACTION (EIP-4844)
diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go
index e6e479815..88026073a 100644
--- a/erigon-lib/txpool/pool.go
+++ b/erigon-lib/txpool/pool.go
@@ -225,11 +225,17 @@ type TxPool struct {
 	cancunTime       *uint64
 	isPostCancun     atomic.Bool
 	maxBlobsPerBlock uint64
+	feeCalculator    FeeCalculator
 	logger           log.Logger
 }
 
+type FeeCalculator interface {
+	CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice uint64, err error)
+}
+
 func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, cache kvcache.Cache,
-	chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64, logger log.Logger,
+	chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64,
+	feeCalculator FeeCalculator, logger log.Logger,
 ) (*TxPool, error) {
 	localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil)
 	if err != nil {
@@ -275,6 +281,7 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
 		minedBlobTxsByBlock: map[uint64][]*metaTx{},
 		minedBlobTxsByHash:  map[string]*metaTx{},
 		maxBlobsPerBlock:    maxBlobsPerBlock,
+		feeCalculator:       feeCalculator,
 		logger:              logger,
 	}
 
@@ -331,7 +338,6 @@ func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error {
 }
 
 func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
-
 	defer newBlockTimer.ObserveDuration(time.Now())
 	//t := time.Now()
 
@@ -1700,7 +1706,7 @@ const txMaxBroadcastSize = 4 * 1024
 //
 // promote/demote transactions
 // reorgs
-func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
+func MainLoop(ctx context.Context, db kv.RwDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
 	syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery)
 	defer syncToNewPeersEvery.Stop()
 	processRemoteTxsEvery := time.NewTicker(p.cfg.ProcessRemoteTxsEvery)
@@ -2072,8 +2078,15 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
 		i++
 	}
 
-	var pendingBaseFee uint64
-	{
+	var pendingBaseFee, pendingBlobFee, minBlobGasPrice uint64
+
+	if p.feeCalculator != nil {
+		if chainConfig, _ := ChainConfig(tx); chainConfig != nil {
+			pendingBaseFee, pendingBlobFee, minBlobGasPrice, _ = p.feeCalculator.CurrentFees(chainConfig, coreTx)
+		}
+	}
+
+	if pendingBaseFee == 0 {
 		v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey)
 		if err != nil {
 			return err
@@ -2082,8 +2095,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
 			pendingBaseFee = binary.BigEndian.Uint64(v)
 		}
 	}
-	var pendingBlobFee uint64 = 1 // MIN_BLOB_GAS_PRICE A/EIP-4844
-	{
+
+	if pendingBlobFee == 0 {
 		v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
 		if err != nil {
 			return err
@@ -2093,6 +2106,10 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
 		}
 	}
 
+	if pendingBlobFee == 0 {
+		pendingBlobFee = minBlobGasPrice
+	}
+
 	err = p.senders.registerNewSenders(&txs, p.logger)
 	if err != nil {
 		return err
diff --git a/erigon-lib/txpool/pool_fuzz_test.go b/erigon-lib/txpool/pool_fuzz_test.go
index 1e8923d88..08106374c 100644
--- a/erigon-lib/txpool/pool_fuzz_test.go
+++ b/erigon-lib/txpool/pool_fuzz_test.go
@@ -314,7 +314,7 @@ func FuzzOnNewBlocks(f *testing.F) {
 	cfg := txpoolcfg.DefaultConfig
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 	assert.NoError(err)
 
 	err = pool.Start(ctx, db)
@@ -540,7 +540,7 @@ func FuzzOnNewBlocks(f *testing.F) {
 		check(p2pReceived, types.TxSlots{}, "after_flush")
 		checkNotify(p2pReceived, types.TxSlots{}, "after_flush")
 
-		p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+		p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 		assert.NoError(err)
 		p2.senders = pool.senders // senders are not persisted
diff --git a/erigon-lib/txpool/pool_test.go b/erigon-lib/txpool/pool_test.go
index ef4347a58..80de08a51 100644
--- a/erigon-lib/txpool/pool_test.go
+++ b/erigon-lib/txpool/pool_test.go
@@ -53,7 +53,7 @@ func TestNonceFromAddress(t *testing.T) {
 	cfg := txpoolcfg.DefaultConfig
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 	assert.NoError(err)
 	require.True(pool != nil)
 	ctx := context.Background()
@@ -173,7 +173,7 @@ func TestReplaceWithHigherFee(t *testing.T) {
 	cfg := txpoolcfg.DefaultConfig
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 	assert.NoError(err)
 	require.NotEqual(nil, pool)
 	ctx := context.Background()
@@ -290,7 +290,7 @@ func TestReverseNonces(t *testing.T) {
 	cfg := txpoolcfg.DefaultConfig
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 	assert.NoError(err)
 	require.True(pool != nil)
 	ctx := context.Background()
@@ -417,7 +417,7 @@ func TestTxPoke(t *testing.T) {
 	cfg := txpoolcfg.DefaultConfig
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 	assert.NoError(err)
 	require.True(pool != nil)
 	ctx := context.Background()
@@ -682,7 +682,7 @@ func TestShanghaiValidateTx(t *testing.T) {
 			}
 			cache := &kvcache.DummyCache{}
-			pool, err := New(ch, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, fixedgas.DefaultMaxBlobsPerBlock, logger)
+			pool, err := New(ch, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, logger)
 			asrt.NoError(err)
 			ctx := context.Background()
 			tx, err := coreDB.BeginRw(ctx)
@@ -728,7 +728,7 @@ func TestBlobTxReplacement(t *testing.T) {
 	db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t)
 	cfg := txpoolcfg.DefaultConfig
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+	pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
 	assert.NoError(err)
 	require.True(pool != nil)
 	ctx := context.Background()
@@ -953,7 +953,7 @@ func TestDropRemoteAtNoGossip(t *testing.T) {
 	logger := log.New()
 	sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-	txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, fixedgas.DefaultMaxBlobsPerBlock, logger)
+	txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, fixedgas.DefaultMaxBlobsPerBlock, nil, logger)
 	assert.NoError(err)
 	require.True(txPool != nil)
diff --git a/erigon-lib/txpool/txpooluitl/all_components.go b/erigon-lib/txpool/txpooluitl/all_components.go
index 156a5771c..ecf884bbe 100644
--- a/erigon-lib/txpool/txpooluitl/all_components.go
+++ b/erigon-lib/txpool/txpooluitl/all_components.go
@@ -101,7 +101,7 @@ func SaveChainConfigIfNeed(ctx context.Context, coreDB kv.RoDB, txPoolDB kv.RwDB
 }
 
 func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cache, newTxs chan types.Announcements, chainDB kv.RoDB,
-	sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
+	sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, feeCalculator txpool.FeeCalculator, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
 	opts := mdbx.NewMDBX(logger).Label(kv.TxPoolDB).Path(cfg.DBDir).
 		WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).
 		WriteMergeThreshold(3 * 8192).
@@ -144,7 +144,7 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
 		cancunTime = cfg.OverrideCancunTime
 	}
 
-	txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime, agraBlock, cancunTime, maxBlobsPerBlock, logger)
+	txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime, agraBlock, cancunTime, maxBlobsPerBlock, feeCalculator, logger)
 	if err != nil {
 		return nil, nil, nil, nil, nil, err
 	}
diff --git a/eth/backend.go b/eth/backend.go
index 686109446..1b2e1baee 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -604,7 +604,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 	backend.newTxs = make(chan types2.Announcements, 1024)
 	//defer close(newTxs)
 	backend.txPoolDB, backend.txPool, backend.txPoolFetch, backend.txPoolSend, backend.txPoolGrpcServer, err = txpooluitl.AllComponents(
-		ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, logger,
+		ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, misc.Eip1559FeeCalculator, logger,
 	)
 	if err != nil {
 		return nil, err
@@ -708,23 +708,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 	// 1) Hive tests requires us to do so and starting it from eth_sendRawTransaction is not viable as we have not enough data
 	// to initialize it properly.
 	// 2) we cannot propose for block 1 regardless.
-	go func() {
-		time.Sleep(10 * time.Millisecond)
-		baseFee := uint64(0)
-		if currentBlock.BaseFee() != nil {
-			baseFee = misc.CalcBaseFee(chainConfig, currentBlock.Header()).Uint64()
-		}
-		blobFee := chainConfig.GetMinBlobGasPrice()
-		if currentBlock.Header().ExcessBlobGas != nil {
-			excessBlobGas := misc.CalcExcessBlobGas(chainConfig, currentBlock.Header())
-			b, err := misc.GetBlobGasPrice(chainConfig, excessBlobGas)
-			if err == nil {
-				blobFee = b.Uint64()
-			}
-		}
-		backend.notifications.Accumulator.StartChange(currentBlock.NumberU64(), currentBlock.Hash(), nil, false)
-		backend.notifications.Accumulator.SendAndReset(ctx, backend.notifications.StateChangesConsumer, baseFee, blobFee, currentBlock.GasLimit(), 0)
-	}()
 
 	if !config.DeprecatedTxPool.Disable {
 		backend.txPoolFetch.ConnectCore()
@@ -734,8 +717,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 			newTxsBroadcaster = casted.NewSlotsStreams
 		}
 		go txpool.MainLoop(backend.sentryCtx,
-			backend.txPoolDB, backend.chainDB,
-			backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
+			backend.txPoolDB, backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
 			func() {
 				select {
 				case backend.notifyMiningAboutNewTxs <- struct{}{}:
@@ -743,6 +725,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 			})
 	}
+
 	go func() {
 		defer debug.LogPanic()
 		for {
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index c53e1995d..7bf77db28 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -145,12 +145,11 @@ func executeBlock(
 	writeChangesets bool,
 	writeReceipts bool,
 	writeCallTraces bool,
-	initialCycle bool,
 	stateStream bool,
 	logger log.Logger,
 ) error {
 	blockNum := block.NumberU64()
-	stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, cfg.blockReader, initialCycle, stateStream)
+	stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, cfg.blockReader, stateStream)
 	if err != nil {
 		return err
 	}
@@ -211,16 +210,14 @@ func newStateReaderWriter(
 	writeChangesets bool,
 	accumulator *shards.Accumulator,
 	br services.FullBlockReader,
-	initialCycle bool,
 	stateStream bool,
 ) (state.StateReader, state.WriterWithChangeSets, error) {
-
 	var stateReader state.StateReader
 	var stateWriter state.WriterWithChangeSets
 
 	stateReader = state.NewPlainStateReader(batch)
 
-	if !initialCycle && stateStream {
+	if stateStream {
 		txs, err := br.RawTransactions(context.Background(), tx, block.NumberU64(), block.NumberU64())
 		if err != nil {
 			return nil, nil, err
@@ -389,16 +386,20 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to
 	logPrefix := s.LogPrefix()
 
 	var to = prevStageProgress
+
 	if toBlock > 0 {
 		to = cmp.Min(prevStageProgress, toBlock)
 	}
+
 	if to <= s.BlockNumber {
 		return nil
 	}
+
 	if to > s.BlockNumber+16 {
 		logger.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to)
 	}
-	stateStream := !initialCycle && cfg.stateStream && to-s.BlockNumber < stateStreamLimit
+
+	stateStream := cfg.stateStream && to-s.BlockNumber < stateStreamLimit
 
 	// changes are stored through memory buffer
 	logEvery := time.NewTicker(logInterval)
@@ -468,7 +469,7 @@ Loop:
 		if cfg.silkworm != nil && !isMemoryMutation {
 			blockNum, err = silkworm.ExecuteBlocks(cfg.silkworm, txc.Tx, cfg.chainConfig.ChainID, blockNum, to, uint64(cfg.batchSize), writeChangeSets, writeReceipts, writeCallTraces)
 		} else {
-			err = executeBlock(block, txc.Tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, initialCycle, stateStream, logger)
+			err = executeBlock(block, txc.Tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, stateStream, logger)
 		}
 
 		if err != nil {
@@ -710,7 +711,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c
 	storageKeyLength := length.Addr + length.Incarnation + length.Hash
 
 	var accumulator *shards.Accumulator
-	if !initialCycle && cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit {
+	if cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit {
 		accumulator = cfg.accumulator
 
 		hash, err := cfg.blockReader.CanonicalHash(ctx, txc.Tx, u.UnwindPoint)
diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go
index 8ba19686e..cf0cc8c56 100644
--- a/turbo/shards/state_change_accumulator.go
+++ b/turbo/shards/state_change_accumulator.go
@@ -32,6 +32,7 @@ func (a *Accumulator) Reset(plainStateID uint64) {
 	a.storageChangeIndex = nil
 	a.plainStateID = plainStateID
 }
+
 func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer, pendingBaseFee uint64, pendingBlobFee uint64, blockGasLimit uint64, finalizedBlock uint64) {
 	if a == nil || c == nil || len(a.changes) == 0 {
 		return
diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go
index a24367e83..e80b5d359 100644
--- a/turbo/snapshotsync/freezeblocks/block_reader.go
+++ b/turbo/snapshotsync/freezeblocks/block_reader.go
@@ -250,7 +250,8 @@ type BlockReader struct {
 
 func NewBlockReader(snapshots services.BlockSnapshots, borSnapshots services.BlockSnapshots) *BlockReader {
 	borSn, _ := borSnapshots.(*BorRoSnapshots)
-	return &BlockReader{sn: snapshots.(*RoSnapshots), borSn: borSn}
+	sn, _ := snapshots.(*RoSnapshots)
+	return &BlockReader{sn: sn, borSn: borSn}
 }
 
 func (r *BlockReader) CanPruneTo(currentBlockInDB uint64) uint64 {
@@ -485,7 +486,7 @@ func (r *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash c
 }
 
 func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64, forceCanonical bool) (block *types.Block, senders []common.Address, err error) {
 	maxBlockNumInFiles := r.sn.BlocksAvailable()
-	if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
+	if tx != nil && (maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles) {
 		if forceCanonical {
 			canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
 			if err != nil {
@@ -503,6 +504,10 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
 		return block, senders, nil
 	}
 
+	if r.sn == nil {
+		return
+	}
+
 	view := r.sn.View()
 	defer view.Close()
 	seg, ok := view.HeadersSegment(blockHeight)
diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go
index d67457acb..473af9652 100644
--- a/turbo/snapshotsync/freezeblocks/block_snapshots.go
+++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go
@@ -406,7 +406,13 @@ func (s *RoSnapshots) IndicesMax() uint64 { return s.idxMax.Load() }
 func (s *RoSnapshots) SegmentsMax() uint64       { return s.segmentsMax.Load() }
 func (s *RoSnapshots) SegmentsMin() uint64       { return s.segmentsMin.Load() }
 func (s *RoSnapshots) SetSegmentsMin(min uint64) { s.segmentsMin.Store(min) }
-func (s *RoSnapshots) BlocksAvailable() uint64   { return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load()) }
+func (s *RoSnapshots) BlocksAvailable() uint64 {
+	if s == nil {
+		return 0
+	}
+
+	return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load())
+}
 
 func (s *RoSnapshots) LogStat(label string) {
 	var m runtime.MemStats
 	dbg.ReadMemStats(&m)
diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go
index 14c839777..6feee7c59 100644
--- a/turbo/stages/mock/mock_sentry.go
+++ b/turbo/stages/mock/mock_sentry.go
@@ -311,7 +311,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		shanghaiTime := mock.ChainConfig.ShanghaiTime
 		cancunTime := mock.ChainConfig.CancunTime
 		maxBlobsPerBlock := mock.ChainConfig.GetMaxBlobsPerBlock()
-		mock.TxPool, err = txpool.New(newTxs, mock.DB, poolCfg, kvcache.NewDummy(), *chainID, shanghaiTime, nil /* agraBlock */, cancunTime, maxBlobsPerBlock, logger)
+		mock.TxPool, err = txpool.New(newTxs, mock.DB, poolCfg, kvcache.NewDummy(), *chainID, shanghaiTime, nil /* agraBlock */, cancunTime, maxBlobsPerBlock, nil, logger)
 		if err != nil {
 			tb.Fatal(err)
 		}
@@ -329,7 +329,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		mock.TxPoolFetch.ConnectSentries()
 		mock.StreamWg.Wait()
 
-		go txpool.MainLoop(mock.Ctx, mock.txPoolDB, mock.DB, mock.TxPool, newTxs, mock.TxPoolSend, mock.TxPoolGrpcServer.NewSlotsStreams, func() {})
+		go txpool.MainLoop(mock.Ctx, mock.txPoolDB, mock.TxPool, newTxs, mock.TxPoolSend, mock.TxPoolGrpcServer.NewSlotsStreams, func() {})
 	}
 
 	// Committed genesis will be shared between download and mock sentry
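
Note on the nil-receiver guards above: several methods touched by this
patch (RoSnapshots.BlocksAvailable, the chain.Config blob-gas getters)
are made safe to call on a nil receiver so the pool can start before
those components are wired up. A self-contained sketch of the pattern
(the names here are illustrative, not from the patch):

	package main

	import "fmt"

	type Snapshots struct{ max uint64 }

	// Available tolerates a nil *Snapshots: a Go method with a pointer
	// receiver receives nil like any other argument, so the guard below
	// returns a zero value instead of panicking on s.max.
	func (s *Snapshots) Available() uint64 {
		if s == nil {
			return 0
		}
		return s.max
	}

	func main() {
		var s *Snapshots            // nil: snapshots not configured yet
		fmt.Println(s.Available())  // prints 0, no panic
	}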