Fix startup sync for txpool processing for bor block production (#9219)

When the sync loop first runs, it suppresses block sync events both during the initial loop and when more than 1000 blocks are being processed.

This fix removes the first check, because otherwise the first block received by the process never gets sent to the tx pool, which means it won't produce new blocks for Polygon.
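As a rough illustration, the gating looked conceptually like the sketch below; the function and variable names are hypothetical, not the actual erigon identifiers.

// Hypothetical sketch of the old notification gate; names are illustrative.
// Suppressing the initial cycle meant the first block the process saw was
// never forwarded to the tx pool.
func shouldNotifyTxPool(initialCycle bool, blocksProcessed int) bool {
	if initialCycle { // this is the check the fix removes
		return false
	}
	return blocksProcessed <= 1000
}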

In addition to this fix, I have also moved the gas initialization to the txpool start method rather than prompting it with a 'synthetic block event'.

Since the txpool start method has access to the core and tx DBs, it can find the current block and chain config internally; it doesn't need to be activated externally and can do this itself on startup. This has the advantage of making the txpool more self-contained.
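Conceptually, the new start-up path looks something like the sketch below. This is an illustrative outline only: it assumes the feeCalculator field wired up in this commit, and the two setters at the end are hypothetical names, not erigon's actual API.

// Sketch: on start-up the pool derives pending fees itself from the core
// DB instead of waiting for a synthetic block event.
func (p *TxPool) seedFeesOnStart(chainConfig *chain.Config, coreTx kv.Getter) {
	if p.feeCalculator == nil {
		return // tests pass nil; fall back to values persisted in the pool DB
	}
	baseFee, blobFee, minBlobGasPrice, err := p.feeCalculator.CurrentFees(chainConfig, coreTx)
	if err != nil {
		return
	}
	if blobFee == 0 {
		blobFee = minBlobGasPrice
	}
	p.setPendingBaseFee(baseFee) // hypothetical setter
	p.setPendingBlobFee(blobFee) // hypothetical setter
}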
Mark Holt 2024-01-13 10:33:34 +00:00 committed by GitHub
parent 2b0fd6d447
commit d8b91c4d02
13 changed files with 110 additions and 57 deletions


@@ -24,6 +24,7 @@ import (
"github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest"
common2 "github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
@@ -163,7 +164,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
newTxs := make(chan types.Announcements, 1024)
defer close(newTxs)
txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg,
-kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, logger)
+kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, misc.Eip1559FeeCalculator, logger)
if err != nil {
return err
}
@@ -178,7 +179,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
}
notifyMiner := func() {}
-txpool.MainLoop(ctx, txPoolDB, coreDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)
+txpool.MainLoop(ctx, txPoolDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)
grpcServer.GracefulStop()
return nil


@@ -22,9 +22,12 @@ import (
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/polygon/bor/borcfg"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/params"
)
@@ -56,6 +59,42 @@ func VerifyEip1559Header(config *chain.Config, parent, header *types.Header, ski
return nil
}
+var Eip1559FeeCalculator eip1559Calculator
+
+type eip1559Calculator struct{}
+
+func (f eip1559Calculator) CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice uint64, err error) {
+	hash := rawdb.ReadHeadHeaderHash(db)
+
+	if hash == (libcommon.Hash{}) {
+		return 0, 0, 0, fmt.Errorf("can't get head header hash")
+	}
+
+	currentHeader, err := rawdb.ReadHeaderByHash(db, hash)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+
+	if chainConfig != nil {
+		if currentHeader.BaseFee != nil {
+			baseFee = CalcBaseFee(chainConfig, currentHeader).Uint64()
+		}
+
+		if currentHeader.ExcessBlobGas != nil {
+			excessBlobGas := CalcExcessBlobGas(chainConfig, currentHeader)
+			b, err := GetBlobGasPrice(chainConfig, excessBlobGas)
+			if err == nil {
+				blobFee = b.Uint64()
+			}
+		}
+	}
+
+	minBlobGasPrice = chainConfig.GetMinBlobGasPrice()
+
+	return baseFee, blobFee, minBlobGasPrice, nil
+}
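// Usage sketch (not part of the diff): given a chain config and a
// read-only core DB transaction, all three fee values derive from the
// current head header:
//
//	baseFee, blobFee, minBlobGasPrice, err := misc.Eip1559FeeCalculator.CurrentFees(chainConfig, coreTx)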
// CalcBaseFee calculates the basefee of the header.
func CalcBaseFee(config *chain.Config, parent *types.Header) *big.Int {
// If the current block is the first EIP-1559 block, return the InitialBaseFee.


@@ -233,28 +233,28 @@ func (c *Config) GetBurntContract(num uint64) *common.Address {
}
func (c *Config) GetMinBlobGasPrice() uint64 {
-if c.MinBlobGasPrice != nil {
+if c != nil && c.MinBlobGasPrice != nil {
return *c.MinBlobGasPrice
}
return 1 // MIN_BLOB_GASPRICE (EIP-4844)
}
func (c *Config) GetMaxBlobGasPerBlock() uint64 {
-if c.MaxBlobGasPerBlock != nil {
+if c != nil && c.MaxBlobGasPerBlock != nil {
return *c.MaxBlobGasPerBlock
}
return 786432 // MAX_BLOB_GAS_PER_BLOCK (EIP-4844)
}
func (c *Config) GetTargetBlobGasPerBlock() uint64 {
-if c.TargetBlobGasPerBlock != nil {
+if c != nil && c.TargetBlobGasPerBlock != nil {
return *c.TargetBlobGasPerBlock
}
return 393216 // TARGET_BLOB_GAS_PER_BLOCK (EIP-4844)
}
func (c *Config) GetBlobGasPriceUpdateFraction() uint64 {
-if c.BlobGasPriceUpdateFraction != nil {
+if c != nil && c.BlobGasPriceUpdateFraction != nil {
return *c.BlobGasPriceUpdateFraction
}
return 3338477 // BLOB_GASPRICE_UPDATE_FRACTION (EIP-4844)
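// Note (not part of the diff): Go permits calling a method through a nil
// *Config receiver, so the added `c != nil` guards make these getters
// return the EIP-4844 defaults instead of panicking, e.g.:
//
//	var c *chain.Config        // nil
//	_ = c.GetMinBlobGasPrice() // returns 1, no nil-pointer dereference
//
// CurrentFees above calls GetMinBlobGasPrice without first checking
// chainConfig, so it relies on this guard.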


@@ -225,11 +225,17 @@ type TxPool struct {
cancunTime *uint64
isPostCancun atomic.Bool
maxBlobsPerBlock uint64
+feeCalculator FeeCalculator
logger log.Logger
}
+type FeeCalculator interface {
+	CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice uint64, err error)
+}
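// Design note (not part of the diff): defining FeeCalculator here, in
// erigon-lib, lets the pool stay decoupled from erigon's consensus/misc
// package; the concrete misc.Eip1559FeeCalculator is injected by the
// erigon binary via AllComponents, while tests pass nil and fall back to
// the fee values persisted in the pool DB.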
func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, cache kvcache.Cache,
-chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64, logger log.Logger,
+chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64,
+feeCalculator FeeCalculator, logger log.Logger,
) (*TxPool, error) {
localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil)
if err != nil {
@@ -275,6 +281,7 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
minedBlobTxsByBlock: map[uint64][]*metaTx{},
minedBlobTxsByHash: map[string]*metaTx{},
maxBlobsPerBlock: maxBlobsPerBlock,
+feeCalculator: feeCalculator,
logger: logger,
}
@@ -331,7 +338,6 @@ func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error {
}
func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
defer newBlockTimer.ObserveDuration(time.Now())
//t := time.Now()
@@ -1700,7 +1706,7 @@ const txMaxBroadcastSize = 4 * 1024
//
// promote/demote transactions
// reorgs
-func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
+func MainLoop(ctx context.Context, db kv.RwDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery)
defer syncToNewPeersEvery.Stop()
processRemoteTxsEvery := time.NewTicker(p.cfg.ProcessRemoteTxsEvery)
@@ -2072,8 +2078,15 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
i++
}
-var pendingBaseFee uint64
-{
+var pendingBaseFee, pendingBlobFee, minBlobGasPrice uint64
+
+if p.feeCalculator != nil {
+	if chainConfig, _ := ChainConfig(tx); chainConfig != nil {
+		pendingBaseFee, pendingBlobFee, minBlobGasPrice, _ = p.feeCalculator.CurrentFees(chainConfig, coreTx)
+	}
+}
+
+if pendingBaseFee == 0 {
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey)
if err != nil {
return err
@@ -2082,8 +2095,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
pendingBaseFee = binary.BigEndian.Uint64(v)
}
}
-var pendingBlobFee uint64 = 1 // MIN_BLOB_GAS_PRICE A/EIP-4844
-{
+if pendingBlobFee == 0 {
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
if err != nil {
return err
@@ -2093,6 +2106,10 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
}
}
+if pendingBlobFee == 0 {
+	pendingBlobFee = minBlobGasPrice
+}
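// Reading order (not part of the diff): fees come from the injected fee
// calculator when one is present; a zero base fee or blob fee falls back
// to the persisted PoolInfo entries, and a blob fee that is still zero
// finally falls back to the chain's minimum blob gas price.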
err = p.senders.registerNewSenders(&txs, p.logger)
if err != nil {
return err


@@ -314,7 +314,7 @@ func FuzzOnNewBlocks(f *testing.F) {
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
err = pool.Start(ctx, db)
@@ -540,7 +540,7 @@ func FuzzOnNewBlocks(f *testing.F) {
check(p2pReceived, types.TxSlots{}, "after_flush")
checkNotify(p2pReceived, types.TxSlots{}, "after_flush")
-p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
p2.senders = pool.senders // senders are not persisted


@@ -53,7 +53,7 @@ func TestNonceFromAddress(t *testing.T) {
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
@@ -173,7 +173,7 @@ func TestReplaceWithHigherFee(t *testing.T) {
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.NotEqual(nil, pool)
ctx := context.Background()
@@ -290,7 +290,7 @@ func TestReverseNonces(t *testing.T) {
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
@@ -417,7 +417,7 @@ func TestTxPoke(t *testing.T) {
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
@@ -682,7 +682,7 @@ func TestShanghaiValidateTx(t *testing.T) {
}
cache := &kvcache.DummyCache{}
-pool, err := New(ch, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, fixedgas.DefaultMaxBlobsPerBlock, logger)
+pool, err := New(ch, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, logger)
asrt.NoError(err)
ctx := context.Background()
tx, err := coreDB.BeginRw(ctx)
@@ -728,7 +728,7 @@ func TestBlobTxReplacement(t *testing.T) {
db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t)
cfg := txpoolcfg.DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, fixedgas.DefaultMaxBlobsPerBlock, log.New())
+pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New())
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
@@ -953,7 +953,7 @@ func TestDropRemoteAtNoGossip(t *testing.T) {
logger := log.New()
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
-txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, fixedgas.DefaultMaxBlobsPerBlock, logger)
+txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, fixedgas.DefaultMaxBlobsPerBlock, nil, logger)
assert.NoError(err)
require.True(txPool != nil)


@@ -101,7 +101,7 @@ func SaveChainConfigIfNeed(ctx context.Context, coreDB kv.RoDB, txPoolDB kv.RwDB
}
func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cache, newTxs chan types.Announcements, chainDB kv.RoDB,
-sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
+sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, feeCalculator txpool.FeeCalculator, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
opts := mdbx.NewMDBX(logger).Label(kv.TxPoolDB).Path(cfg.DBDir).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).
WriteMergeThreshold(3 * 8192).
@@ -144,7 +144,7 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
cancunTime = cfg.OverrideCancunTime
}
-txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime, agraBlock, cancunTime, maxBlobsPerBlock, logger)
+txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime, agraBlock, cancunTime, maxBlobsPerBlock, feeCalculator, logger)
if err != nil {
return nil, nil, nil, nil, nil, err
}


@@ -604,7 +604,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.newTxs = make(chan types2.Announcements, 1024)
//defer close(newTxs)
backend.txPoolDB, backend.txPool, backend.txPoolFetch, backend.txPoolSend, backend.txPoolGrpcServer, err = txpooluitl.AllComponents(
-ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, logger,
+ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, misc.Eip1559FeeCalculator, logger,
)
if err != nil {
return nil, err
@@ -708,23 +708,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
// 1) Hive tests requires us to do so and starting it from eth_sendRawTransaction is not viable as we have not enough data
// to initialize it properly.
// 2) we cannot propose for block 1 regardless.
-go func() {
-	time.Sleep(10 * time.Millisecond)
-	baseFee := uint64(0)
-	if currentBlock.BaseFee() != nil {
-		baseFee = misc.CalcBaseFee(chainConfig, currentBlock.Header()).Uint64()
-	}
-	blobFee := chainConfig.GetMinBlobGasPrice()
-	if currentBlock.Header().ExcessBlobGas != nil {
-		excessBlobGas := misc.CalcExcessBlobGas(chainConfig, currentBlock.Header())
-		b, err := misc.GetBlobGasPrice(chainConfig, excessBlobGas)
-		if err == nil {
-			blobFee = b.Uint64()
-		}
-	}
-	backend.notifications.Accumulator.StartChange(currentBlock.NumberU64(), currentBlock.Hash(), nil, false)
-	backend.notifications.Accumulator.SendAndReset(ctx, backend.notifications.StateChangesConsumer, baseFee, blobFee, currentBlock.GasLimit(), 0)
-}()
if !config.DeprecatedTxPool.Disable {
backend.txPoolFetch.ConnectCore()
@@ -734,8 +717,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
newTxsBroadcaster = casted.NewSlotsStreams
}
go txpool.MainLoop(backend.sentryCtx,
-backend.txPoolDB, backend.chainDB,
-backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
+backend.txPoolDB, backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
func() {
select {
case backend.notifyMiningAboutNewTxs <- struct{}{}:
@@ -743,6 +725,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}
})
}
go func() {
defer debug.LogPanic()
for {


@@ -145,12 +145,11 @@ func executeBlock(
writeChangesets bool,
writeReceipts bool,
writeCallTraces bool,
-initialCycle bool,
stateStream bool,
logger log.Logger,
) error {
blockNum := block.NumberU64()
-stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, cfg.blockReader, initialCycle, stateStream)
+stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, cfg.blockReader, stateStream)
if err != nil {
return err
}
@@ -211,16 +210,14 @@ func newStateReaderWriter(
writeChangesets bool,
accumulator *shards.Accumulator,
br services.FullBlockReader,
-initialCycle bool,
stateStream bool,
) (state.StateReader, state.WriterWithChangeSets, error) {
var stateReader state.StateReader
var stateWriter state.WriterWithChangeSets
stateReader = state.NewPlainStateReader(batch)
-if !initialCycle && stateStream {
+if stateStream {
txs, err := br.RawTransactions(context.Background(), tx, block.NumberU64(), block.NumberU64())
if err != nil {
return nil, nil, err
@@ -389,16 +386,20 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to
logPrefix := s.LogPrefix()
var to = prevStageProgress
if toBlock > 0 {
to = cmp.Min(prevStageProgress, toBlock)
}
+if to <= s.BlockNumber {
+	return nil
+}
+if to > s.BlockNumber+16 {
logger.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to)
+}
-stateStream := !initialCycle && cfg.stateStream && to-s.BlockNumber < stateStreamLimit
+stateStream := cfg.stateStream && to-s.BlockNumber < stateStreamLimit
// changes are stored through memory buffer
logEvery := time.NewTicker(logInterval)
@@ -468,7 +469,7 @@ Loop:
if cfg.silkworm != nil && !isMemoryMutation {
blockNum, err = silkworm.ExecuteBlocks(cfg.silkworm, txc.Tx, cfg.chainConfig.ChainID, blockNum, to, uint64(cfg.batchSize), writeChangeSets, writeReceipts, writeCallTraces)
} else {
-err = executeBlock(block, txc.Tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, initialCycle, stateStream, logger)
+err = executeBlock(block, txc.Tx, batch, cfg, *cfg.vmConfig, writeChangeSets, writeReceipts, writeCallTraces, stateStream, logger)
}
if err != nil {
@@ -710,7 +711,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c
storageKeyLength := length.Addr + length.Incarnation + length.Hash
var accumulator *shards.Accumulator
-if !initialCycle && cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit {
+if cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit {
accumulator = cfg.accumulator
hash, err := cfg.blockReader.CanonicalHash(ctx, txc.Tx, u.UnwindPoint)


@@ -32,6 +32,7 @@ func (a *Accumulator) Reset(plainStateID uint64) {
a.storageChangeIndex = nil
a.plainStateID = plainStateID
}
func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer, pendingBaseFee uint64, pendingBlobFee uint64, blockGasLimit uint64, finalizedBlock uint64) {
if a == nil || c == nil || len(a.changes) == 0 {
return


@@ -250,7 +250,8 @@ type BlockReader struct {
func NewBlockReader(snapshots services.BlockSnapshots, borSnapshots services.BlockSnapshots) *BlockReader {
borSn, _ := borSnapshots.(*BorRoSnapshots)
-return &BlockReader{sn: snapshots.(*RoSnapshots), borSn: borSn}
+sn, _ := snapshots.(*RoSnapshots)
+return &BlockReader{sn: sn, borSn: borSn}
}
func (r *BlockReader) CanPruneTo(currentBlockInDB uint64) uint64 {
@@ -485,7 +486,7 @@ func (r *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash c
}
func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64, forceCanonical bool) (block *types.Block, senders []common.Address, err error) {
maxBlockNumInFiles := r.sn.BlocksAvailable()
-if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
+if tx != nil && (maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles) {
if forceCanonical {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
@@ -503,6 +504,10 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return block, senders, nil
}
+if r.sn == nil {
+	return
+}
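// Note (not part of the diff): with these guards blockWithSenders can be
// called with a nil tx (forcing a snapshot-only lookup) and returns empty
// results when no header snapshots are configured, instead of panicking.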
view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)


@@ -406,7 +406,13 @@ func (s *RoSnapshots) IndicesMax() uint64 { return s.idxMax.Load() }
func (s *RoSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() }
func (s *RoSnapshots) SegmentsMin() uint64 { return s.segmentsMin.Load() }
func (s *RoSnapshots) SetSegmentsMin(min uint64) { s.segmentsMin.Store(min) }
-func (s *RoSnapshots) BlocksAvailable() uint64 { return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load()) }
+func (s *RoSnapshots) BlocksAvailable() uint64 {
+	if s == nil {
+		return 0
+	}
+
+	return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load())
+}
func (s *RoSnapshots) LogStat(label string) {
var m runtime.MemStats
dbg.ReadMemStats(&m)


@@ -311,7 +311,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
shanghaiTime := mock.ChainConfig.ShanghaiTime
cancunTime := mock.ChainConfig.CancunTime
maxBlobsPerBlock := mock.ChainConfig.GetMaxBlobsPerBlock()
-mock.TxPool, err = txpool.New(newTxs, mock.DB, poolCfg, kvcache.NewDummy(), *chainID, shanghaiTime, nil /* agraBlock */, cancunTime, maxBlobsPerBlock, logger)
+mock.TxPool, err = txpool.New(newTxs, mock.DB, poolCfg, kvcache.NewDummy(), *chainID, shanghaiTime, nil /* agraBlock */, cancunTime, maxBlobsPerBlock, nil, logger)
if err != nil {
tb.Fatal(err)
}
@@ -329,7 +329,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
mock.TxPoolFetch.ConnectSentries()
mock.StreamWg.Wait()
-go txpool.MainLoop(mock.Ctx, mock.txPoolDB, mock.DB, mock.TxPool, newTxs, mock.TxPoolSend, mock.TxPoolGrpcServer.NewSlotsStreams, func() {})
+go txpool.MainLoop(mock.Ctx, mock.txPoolDB, mock.TxPool, newTxs, mock.TxPoolSend, mock.TxPoolGrpcServer.NewSlotsStreams, func() {})
}
// Committed genesis will be shared between download and mock sentry