diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index e964e323e..69a8238f4 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -134,7 +134,7 @@ func NewTestSimulatedBackendWithConfig(t *testing.T, alloc core.GenesisAlloc, co func (b *SimulatedBackend) DB() kv.RwDB { return b.m.DB } func (b *SimulatedBackend) Agg() *state2.AggregatorV3 { return b.m.HistoryV3Components() } func (b *SimulatedBackend) BlockReader() *snapshotsync.BlockReaderWithSnapshots { - return snapshotsync.NewBlockReaderWithSnapshots(b.m.BlockSnapshots) + return snapshotsync.NewBlockReaderWithSnapshots(b.m.BlockSnapshots, b.m.TransactionsV3) } func (b *SimulatedBackend) HistoryV3() bool { return b.m.HistoryV3 } func (b *SimulatedBackend) Engine() consensus.Engine { return b.m.Engine } diff --git a/cmd/erigon-el/backend/backend.go b/cmd/erigon-el/backend/backend.go index c2f2d3d56..3a8f837d7 100644 --- a/cmd/erigon-el/backend/backend.go +++ b/cmd/erigon-el/backend/backend.go @@ -231,6 +231,11 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( return err } + config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3) + if err != nil { + return err + } + // if we are in the incorrect syncmode then we change it to the appropriate one if !isCorrectSync { log.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots) @@ -268,7 +273,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( allSnapshots *snapshotsync.RoSnapshots agg *libstate.AggregatorV3 ) - backend.blockReader, allSnapshots, agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader) + backend.blockReader, allSnapshots, agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, config.TransactionsV3) if err != nil { return nil, err } @@ -477,7 +482,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( mining := stagedsync.New( stagedsync.MiningStages(backend.sentryCtx, stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, nil, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool2, backend.txPool2DB), + stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool2, backend.txPool2DB, allSnapshots, config.TransactionsV3), stagedsync.StageHashStateCfg(backend.chainDB, dirs, config.HistoryV3, backend.agg), stagedsync.StageTrieCfg(backend.chainDB, false, true, true, tmpdir, backend.blockReader, nil, config.HistoryV3, backend.agg), stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit), @@ -495,7 +500,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( proposingSync := stagedsync.New( stagedsync.MiningStages(backend.sentryCtx, stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, param, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool2, backend.txPool2DB), + stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool2, backend.txPool2DB, allSnapshots, config.TransactionsV3), stagedsync.StageHashStateCfg(backend.chainDB, dirs, config.HistoryV3, backend.agg), stagedsync.StageTrieCfg(backend.chainDB, false, true, true, tmpdir, backend.blockReader, nil, config.HistoryV3, backend.agg), stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit), @@ -612,7 +617,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( return nil, err } - backend.stagedSync, err = stages3.NewStagedSync(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, allSnapshots, backend.agg, backend.forkValidator, backend.engine) + backend.stagedSync, err = stages3.NewStagedSync(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, allSnapshots, backend.agg, backend.forkValidator, backend.engine, config.TransactionsV3) if err != nil { return nil, err } @@ -857,18 +862,13 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) { } // sets up blockReader and client downloader -func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { - if !snConfig.Enabled { - blockReader := snapshotsync.NewBlockReader() - return blockReader, nil, nil, nil - } - +func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, transactionsV3 bool) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap) var err error if !snConfig.NoDownloader { allSnapshots.OptimisticalyReopenWithDB(s.chainDB) } - blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, transactionsV3) if !snConfig.NoDownloader { if snConfig.DownloaderAddr != "" { diff --git a/cmd/erigon-el/stages/stages.go b/cmd/erigon-el/stages/stages.go index 65c916a5a..80eab4198 100644 --- a/cmd/erigon-el/stages/stages.go +++ b/cmd/erigon-el/stages/stages.go @@ -15,7 +15,6 @@ import ( "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/turbo/engineapi" - "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" ) @@ -42,14 +41,10 @@ func NewStagedSync(ctx context.Context, agg *state.AggregatorV3, forkValidator *engineapi.ForkValidator, engine consensus.Engine, + transactionsV3 bool, ) (*stagedsync.Sync, error) { dirs := cfg.Dirs - var blockReader services.FullBlockReader - if cfg.Snapshot.Enabled { - blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots) - } else { - blockReader = snapshotsync.NewBlockReader() - } + blockReader := snapshotsync.NewBlockReaderWithSnapshots(snapshots, transactionsV3) blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, snapshots, db, snapDownloader, notifications.Events) // During Import we don't want other services like header requests, body requests etc. to be running. @@ -100,6 +95,7 @@ func NewStagedSync(ctx context.Context, snapshots, blockReader, cfg.HistoryV3, + cfg.TransactionsV3, ), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, controlServer.Hd), stagedsync.StageExecuteBlocksCfg( diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 3e11897d7..47c6e8966 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -585,7 +585,7 @@ func stageBodies(db kv.RwDB, ctx context.Context) error { sn, agg := allSnapshots(ctx, db) defer sn.Close() defer agg.Close() - chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db) + chainConfig, historyV3, transactionsV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), kvcfg.TransactionsV3.FromDB(db) _, _, sync, _, _ := newSync(ctx, db, nil) if err := db.Update(ctx, func(tx kv.RwTx) error { @@ -597,7 +597,7 @@ func stageBodies(db kv.RwDB, ctx context.Context) error { } u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber) - if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, getBlockReader(db), historyV3), ctx); err != nil { + if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, getBlockReader(db), historyV3, transactionsV3), ctx); err != nil { return err } @@ -1216,11 +1216,9 @@ var _blockReaderSingleton services.FullBlockReader func getBlockReader(db kv.RoDB) (blockReader services.FullBlockReader) { openBlockReaderOnce.Do(func() { - _blockReaderSingleton = snapshotsync.NewBlockReader() - if sn, _ := allSnapshots(context.Background(), db); sn.Cfg().Enabled { - x := snapshotsync.NewBlockReaderWithSnapshots(sn) - _blockReaderSingleton = x - } + sn, _ := allSnapshots(context.Background(), db) + transactionsV3 := kvcfg.TransactionsV3.FromDB(db) + _blockReaderSingleton = snapshotsync.NewBlockReaderWithSnapshots(sn, transactionsV3) }) return _blockReaderSingleton } @@ -1292,7 +1290,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) miningSync := stagedsync.New( stagedsync.MiningStages(ctx, stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, dirs.Tmp), - stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil), + stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil, allSn, cfg.TransactionsV3), stagedsync.StageHashStateCfg(db, dirs, historyV3, agg), stagedsync.StageTrieCfg(db, false, true, false, dirs.Tmp, br, nil, historyV3, agg), stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, miningCancel), diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 48f96b2c2..33e6d347e 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -344,73 +344,68 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, } // Configure sapshots - if cfg.Snap.Enabled { - allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap) - // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down - // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - allSnapshots.OptimisticReopenWithDB(db) - allSnapshots.LogStat() + allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap) + // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down + // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection + allSnapshots.OptimisticReopenWithDB(db) + allSnapshots.LogStat() - if agg, err = libstate.NewAggregatorV3(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil { - return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) - } - _ = agg.OpenFolder() - - db.View(context.Background(), func(tx kv.Tx) error { - agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { - _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) - return histBlockNumProgress - }) - return nil - }) - onNewSnapshot = func() { - go func() { // don't block events processing by network communication - reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) - if err != nil { - log.Warn("[snapshots] reopen", "err", err) - return - } - if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil { - log.Error("[snapshots] reopen", "err", err) - } else { - allSnapshots.LogStat() - } - - _ = reply.HistoryFiles - - if err = agg.OpenFolder(); err != nil { - log.Error("[snapshots] reopen", "err", err) - } else { - db.View(context.Background(), func(tx kv.Tx) error { - agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { - _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) - return histBlockNumProgress - }) - return nil - }) - } - }() - } - onNewSnapshot() - blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) - - var histV3Enabled bool - _ = db.View(ctx, func(tx kv.Tx) error { - histV3Enabled, _ = kvcfg.HistoryV3.Enabled(tx) - return nil - }) - if histV3Enabled { - log.Info("HistoryV3", "enable", histV3Enabled) - db, err = temporal.New(rwKv, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[cc.ChainName]) - if err != nil { - return nil, nil, nil, nil, nil, nil, nil, nil, nil, err - } - } - stateCache = kvcache.NewDummy() - } else { - blockReader = snapshotsync.NewBlockReader() - stateCache = kvcache.NewDummy() + if agg, err = libstate.NewAggregatorV3(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil { + return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err) } + _ = agg.OpenFolder() + + db.View(context.Background(), func(tx kv.Tx) error { + agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { + _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) + return histBlockNumProgress + }) + return nil + }) + onNewSnapshot = func() { + go func() { // don't block events processing by network communication + reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) + if err != nil { + log.Warn("[snapshots] reopen", "err", err) + return + } + if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil { + log.Error("[snapshots] reopen", "err", err) + } else { + allSnapshots.LogStat() + } + + _ = reply.HistoryFiles + + if err = agg.OpenFolder(); err != nil { + log.Error("[snapshots] reopen", "err", err) + } else { + db.View(context.Background(), func(tx kv.Tx) error { + agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 { + _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax) + return histBlockNumProgress + }) + return nil + }) + } + }() + } + onNewSnapshot() + blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, ethconfig.Defaults.TransactionsV3) + + var histV3Enabled bool + _ = db.View(ctx, func(tx kv.Tx) error { + histV3Enabled, _ = kvcfg.HistoryV3.Enabled(tx) + return nil + }) + if histV3Enabled { + log.Info("HistoryV3", "enable", histV3Enabled) + db, err = temporal.New(rwKv, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[cc.ChainName]) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, err + } + } + stateCache = kvcache.NewDummy() } // If DB can't be configured - used PrivateApiAddr as remote DB if db == nil { diff --git a/cmd/rpcdaemon/commands/call_traces_test.go b/cmd/rpcdaemon/commands/call_traces_test.go index 28964b139..730410db4 100644 --- a/cmd/rpcdaemon/commands/call_traces_test.go +++ b/cmd/rpcdaemon/commands/call_traces_test.go @@ -54,7 +54,7 @@ func TestCallTraceOneByOne(t *testing.T) { } agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI( NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) @@ -103,7 +103,7 @@ func TestCallTraceUnwind(t *testing.T) { } agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI(NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) if err = m.InsertChain(chainA); err != nil { t.Fatalf("inserting chainA: %v", err) @@ -165,7 +165,7 @@ func TestFilterNoAddresses(t *testing.T) { t.Fatalf("generate chain: %v", err) } agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI(NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) // Insert blocks 1 by 1, to tirgget possible "off by one" errors for i := 0; i < chain.Length(); i++ { @@ -191,7 +191,7 @@ func TestFilterNoAddresses(t *testing.T) { func TestFilterAddressIntersection(t *testing.T) { m := stages.Mock(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI(NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) toAddress1, toAddress2, other := common.Address{1}, common.Address{2}, common.Address{3} diff --git a/cmd/rpcdaemon/commands/corner_cases_support_test.go b/cmd/rpcdaemon/commands/corner_cases_support_test.go index df03b071e..52b3f0872 100644 --- a/cmd/rpcdaemon/commands/corner_cases_support_test.go +++ b/cmd/rpcdaemon/commands/corner_cases_support_test.go @@ -20,7 +20,7 @@ func TestNotFoundMustReturnNil(t *testing.T) { require := require.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI( NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), diff --git a/cmd/rpcdaemon/commands/debug_api_test.go b/cmd/rpcdaemon/commands/debug_api_test.go index e7a96fee3..4d72128b3 100644 --- a/cmd/rpcdaemon/commands/debug_api_test.go +++ b/cmd/rpcdaemon/commands/debug_api_test.go @@ -53,7 +53,7 @@ var debugTraceTransactionNoRefundTests = []struct { func TestTraceBlockByNumber(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) baseApi := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000) @@ -102,7 +102,7 @@ func TestTraceBlockByNumber(t *testing.T) { func TestTraceBlockByHash(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) baseApi := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000) @@ -138,7 +138,7 @@ func TestTraceBlockByHash(t *testing.T) { func TestTraceTransaction(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) base := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewPrivateDebugAPI(base, m.DB, 0) @@ -170,7 +170,7 @@ func TestTraceTransaction(t *testing.T) { func TestTraceTransactionNoRefund(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) agg := m.HistoryV3Components() api := NewPrivateDebugAPI( NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), @@ -204,7 +204,7 @@ func TestTraceTransactionNoRefund(t *testing.T) { func TestStorageRangeAt(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) agg := m.HistoryV3Components() api := NewPrivateDebugAPI( NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), @@ -301,7 +301,7 @@ func TestStorageRangeAt(t *testing.T) { func TestAccountRange(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) base := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) @@ -364,7 +364,7 @@ func TestAccountRange(t *testing.T) { func TestGetModifiedAccountsByNumber(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) base := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) @@ -468,7 +468,7 @@ func TestMapTxNum2BlockNum(t *testing.T) { func TestAccountAt(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) base := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewPrivateDebugAPI(base, m.DB, 0) diff --git a/cmd/rpcdaemon/commands/erigon_receipts_test.go b/cmd/rpcdaemon/commands/erigon_receipts_test.go index 38e2d54f0..09f13604b 100644 --- a/cmd/rpcdaemon/commands/erigon_receipts_test.go +++ b/cmd/rpcdaemon/commands/erigon_receipts_test.go @@ -30,7 +30,7 @@ import ( func TestGetLogs(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) agg := m.HistoryV3Components() baseApi := NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) { @@ -63,7 +63,7 @@ func TestGetLogs(t *testing.T) { func TestErigonGetLatestLogs(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) db := m.DB agg := m.HistoryV3Components() @@ -98,7 +98,7 @@ func TestErigonGetLatestLogs(t *testing.T) { func TestErigonGetLatestLogsIgnoreTopics(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) db := m.DB agg := m.HistoryV3Components() @@ -190,7 +190,7 @@ func TestGetBlockReceiptsByBlockHash(t *testing.T) { // Assemble the test environment m := mockWithGenerator(t, 4, generator) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewErigonAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil) diff --git a/cmd/rpcdaemon/commands/eth_api_test.go b/cmd/rpcdaemon/commands/eth_api_test.go index 252244ad9..733a28c42 100644 --- a/cmd/rpcdaemon/commands/eth_api_test.go +++ b/cmd/rpcdaemon/commands/eth_api_test.go @@ -25,7 +25,7 @@ func TestGetBalanceChangesInBlock(t *testing.T) { assert := assert.New(t) myBlockNum := rpc.BlockNumberOrHashWithNumber(0) m, _, _ := rpcdaemontest.CreateTestSentry(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) db := m.DB agg := m.HistoryV3Components() @@ -50,7 +50,7 @@ func TestGetTransactionReceipt(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) db := m.DB agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), db, nil, nil, nil, 5000000, 100_000) // Call GetTransactionReceipt for transaction which is not in the database @@ -62,7 +62,7 @@ func TestGetTransactionReceipt(t *testing.T) { func TestGetTransactionReceiptUnprotected(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) // Call GetTransactionReceipt for un-protected transaction @@ -77,7 +77,7 @@ func TestGetStorageAt_ByBlockNumber_WithRequireCanonicalDefault(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -94,7 +94,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -111,7 +111,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -127,7 +127,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue(t *testing.T) { func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_BlockNotFoundError(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -151,7 +151,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_BlockNotFoundError func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_BlockNotFoundError(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -176,7 +176,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock( assert := assert.New(t) m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -198,7 +198,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock( func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *testing.T) { m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) addr := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -217,7 +217,7 @@ func TestGetStorageAt_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t * func TestCall_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testing.T) { m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) from := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -243,7 +243,7 @@ func TestCall_ByBlockHash_WithRequireCanonicalDefault_NonCanonicalBlock(t *testi func TestCall_ByBlockHash_WithRequireCanonicalTrue_NonCanonicalBlock(t *testing.T) { m, _, orphanedChain := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) from := common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") diff --git a/cmd/rpcdaemon/commands/eth_block_test.go b/cmd/rpcdaemon/commands/eth_block_test.go index 4def4f4d2..7de1b60a2 100644 --- a/cmd/rpcdaemon/commands/eth_block_test.go +++ b/cmd/rpcdaemon/commands/eth_block_test.go @@ -26,7 +26,7 @@ import ( func TestGetBlockByNumberWithLatestTag(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) b, err := api.GetBlockByNumber(context.Background(), rpc.LatestBlockNumber, false) @@ -40,7 +40,7 @@ func TestGetBlockByNumberWithLatestTag(t *testing.T) { func TestGetBlockByNumberWithLatestTag_WithHeadHashInDb(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) tx, err := m.DB.BeginRw(ctx) @@ -73,7 +73,7 @@ func TestGetBlockByNumberWithLatestTag_WithHeadHashInDb(t *testing.T) { func TestGetBlockByNumberWithPendingTag(t *testing.T) { m := stages.MockWithTxPool(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) @@ -104,7 +104,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) { func TestGetBlockByNumber_WithFinalizedTag_NoFinalizedBlockInDb(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) @@ -116,7 +116,7 @@ func TestGetBlockByNumber_WithFinalizedTag_NoFinalizedBlockInDb(t *testing.T) { func TestGetBlockByNumber_WithFinalizedTag_WithFinalizedBlockInDb(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) tx, err := m.DB.BeginRw(ctx) @@ -149,7 +149,7 @@ func TestGetBlockByNumber_WithFinalizedTag_WithFinalizedBlockInDb(t *testing.T) func TestGetBlockByNumber_WithSafeTag_NoSafeBlockInDb(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) @@ -161,7 +161,7 @@ func TestGetBlockByNumber_WithSafeTag_NoSafeBlockInDb(t *testing.T) { func TestGetBlockByNumber_WithSafeTag_WithSafeBlockInDb(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) tx, err := m.DB.BeginRw(ctx) @@ -194,7 +194,7 @@ func TestGetBlockByNumber_WithSafeTag_WithSafeBlockInDb(t *testing.T) { func TestGetBlockTransactionCountByHash(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) @@ -230,7 +230,7 @@ func TestGetBlockTransactionCountByHash(t *testing.T) { func TestGetBlockTransactionCountByHash_ZeroTx(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) @@ -266,7 +266,7 @@ func TestGetBlockTransactionCountByHash_ZeroTx(t *testing.T) { func TestGetBlockTransactionCountByNumber(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) @@ -301,7 +301,7 @@ func TestGetBlockTransactionCountByNumber(t *testing.T) { func TestGetBlockTransactionCountByNumber_ZeroTx(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) ctx := context.Background() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) diff --git a/cmd/rpcdaemon/commands/eth_call_test.go b/cmd/rpcdaemon/commands/eth_call_test.go index 6bf9c25ba..46844c432 100644 --- a/cmd/rpcdaemon/commands/eth_call_test.go +++ b/cmd/rpcdaemon/commands/eth_call_test.go @@ -37,7 +37,7 @@ import ( func TestEstimateGas(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) @@ -56,7 +56,7 @@ func TestEstimateGas(t *testing.T) { func TestEthCallNonCanonical(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewEthAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, nil, nil, 5000000, 100_000) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") @@ -76,7 +76,7 @@ func TestEthCallToPrunedBlock(t *testing.T) { ethCallBlockNumber := rpc.BlockNumber(2) m, bankAddress, contractAddress := chainWithDeployedContract(t) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) prune(t, m.DB, pruneTo) @@ -101,7 +101,7 @@ func TestGetBlockByTimestampLatestTime(t *testing.T) { ctx := context.Background() m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) tx, err := m.DB.BeginRo(ctx) if err != nil { t.Errorf("fail at beginning tx") @@ -139,7 +139,7 @@ func TestGetBlockByTimestampOldestTime(t *testing.T) { ctx := context.Background() m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) tx, err := m.DB.BeginRo(ctx) if err != nil { t.Errorf("failed at beginning tx") @@ -181,7 +181,7 @@ func TestGetBlockByTimeHigherThanLatestBlock(t *testing.T) { ctx := context.Background() m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) tx, err := m.DB.BeginRo(ctx) if err != nil { t.Errorf("fail at beginning tx") @@ -220,7 +220,7 @@ func TestGetBlockByTimeMiddle(t *testing.T) { ctx := context.Background() m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) tx, err := m.DB.BeginRo(ctx) if err != nil { t.Errorf("fail at beginning tx") @@ -272,7 +272,7 @@ func TestGetBlockByTimestamp(t *testing.T) { ctx := context.Background() m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) tx, err := m.DB.BeginRo(ctx) if err != nil { t.Errorf("fail at beginning tx") diff --git a/cmd/rpcdaemon/commands/eth_filters_test.go b/cmd/rpcdaemon/commands/eth_filters_test.go index af5ae9fbd..fd50a8274 100644 --- a/cmd/rpcdaemon/commands/eth_filters_test.go +++ b/cmd/rpcdaemon/commands/eth_filters_test.go @@ -26,7 +26,7 @@ func TestNewFilters(t *testing.T) { assert := assert.New(t) m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) diff --git a/cmd/rpcdaemon/commands/eth_mining_test.go b/cmd/rpcdaemon/commands/eth_mining_test.go index e70e6710c..1a782599d 100644 --- a/cmd/rpcdaemon/commands/eth_mining_test.go +++ b/cmd/rpcdaemon/commands/eth_mining_test.go @@ -20,12 +20,13 @@ import ( ) func TestPendingBlock(t *testing.T) { + m := stages.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) ff := rpchelper.New(ctx, nil, nil, mining, func() {}) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) engine := ethash.NewFaker() - api := NewEthAPI(NewBaseApi(ff, stateCache, snapshotsync.NewBlockReader(), nil, false, rpccfg.DefaultEvmCallTimeout, engine), nil, nil, nil, mining, 5000000, 100_000) + api := NewEthAPI(NewBaseApi(ff, stateCache, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3), nil, false, rpccfg.DefaultEvmCallTimeout, engine), nil, nil, nil, mining, 5000000, 100_000) expect := uint64(12345) b, err := rlp.EncodeToBytes(types.NewBlockWithHeader(&types.Header{Number: big.NewInt(int64(expect))})) require.NoError(t, err) diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index 192317e55..17e726edf 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -25,7 +25,7 @@ func TestEthSubscribe(t *testing.T) { if m.HistoryV3 { t.Skip() } - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) { b.SetCoinbase(libcommon.Address{1}) }, false /* intermediateHashes */) diff --git a/cmd/rpcdaemon/commands/eth_system_test.go b/cmd/rpcdaemon/commands/eth_system_test.go index 912cf652c..41f823a96 100644 --- a/cmd/rpcdaemon/commands/eth_system_test.go +++ b/cmd/rpcdaemon/commands/eth_system_test.go @@ -44,7 +44,7 @@ func TestGasPrice(t *testing.T) { m := createGasPriceTestKV(t, testCase.chainSize) defer m.DB.Close() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - base := NewBaseApi(nil, stateCache, snapshotsync.NewBlockReader(), nil, false, rpccfg.DefaultEvmCallTimeout, m.Engine) + base := NewBaseApi(nil, stateCache, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3), nil, false, rpccfg.DefaultEvmCallTimeout, m.Engine) eth := NewEthAPI(base, m.DB, nil, nil, nil, 5000000, 100_000) ctx := context.Background() diff --git a/cmd/rpcdaemon/commands/gen_traces_test.go b/cmd/rpcdaemon/commands/gen_traces_test.go index f11dd6ecf..40a127775 100644 --- a/cmd/rpcdaemon/commands/gen_traces_test.go +++ b/cmd/rpcdaemon/commands/gen_traces_test.go @@ -30,7 +30,7 @@ Testing tracing RPC API by generating patters of contracts invoking one another func TestGeneratedDebugApi(t *testing.T) { m := rpcdaemontest.CreateTestSentryForTraces(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) baseApi := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewPrivateDebugAPI(baseApi, m.DB, 0) @@ -118,7 +118,7 @@ func TestGeneratedDebugApi(t *testing.T) { func TestGeneratedTraceApi(t *testing.T) { m := rpcdaemontest.CreateTestSentryForTraces(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) baseApi := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewTraceAPI(baseApi, m.DB, &httpcfg.HttpCfg{}) @@ -277,7 +277,7 @@ func TestGeneratedTraceApi(t *testing.T) { func TestGeneratedTraceApiCollision(t *testing.T) { m := rpcdaemontest.CreateTestSentryForTracesCollision(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) baseApi := NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine) api := NewTraceAPI(baseApi, m.DB, &httpcfg.HttpCfg{}) diff --git a/cmd/rpcdaemon/commands/otterscan_contract_creator_test.go b/cmd/rpcdaemon/commands/otterscan_contract_creator_test.go index 0fad7b95a..34157f0b0 100644 --- a/cmd/rpcdaemon/commands/otterscan_contract_creator_test.go +++ b/cmd/rpcdaemon/commands/otterscan_contract_creator_test.go @@ -13,7 +13,7 @@ import ( func TestGetContractCreator(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewOtterscanAPI(NewBaseApi(nil, nil, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB) addr := libcommon.HexToAddress("0x537e697c7ab75a26f9ecf0ce810e3154dfcaaf44") diff --git a/cmd/rpcdaemon/commands/otterscan_search_backward_test.go b/cmd/rpcdaemon/commands/otterscan_search_backward_test.go index 55d90d0d6..477be4b8c 100644 --- a/cmd/rpcdaemon/commands/otterscan_search_backward_test.go +++ b/cmd/rpcdaemon/commands/otterscan_search_backward_test.go @@ -151,7 +151,7 @@ func TestBackwardBlockProviderWithMultipleChunksBlockNotFound(t *testing.T) { func TestSearchTransactionsBefore(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewOtterscanAPI(NewBaseApi(nil, nil, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB) addr := libcommon.HexToAddress("0x537e697c7ab75a26f9ecf0ce810e3154dfcaaf44") diff --git a/cmd/rpcdaemon/commands/otterscan_transaction_by_sender_and_nonce_test.go b/cmd/rpcdaemon/commands/otterscan_transaction_by_sender_and_nonce_test.go index afce55ea4..131bcbf0a 100644 --- a/cmd/rpcdaemon/commands/otterscan_transaction_by_sender_and_nonce_test.go +++ b/cmd/rpcdaemon/commands/otterscan_transaction_by_sender_and_nonce_test.go @@ -13,7 +13,7 @@ import ( func TestGetTransactionBySenderAndNonce(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewOtterscanAPI(NewBaseApi(nil, nil, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB) addr := common.HexToAddress("0x537e697c7ab75a26f9ecf0ce810e3154dfcaaf44") diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go index 199f66f57..ad855fe01 100644 --- a/cmd/rpcdaemon/commands/send_transaction_test.go +++ b/cmd/rpcdaemon/commands/send_transaction_test.go @@ -73,7 +73,7 @@ func TestSendRawTransaction(t *testing.T) { txPool := txpool.NewTxpoolClient(conn) ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := commands.NewEthAPI(commands.NewBaseApi(ff, stateCache, br, nil, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, nil, txPool, nil, 5000000, 100_000) buf := bytes.NewBuffer(nil) diff --git a/cmd/rpcdaemon/commands/trace_adhoc_test.go b/cmd/rpcdaemon/commands/trace_adhoc_test.go index 3c2076398..92f4aba64 100644 --- a/cmd/rpcdaemon/commands/trace_adhoc_test.go +++ b/cmd/rpcdaemon/commands/trace_adhoc_test.go @@ -23,7 +23,7 @@ func TestEmptyQuery(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) // Call GetTransactionReceipt for transaction which is not in the database @@ -43,7 +43,7 @@ func TestCoinbaseBalance(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) // Call GetTransactionReceipt for transaction which is not in the database @@ -73,7 +73,7 @@ func TestReplayTransaction(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() stateCache := kvcache.New(kvcache.DefaultCoherentConfig) - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTraceAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) var txnHash libcommon.Hash @@ -103,7 +103,7 @@ func TestReplayTransaction(t *testing.T) { func TestReplayBlockTransactions(t *testing.T) { m, _, _ := rpcdaemontest.CreateTestSentry(t) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) api := NewTraceAPI(NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{}) diff --git a/cmd/rpcdaemon/commands/txpool_api_test.go b/cmd/rpcdaemon/commands/txpool_api_test.go index 85fd79cf2..9a1b0dd7d 100644 --- a/cmd/rpcdaemon/commands/txpool_api_test.go +++ b/cmd/rpcdaemon/commands/txpool_api_test.go @@ -39,7 +39,7 @@ func TestTxPoolContent(t *testing.T) { txPool := txpool.NewTxpoolClient(conn) ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}) agg := m.HistoryV3Components() - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, txPool) expectValue := uint64(1234) diff --git a/cmd/rpcdaemon/rpcdaemontest/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go index 6cb1a5be2..9a5339224 100644 --- a/cmd/rpcdaemon/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -294,7 +294,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3), nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) listener := bufconn.Listen(1024 * 1024) diff --git a/cmd/state/commands/check_change_sets.go b/cmd/state/commands/check_change_sets.go index fabe1c041..f64d30f18 100644 --- a/cmd/state/commands/check_change_sets.go +++ b/cmd/state/commands/check_change_sets.go @@ -26,13 +26,13 @@ import ( "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/snapshotsync" ) var ( - historyfile string - nocheck bool + historyfile string + nocheck bool + transactionsV3 bool ) func init() { @@ -41,6 +41,7 @@ func init() { withSnapshotBlocks(checkChangeSetsCmd) checkChangeSetsCmd.Flags().StringVar(&historyfile, "historyfile", "", "path to the file where the changesets and history are expected to be. If omitted, the same as /erion/chaindata") checkChangeSetsCmd.Flags().BoolVar(&nocheck, "nocheck", false, "set to turn off the changeset checking and only execute transaction (for performance testing)") + checkChangeSetsCmd.Flags().BoolVar(&transactionsV3, "experimental.transactions.v3", false, "(this flag is in testing stage) Not recommended yet: Can't change this flag after node creation. New DB table for transactions allows keeping multiple branches of block bodies in the DB simultaneously") rootCmd.AddCommand(checkChangeSetsCmd) } @@ -49,13 +50,13 @@ var checkChangeSetsCmd = &cobra.Command{ Short: "Re-executes historical transactions in read-only mode and checks that their outputs match the database ChangeSets", RunE: func(cmd *cobra.Command, args []string) error { logger := log.New() - return CheckChangeSets(genesis, logger, block, chaindata, historyfile, nocheck) + return CheckChangeSets(genesis, logger, block, chaindata, historyfile, nocheck, transactionsV3) }, } // CheckChangeSets re-executes historical transactions in read-only mode // and checks that their outputs match the database ChangeSets. -func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, chaindata string, historyfile string, nocheck bool) error { +func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, chaindata string, historyfile string, nocheck bool, transactionV3 bool) error { if len(historyfile) == 0 { historyfile = chaindata } @@ -74,19 +75,13 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, if err != nil { return err } - var blockReader services.FullBlockReader - var allSnapshots *snapshotsync.RoSnapshots - useSnapshots := ethconfig.UseSnapshotsByChainName(chainConfig.ChainName) && snapshotsCli - if useSnapshots { - allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadirCli, "snapshots")) - defer allSnapshots.Close() - if err := allSnapshots.ReopenFolder(); err != nil { - return fmt.Errorf("reopen snapshot segments: %w", err) - } - blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) - } else { - blockReader = snapshotsync.NewBlockReader() + allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadirCli, "snapshots")) + defer allSnapshots.Close() + if err := allSnapshots.ReopenFolder(); err != nil { + return fmt.Errorf("reopen snapshot segments: %w", err) } + blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, transactionV3) + chainDb := db defer chainDb.Close() historyDb := chainDb diff --git a/cmd/state/commands/erigon2.go b/cmd/state/commands/erigon2.go index a7dae1b7f..478c1663e 100644 --- a/cmd/state/commands/erigon2.go +++ b/cmd/state/commands/erigon2.go @@ -22,6 +22,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/kvcfg" "github.com/ledgerwatch/erigon-lib/kv/mdbx" kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" @@ -29,7 +30,6 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconsensusconfig" "github.com/ledgerwatch/erigon/turbo/logging" - "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/misc" @@ -225,19 +225,13 @@ func Erigon2(genesis *core.Genesis, chainConfig *chain2.Config, logger log.Logge } }() - var blockReader services.FullBlockReader - var allSnapshots *snapshotsync.RoSnapshots - useSnapshots := ethconfig.UseSnapshotsByChainName(chainConfig.ChainName) && snapshotsCli - if useSnapshots { - allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadirCli, "snapshots")) - defer allSnapshots.Close() - if err := allSnapshots.ReopenWithDB(db); err != nil { - return fmt.Errorf("reopen snapshot segments: %w", err) - } - blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) - } else { - blockReader = snapshotsync.NewBlockReader() + allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadirCli, "snapshots")) + defer allSnapshots.Close() + if err := allSnapshots.ReopenWithDB(db); err != nil { + return fmt.Errorf("reopen snapshot segments: %w", err) } + transactionsV3 := kvcfg.TransactionsV3.FromDB(db) + blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, transactionsV3) engine := initConsensusEngine(chainConfig, allSnapshots) for !interrupt { diff --git a/cmd/state/commands/erigon4.go b/cmd/state/commands/erigon4.go index 2ef499a85..6cfdf26df 100644 --- a/cmd/state/commands/erigon4.go +++ b/cmd/state/commands/erigon4.go @@ -23,6 +23,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/kvcfg" kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" libstate "github.com/ledgerwatch/erigon-lib/state" @@ -199,7 +200,8 @@ func Erigon4(genesis *core.Genesis, chainConfig *chain2.Config, logger log.Logge if err := allSnapshots.ReopenFolder(); err != nil { return fmt.Errorf("reopen snapshot segments: %w", err) } - blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + transactionsV3 := kvcfg.TransactionsV3.FromDB(db) + blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, transactionsV3) engine := initConsensusEngine(chainConfig, allSnapshots) getHeader := func(hash libcommon.Hash, number uint64) *types.Header { diff --git a/cmd/state/commands/history22.go b/cmd/state/commands/history22.go index 3e98d0c5f..18fefc445 100644 --- a/cmd/state/commands/history22.go +++ b/cmd/state/commands/history22.go @@ -141,7 +141,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error { if err := allSnapshots.ReopenWithDB(db); err != nil { return fmt.Errorf("reopen snapshot segments: %w", err) } - blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, ethconfig.Defaults.TransactionsV3) readWrapper := state.NewHistoryReader23(h.MakeContext(), ri) for !interrupt { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index dd8bdcccb..423fe073f 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -610,6 +610,10 @@ var ( Name: "experimental.history.v3", Usage: "(also known as Erigon3) Not recommended yet: Can't change this flag after node creation. New DB and Snapshots format of history allows: parallel blocks execution, get state as of given transaction without executing whole block.", } + TransactionV3Flag = cli.BoolFlag{ + Name: "experimental.transactions.v3", + Usage: "(this flag is in testing stage) Not recommended yet: Can't change this flag after node creation. New DB table for transactions allows keeping multiple branches of block bodies in the DB simultaneously", + } CliqueSnapshotCheckpointIntervalFlag = cli.UintFlag{ Name: "clique.checkpoint", @@ -1497,6 +1501,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C cfg.Ethstats = ctx.String(EthStatsURLFlag.Name) cfg.P2PEnabled = len(nodeConfig.P2P.SentryAddr) == 0 cfg.HistoryV3 = ctx.Bool(HistoryV3Flag.Name) + cfg.TransactionsV3 = ctx.Bool(TransactionV3Flag.Name) if ctx.IsSet(NetworkIdFlag.Name) { cfg.NetworkID = ctx.Uint64(NetworkIdFlag.Name) } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 657f60e77..ddae8c640 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "github.com/ledgerwatch/erigon-lib/kv/kvcfg" "math" "math/big" "time" @@ -385,10 +386,17 @@ func ReadStorageBody(db kv.Getter, hash libcommon.Hash, number uint64) (types.Bo return *bodyForStorage, nil } -func CanonicalTxnByID(db kv.Getter, id uint64) (types.Transaction, error) { +func CanonicalTxnByID(db kv.Getter, id uint64, blockHash libcommon.Hash, transactionsV3 bool) (types.Transaction, error) { txIdKey := make([]byte, 8) binary.BigEndian.PutUint64(txIdKey, id) - v, err := db.GetOne(kv.EthTx, txIdKey) + var v []byte + var err error + if transactionsV3 { + key := append(txIdKey, blockHash.Bytes()...) + v, err = db.GetOne(kv.EthTxV3, key) + } else { + v, err = db.GetOne(kv.EthTx, txIdKey) + } if err != nil { return nil, err } @@ -456,7 +464,7 @@ func NonCanonicalTransactions(db kv.Getter, baseTxId uint64, amount uint32) ([]t return txs, nil } -func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error { +func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64, blockHash *libcommon.Hash) error { txId := baseTxId buf := bytes.NewBuffer(nil) for _, tx := range txs { @@ -470,21 +478,35 @@ func WriteTransactions(db kv.RwTx, txs []types.Transaction, baseTxId uint64) err } // If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine. - if err := db.Append(kv.EthTx, txIdKey, common.CopyBytes(buf.Bytes())); err != nil { - return err + if blockHash != nil { + key := append(txIdKey, blockHash.Bytes()...) + if err := db.Append(kv.EthTxV3, key, common.CopyBytes(buf.Bytes())); err != nil { + return err + } + } else { + if err := db.Append(kv.EthTx, txIdKey, common.CopyBytes(buf.Bytes())); err != nil { + return err + } } } return nil } -func WriteRawTransactions(tx kv.RwTx, txs [][]byte, baseTxId uint64) error { +func WriteRawTransactions(tx kv.RwTx, txs [][]byte, baseTxId uint64, blockHash *common2.Hash) error { txId := baseTxId for _, txn := range txs { txIdKey := make([]byte, 8) binary.BigEndian.PutUint64(txIdKey, txId) // If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine. - if err := tx.Append(kv.EthTx, txIdKey, txn); err != nil { - return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err) + if blockHash != nil { + if err := tx.Append(kv.EthTx, txIdKey, txn); err != nil { + return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err) + } + } else { + key := append(txIdKey, blockHash.Bytes()...) + if err := tx.Append(kv.EthTxV3, key, txn); err != nil { + return fmt.Errorf("txId=%d, baseTxId=%d, %w", txId, baseTxId, err) + } } txId++ } @@ -676,7 +698,7 @@ func WriteRawBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.Ra } lastTxnID = baseTxnID + uint64(data.TxAmount) - 1 firstNonSystemTxnID := baseTxnID + 1 - if err = WriteRawTransactions(db, body.Transactions, firstNonSystemTxnID); err != nil { + if err = WriteRawTransactions(db, body.Transactions, firstNonSystemTxnID, &hash); err != nil { return false, 0, fmt.Errorf("WriteRawTransactions: %w", err) } return true, lastTxnID, nil @@ -698,7 +720,12 @@ func WriteBody(db kv.RwTx, hash libcommon.Hash, number uint64, body *types.Body) if err := WriteBodyForStorage(db, hash, number, &data); err != nil { return fmt.Errorf("failed to write body: %w", err) } - err = WriteTransactions(db, body.Transactions, baseTxId+1) + transactionV3, _ := kvcfg.TransactionsV3.Enabled(db.(kv.Tx)) + if transactionV3 { + err = WriteTransactions(db, body.Transactions, baseTxId+1, &hash) + } else { + err = WriteTransactions(db, body.Transactions, baseTxId+1, nil) + } if err != nil { return fmt.Errorf("failed to WriteTransactions: %w", err) } @@ -724,7 +751,7 @@ func deleteBody(db kv.Deleter, hash libcommon.Hash, number uint64) { } // MakeBodiesCanonical - move all txs of non-canonical blocks from NonCanonicalTxs table to EthTx table -func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker, cb func(blockNum, lastTxnNum uint64) error) error { +func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker, transactionsV3 bool, cb func(blockNum, lastTxnNum uint64) error) error { for blockNum := from; ; blockNum++ { h, err := ReadCanonicalHash(tx, blockNum) if err != nil { @@ -751,8 +778,15 @@ func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix i := uint64(0) if err := tx.ForAmount(kv.NonCanonicalTxs, hexutility.EncodeTs(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error { id := newBaseId + 1 + i - if err := tx.Put(kv.EthTx, hexutility.EncodeTs(id), v); err != nil { - return err + if transactionsV3 { + key := append(hexutility.EncodeTs(id), h.Bytes()...) + if err := tx.Put(kv.EthTxV3, key, v); err != nil { + return err + } + } else { + if err := tx.Put(kv.EthTx, hexutility.EncodeTs(id), v); err != nil { + return err + } } if err := tx.Delete(kv.NonCanonicalTxs, k); err != nil { return err diff --git a/eth/backend.go b/eth/backend.go index 23a903219..9ac659a70 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -263,6 +263,11 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return err } + config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3) + if err != nil { + return err + } + // if we are in the incorrect syncmode then we change it to the appropriate one if !isCorrectSync { log.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots) @@ -297,7 +302,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere Accumulator: shards.NewAccumulator(), }, } - blockReader, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, backend.notifications.Events) + blockReader, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, backend.notifications.Events, config.TransactionsV3) if err != nil { return nil, err } @@ -502,7 +507,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere mining := stagedsync.New( stagedsync.MiningStages(backend.sentryCtx, stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, nil, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool2, backend.txPool2DB), + stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool2, backend.txPool2DB, allSnapshots, config.TransactionsV3), stagedsync.StageHashStateCfg(backend.chainDB, dirs, config.HistoryV3, backend.agg), stagedsync.StageTrieCfg(backend.chainDB, false, true, true, tmpdir, blockReader, nil, config.HistoryV3, backend.agg), stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit), @@ -520,7 +525,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere proposingSync := stagedsync.New( stagedsync.MiningStages(backend.sentryCtx, stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, param, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool2, backend.txPool2DB), + stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool2, backend.txPool2DB, allSnapshots, config.TransactionsV3), stagedsync.StageHashStateCfg(backend.chainDB, dirs, config.HistoryV3, backend.agg), stagedsync.StageTrieCfg(backend.chainDB, false, true, true, tmpdir, blockReader, nil, config.HistoryV3, backend.agg), stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit), @@ -969,18 +974,13 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) { } // sets up blockReader and client downloader -func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, notifications *shards.Events) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { - if !snConfig.Enabled { - blockReader := snapshotsync.NewBlockReader() - return blockReader, nil, nil, nil - } - +func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, notifications *shards.Events, transactionsV3 bool) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap) var err error if !snConfig.NoDownloader { allSnapshots.OptimisticalyReopenWithDB(s.chainDB) } - blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, transactionsV3) if !snConfig.NoDownloader { if snConfig.DownloaderAddr != "" { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index b6896de11..b8a4c1e95 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -230,6 +230,9 @@ type Config struct { // gRPC Address to connect to Heimdall node HeimdallgRPCAddress string + // New DB table for storing transactions allows: keeping multiple branches of block bodies in the DB simultaneously + TransactionsV3 bool + // URL to connect to Heimdall node HeimdallURL string diff --git a/eth/ethconfig/erigon3_test_disable.go b/eth/ethconfig/erigon3_test_disable.go index dff3df5b0..4aef4e7fa 100644 --- a/eth/ethconfig/erigon3_test_disable.go +++ b/eth/ethconfig/erigon3_test_disable.go @@ -3,3 +3,4 @@ package ethconfig const EnableHistoryV3InTest = false +const EnableTransactionsV3InTest = false diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index ed5f188b6..b57a3eb7b 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -35,10 +35,11 @@ type BodiesCfg struct { snapshots *snapshotsync.RoSnapshots blockReader services.FullBlockReader historyV3 bool + transactionsV3 bool } -func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload, bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, chanConfig chain.Config, snapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, historyV3 bool) BodiesCfg { - return BodiesCfg{db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator, timeout: timeout, chanConfig: chanConfig, snapshots: snapshots, blockReader: blockReader, historyV3: historyV3} +func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload, bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem), blockPropagator adapter.BlockPropagator, timeout int, chanConfig chain.Config, snapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, historyV3 bool, transactionsV3 bool) BodiesCfg { + return BodiesCfg{db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator, timeout: timeout, chanConfig: chanConfig, snapshots: snapshots, blockReader: blockReader, historyV3: historyV3, transactionsV3: transactionsV3} } // BodiesForward progresses Bodies stage in the forward direction @@ -105,7 +106,7 @@ func BodiesForward( // Property of blockchain: same block in different forks will have different hashes. // Means - can mark all canonical blocks as non-canonical on unwind, and // do opposite here - without storing any meta-info. - if err := rawdb.MakeBodiesCanonical(tx, s.BlockNumber+1, ctx, logPrefix, logEvery, func(blockNum, lastTxnNum uint64) error { + if err := rawdb.MakeBodiesCanonical(tx, s.BlockNumber+1, ctx, logPrefix, logEvery, cfg.transactionsV3, func(blockNum, lastTxnNum uint64) error { if cfg.historyV3 { if err := rawdbv3.TxNums.Append(tx, blockNum, lastTxnNum); err != nil { return err diff --git a/eth/stagedsync/stage_bodies_test.go b/eth/stagedsync/stage_bodies_test.go index b93224ae4..603292a24 100644 --- a/eth/stagedsync/stage_bodies_test.go +++ b/eth/stagedsync/stage_bodies_test.go @@ -44,7 +44,7 @@ func TestBodiesUnwind(t *testing.T) { require.Equal(5*(3+2), int(n)) // from 0, 5 block with 3 txn in each } { - err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, nil) // block 5 already canonical, start from next one + err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, false, nil) // block 5 already canonical, start from next one require.NoError(err) n, err := tx.ReadSequence(kv.EthTx) require.NoError(err) @@ -69,7 +69,7 @@ func TestBodiesUnwind(t *testing.T) { require.NoError(err) require.Equal(5*(3+2), int(n)) // from 0, 5 block with 3 txn in each - err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, nil) // block 5 already canonical, start from next one + err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery, false, nil) // block 5 already canonical, start from next one require.NoError(err) n, err = tx.ReadSequence(kv.EthTx) require.NoError(err) diff --git a/eth/stagedsync/stage_interhashes_test.go b/eth/stagedsync/stage_interhashes_test.go index d91dcba78..4f272eb19 100644 --- a/eth/stagedsync/stage_interhashes_test.go +++ b/eth/stagedsync/stage_interhashes_test.go @@ -77,7 +77,7 @@ func TestAccountAndStorageTrie(t *testing.T) { // ---------------------------------------------------------------- historyV3 := false - blockReader := snapshotsync.NewBlockReader() + blockReader := snapshotsync.NewBlockReaderWithSnapshots(nil, false) cfg := StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil) _, err := RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx) assert.Nil(t, err) @@ -199,7 +199,7 @@ func TestAccountTrieAroundExtensionNode(t *testing.T) { hash6 := libcommon.HexToHash("0x3100000000000000000000000000000000000000000000000000000000000000") assert.Nil(t, tx.Put(kv.HashedAccounts, hash6[:], encoded)) - blockReader := snapshotsync.NewBlockReader() + blockReader := snapshotsync.NewBlockReaderWithSnapshots(nil, false) _, err := RegenerateIntermediateHashes("IH", tx, StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil), libcommon.Hash{} /* expectedRootHash */, ctx) assert.Nil(t, err) @@ -262,7 +262,7 @@ func TestStorageDeletion(t *testing.T) { // Populate account & storage trie DB tables // ---------------------------------------------------------------- historyV3 := false - blockReader := snapshotsync.NewBlockReader() + blockReader := snapshotsync.NewBlockReaderWithSnapshots(nil, false) cfg := StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil) _, err = RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx) assert.Nil(t, err) @@ -381,7 +381,7 @@ func TestHiveTrieRoot(t *testing.T) { common.FromHex("02081bc16d674ec80000"))) historyV3 := false - blockReader := snapshotsync.NewBlockReader() + blockReader := snapshotsync.NewBlockReaderWithSnapshots(nil, false) cfg := StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil) _, err := RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx) require.Nil(t, err) diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 467bd7ee2..98bacc2c1 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -66,6 +66,8 @@ func StageMiningExecCfg( payloadId uint64, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, + snapshots *snapshotsync.RoSnapshots, + transactionsV3 bool, ) MiningExecCfg { return MiningExecCfg{ db: db, @@ -73,7 +75,7 @@ func StageMiningExecCfg( notifier: notifier, chainConfig: chainConfig, engine: engine, - blockReader: snapshotsync.NewBlockReader(), + blockReader: snapshotsync.NewBlockReaderWithSnapshots(snapshots, transactionsV3), vmConfig: vmConfig, tmpdir: tmpdir, interrupt: interrupt, diff --git a/migrations/txs_begin_end.go b/migrations/txs_begin_end.go index 5911040b0..5f27ed11d 100644 --- a/migrations/txs_begin_end.go +++ b/migrations/txs_begin_end.go @@ -239,7 +239,7 @@ func writeRawBodyDeprecated(db kv.RwTx, hash common2.Hash, number uint64, body * if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil { return fmt.Errorf("failed to write body: %w", err) } - if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId); err != nil { + if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId, &hash); err != nil { return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number) } return nil diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 85fc619d1..687409b18 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -118,6 +118,7 @@ var DefaultFlags = []cli.Flag{ &utils.MetricsHTTPFlag, &utils.MetricsPortFlag, &utils.HistoryV3Flag, + &utils.TransactionV3Flag, &utils.IdentityFlag, &utils.CliqueSnapshotCheckpointIntervalFlag, &utils.CliqueSnapshotInmemorySnapshotsFlag, diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index b3b30315c..e49823a0c 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -18,104 +18,6 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) -// BlockReader can read blocks from db and snapshots -type BlockReader struct { -} - -func NewBlockReader() *BlockReader { - return &BlockReader{} -} - -func (back *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (libcommon.Hash, error) { - return rawdb.ReadCanonicalHash(tx, blockHeight) -} - -func (back *BlockReader) Snapshots() *RoSnapshots { return nil } - -func (back *BlockReader) Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (*types.Header, error) { - h := rawdb.ReadHeader(tx, hash, blockHeight) - return h, nil -} - -func (back *BlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) { - body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight) - return body, txAmount, nil -} - -func (back *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error) { - return rawdb.ReadBodyWithTransactions(tx, hash, blockHeight) -} - -func (back *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) { - body, _, err := back.Body(ctx, tx, hash, blockHeight) - if err != nil { - return nil, err - } - bodyRlp, err = rlp.EncodeToBytes(body) - if err != nil { - return nil, err - } - return bodyRlp, nil -} - -func (back *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) { - h := rawdb.ReadHeaderByNumber(tx, blockHeight) - return h, nil -} - -func (back *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (*types.Header, error) { - return rawdb.ReadHeaderByHash(tx, hash) -} - -func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) { - canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) - if err != nil { - return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash) - } - if canonicalHash == hash { - block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight) - if err != nil { - return nil, nil, err - } - return block, senders, nil - } - - return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight) -} - -func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) { - n, err := rawdb.ReadTxLookupEntry(tx, txnHash) - if err != nil { - return 0, false, err - } - if n == nil { - return 0, false, nil - } - return *n, true, nil -} -func (back *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) { - canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum) - if err != nil { - return nil, err - } - var k [8 + 32]byte - binary.BigEndian.PutUint64(k[:], blockNum) - copy(k[8:], canonicalHash[:]) - b, err := rawdb.ReadBodyForStorageByKey(tx, k[:]) - if err != nil { - return nil, err - } - if b == nil { - return nil, nil - } - - txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i)) - if err != nil { - return nil, err - } - return txn, nil -} - type RemoteBlockReader struct { client remote.ETHBACKENDClient } @@ -252,11 +154,12 @@ func (back *RemoteBlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash l // BlockReaderWithSnapshots can read blocks from db and snapshots type BlockReaderWithSnapshots struct { - sn *RoSnapshots + sn *RoSnapshots + TransactionsV3 bool } -func NewBlockReaderWithSnapshots(snapshots *RoSnapshots) *BlockReaderWithSnapshots { - return &BlockReaderWithSnapshots{sn: snapshots} +func NewBlockReaderWithSnapshots(snapshots *RoSnapshots, transactionsV3 bool) *BlockReaderWithSnapshots { + return &BlockReaderWithSnapshots{sn: snapshots, TransactionsV3: transactionsV3} } func (back *BlockReaderWithSnapshots) Snapshots() *RoSnapshots { return back.sn } @@ -763,7 +666,7 @@ func (back *BlockReaderWithSnapshots) TxnByIdxInBlock(ctx context.Context, tx kv return nil, nil } - txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i)) + txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i), canonicalHash, back.TransactionsV3) if err != nil { return nil, err } diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 8db28c526..e349fd9f4 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -490,7 +490,7 @@ func TestChainTxReorgs(t *testing.T) { t.Errorf("drop %d: receipt %v found while shouldn't have been", i, rcpt) } } - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) // added tx txs = types.Transactions{pastAdd, freshAdd, futureAdd} @@ -793,7 +793,7 @@ func doModesTest(t *testing.T, pm prune.Mode) error { require.Equal(uint64(0), found.Minimum()) } - br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots) + br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) if pm.TxIndex.Enabled() { b, err := rawdb.ReadBlockByNumber(tx, 1) diff --git a/turbo/stages/headerdownload/header_algo_test.go b/turbo/stages/headerdownload/header_algo_test.go index 6dfcc5306..8b168bbd7 100644 --- a/turbo/stages/headerdownload/header_algo_test.go +++ b/turbo/stages/headerdownload/header_algo_test.go @@ -1,4 +1,4 @@ -package headerdownload +package headerdownload_test import ( "context" @@ -13,9 +13,12 @@ import ( "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/erigon/turbo/stages" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) func TestInserter1(t *testing.T) { + m := stages.Mock(t) funds := big.NewInt(1000000000) key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") address := crypto.PubkeyToAddress(key.PublicKey) @@ -37,7 +40,7 @@ func TestInserter1(t *testing.T) { t.Fatal(err) } defer tx.Rollback() - hi := NewHeaderInserter("headers", big.NewInt(0), 0, snapshotsync.NewBlockReader()) + hi := headerdownload.NewHeaderInserter("headers", big.NewInt(0), 0, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3)) h1 := types.Header{ Number: big.NewInt(1), Difficulty: big.NewInt(10), @@ -51,11 +54,11 @@ func TestInserter1(t *testing.T) { } h2Hash := h2.Hash() data1, _ := rlp.EncodeToBytes(&h1) - if _, err = hi.FeedHeaderPoW(tx, snapshotsync.NewBlockReader(), &h1, data1, h1Hash, 1); err != nil { + if _, err = hi.FeedHeaderPoW(tx, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3), &h1, data1, h1Hash, 1); err != nil { t.Errorf("feed empty header 1: %v", err) } data2, _ := rlp.EncodeToBytes(&h2) - if _, err = hi.FeedHeaderPoW(tx, snapshotsync.NewBlockReader(), &h2, data2, h2Hash, 2); err != nil { + if _, err = hi.FeedHeaderPoW(tx, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3), &h2, data2, h2Hash, 2); err != nil { t.Errorf("feed empty header 2: %v", err) } } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index cb3fbffeb..8550c8c55 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -97,6 +97,7 @@ type MockSentry struct { txPoolDB kv.RwDB HistoryV3 bool + TransactionsV3 bool agg *libstate.AggregatorV3 BlockSnapshots *snapshotsync.RoSnapshots } @@ -281,11 +282,12 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey PeerId: gointerfaces.ConvertHashToH512([64]byte{0x12, 0x34, 0x50}), // "12345" BlockSnapshots: snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap), HistoryV3: cfg.HistoryV3, + TransactionsV3: cfg.TransactionsV3, } if t != nil { t.Cleanup(mock.Close) } - blockReader := snapshotsync.NewBlockReaderWithSnapshots(mock.BlockSnapshots) + blockReader := snapshotsync.NewBlockReaderWithSnapshots(mock.BlockSnapshots, mock.TransactionsV3) mock.Address = crypto.PubkeyToAddress(mock.Key.PublicKey) @@ -410,6 +412,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.BlockSnapshots, blockReader, cfg.HistoryV3, + cfg.TransactionsV3, ), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, blockRetire, nil), stagedsync.StageExecuteBlocksCfg( @@ -462,7 +465,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.MiningSync = stagedsync.New( stagedsync.MiningStages(mock.Ctx, stagedsync.StageMiningCreateBlockCfg(mock.DB, miner, *mock.ChainConfig, mock.Engine, mock.TxPool, nil, nil, dirs.Tmp), - stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil), + stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil, mock.BlockSnapshots, cfg.TransactionsV3), stagedsync.StageHashStateCfg(mock.DB, dirs, cfg.HistoryV3, mock.agg), stagedsync.StageTrieCfg(mock.DB, false, true, false, dirs.Tmp, blockReader, nil, cfg.HistoryV3, mock.agg), stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, miningCancel), diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index e6145d5f5..a21ba94b7 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -31,7 +31,6 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/turbo/engineapi" - "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" @@ -358,12 +357,7 @@ func NewDefaultStages(ctx context.Context, engine consensus.Engine, ) []*stagedsync.Stage { dirs := cfg.Dirs - var blockReader services.FullBlockReader - if cfg.Snapshot.Enabled { - blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots) - } else { - blockReader = snapshotsync.NewBlockReader() - } + blockReader := snapshotsync.NewBlockReaderWithSnapshots(snapshots, cfg.TransactionsV3) blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, snapshots, db, snapDownloader, notifications.Events) // During Import we don't want other services like header requests, body requests etc. to be running. @@ -411,6 +405,7 @@ func NewDefaultStages(ctx context.Context, snapshots, blockReader, cfg.HistoryV3, + cfg.TransactionsV3, ), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, controlServer.Hd), stagedsync.StageExecuteBlocksCfg( @@ -443,12 +438,7 @@ func NewDefaultStages(ctx context.Context, } func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config, controlServer *sentry.MultiClient, dirs datadir.Dirs, notifications *shards.Notifications, snapshots *snapshotsync.RoSnapshots, agg *state.AggregatorV3) (*stagedsync.Sync, error) { - var blockReader services.FullBlockReader - if cfg.Snapshot.Enabled { - blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots) - } else { - blockReader = snapshotsync.NewBlockReader() - } + blockReader := snapshotsync.NewBlockReaderWithSnapshots(snapshots, cfg.TransactionsV3) return stagedsync.New( stagedsync.StateStages(ctx, @@ -467,7 +457,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config dirs.Tmp, nil, nil, ), - stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3), + stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, cfg.TransactionsV3), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, nil, controlServer.Hd), stagedsync.StageExecuteBlocksCfg(