From 63afe656863d445fb3918b5ca1fafba2de23de10 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 23 May 2023 14:49:17 +0700 Subject: [PATCH] Use BlockReader in ForkValidator, CliqueAPI (#7562) --- cmd/erigon-el/backend/backend.go | 219 ++++++++++++----------- cmd/rpcdaemon/commands/daemon.go | 2 +- cmd/rpcdaemon/rpcservices/eth_backend.go | 26 +++ consensus/chain_reader.go | 25 ++- consensus/clique/api.go | 18 +- consensus/clique/clique.go | 5 +- consensus/clique/clique_test.go | 16 +- core/genesis_test.go | 6 +- core/rlp_test.go | 2 +- core/state/temporal/kv_temporal.go | 6 +- eth/backend.go | 205 +++++++++++---------- eth/ethconfig/erigon3_test_disable.go | 2 +- eth/ethconfig/erigon3_test_enable.go | 2 +- eth/ethconfig/erigon4_test_enable.go | 2 +- turbo/engineapi/fork_validator.go | 12 +- turbo/services/interfaces.go | 22 ++- turbo/snapshotsync/block_reader.go | 69 +++++++ turbo/stages/mock_sentry.go | 6 +- 18 files changed, 396 insertions(+), 249 deletions(-) diff --git a/cmd/erigon-el/backend/backend.go b/cmd/erigon-el/backend/backend.go index 4198e7618..0b7816f65 100644 --- a/cmd/erigon-el/backend/backend.go +++ b/cmd/erigon-el/backend/backend.go @@ -154,8 +154,9 @@ type Ethereum struct { blockReader services.FullBlockReader blockWriter *blockio.BlockWriter - agg *libstate.AggregatorV3 - logger log.Logger + agg *libstate.AggregatorV3 + blockSnapshots *snapshotsync.RoSnapshots + logger log.Logger } // New creates a new Ethereum object (including the @@ -177,8 +178,56 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( if err != nil { return nil, err } + if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error { + if err = stagedsync.UpdateMetrics(tx); err != nil { + return err + } - var currentBlock *types.Block + config.Prune, err = prune.EnsureNotChanged(tx, config.Prune) + if err != nil { + return err + } + + config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3) + if err != nil { + return err + } + + config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3) + if err != nil { + return err + } + + return nil + }); err != nil { + return nil, err + } + ctx, ctxCancel := context.WithCancel(context.Background()) + + // kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput + backend := &Ethereum{ + sentryCtx: ctx, + sentryCancel: ctxCancel, + config: config, + chainDB: chainKv, + networkID: config.NetworkID, + etherbase: config.Miner.Etherbase, + waitForStageLoopStop: make(chan struct{}), + waitForMiningStop: make(chan struct{}), + notifications: &shards.Notifications{ + Events: shards.NewEvents(), + Accumulator: shards.NewAccumulator(), + }, + logger: logger, + } + var ( + allSnapshots *snapshotsync.RoSnapshots + ) + blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config.Snapshot, backend.notifications.Events, config.TransactionsV3, logger) + if err != nil { + return nil, err + } + backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter // Check if we have an already initialized chain and fall back to // that if so. Otherwise we need to generate a new genesis spec. @@ -199,7 +248,18 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( return genesisErr } - currentBlock = rawdb.ReadCurrentBlock(tx) + isCorrectSync, useSnapshots, err := snap.EnsureNotChanged(tx, config.Snapshot) + if err != nil { + return err + } + // if we are in the incorrect syncmode then we change it to the appropriate one + if !isCorrectSync { + logger.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots) + config.Sync.UseSnapshots = useSnapshots + config.Snapshot.Enabled = ethconfig.UseSnapshotsByChainName(chainConfig.ChainName) && useSnapshots + } + logger.Info("Effective", "prune_flags", config.Prune.String(), "snapshot_flags", config.Snapshot.String(), "history.v3", config.HistoryV3) + return nil }); err != nil { panic(err) @@ -208,70 +268,8 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( logger.Info("Initialised chain configuration", "config", chainConfig, "genesis", genesis.Hash()) - if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error { - if err = stagedsync.UpdateMetrics(tx); err != nil { - return err - } - - config.Prune, err = prune.EnsureNotChanged(tx, config.Prune) - if err != nil { - return err - } - isCorrectSync, useSnapshots, err := snap.EnsureNotChanged(tx, config.Snapshot) - if err != nil { - return err - } - - config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3) - if err != nil { - return err - } - - config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3) - if err != nil { - return err - } - - // if we are in the incorrect syncmode then we change it to the appropriate one - if !isCorrectSync { - logger.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots) - config.Sync.UseSnapshots = useSnapshots - config.Snapshot.Enabled = useSnapshots - } - logger.Info("Effective", "prune_flags", config.Prune.String(), "snapshot_flags", config.Snapshot.String(), "history.v3", config.HistoryV3) - - return nil - }); err != nil { - return nil, err - } - - ctx, ctxCancel := context.WithCancel(context.Background()) - - // kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput - backend := &Ethereum{ - sentryCtx: ctx, - sentryCancel: ctxCancel, - config: config, - chainDB: chainKv, - networkID: config.NetworkID, - etherbase: config.Miner.Etherbase, - chainConfig: chainConfig, - genesisHash: genesis.Hash(), - waitForStageLoopStop: make(chan struct{}), - waitForMiningStop: make(chan struct{}), - notifications: &shards.Notifications{ - Events: shards.NewEvents(), - Accumulator: shards.NewAccumulator(), - }, - logger: logger, - } - var ( - allSnapshots *snapshotsync.RoSnapshots - ) - backend.blockReader, backend.blockWriter, allSnapshots, backend.agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, config.TransactionsV3) - if err != nil { - return nil, err - } + backend.genesisHash = genesis.Hash() + backend.chainConfig = chainConfig if config.HistoryV3 { backend.chainDB, err = temporal.New(backend.chainDB, backend.agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName]) @@ -280,6 +278,9 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( } chainKv = backend.chainDB } + if err := backend.setUpSnapDownloader(ctx, config.Downloader); err != nil { + return nil, err + } kvRPC := remotedbserver.NewKvServer(ctx, chainKv, allSnapshots, backend.agg, logger) backend.notifications.StateChangesConsumer = kvRPC @@ -401,6 +402,15 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( } return nil } + + var currentBlock *types.Block + if err := chainKv.View(context.Background(), func(tx kv.Tx) error { + currentBlock, err = backend.blockReader.CurrentBlock(tx) + return err + }); err != nil { + panic(err) + } + currentBlockNumber := uint64(0) if currentBlock != nil { currentBlockNumber = currentBlock.NumberU64() @@ -420,7 +430,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( } backend.engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallgRPCAddress, config.HeimdallURL, config.WithoutHeimdall, stack.DataDir(), false /* readonly */, logger) - backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir) + backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir, backend.blockReader) if err != nil { return nil, err @@ -855,47 +865,29 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) { } // sets up blockReader and client downloader -func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, transactionsV3 bool) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { - allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, s.logger) +func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downloadercfg.Cfg) error { var err error - if !snConfig.NoDownloader { - allSnapshots.OptimisticalyReopenWithDB(s.chainDB) + if s.config.Snapshot.NoDownloader { + return nil } - blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3) - blockWriter := blockio.NewBlockWriter(transactionsV3) - - if !snConfig.NoDownloader { - if snConfig.DownloaderAddr != "" { - // connect to external Downloader - s.downloaderClient, err = downloadergrpc.NewClient(ctx, snConfig.DownloaderAddr) - } else { - // start embedded Downloader - s.downloader, err = downloader3.New(ctx, downloaderCfg) - if err != nil { - return nil, nil, nil, nil, err - } - s.downloader.MainLoopInBackground(ctx, true) - bittorrentServer, err := downloader3.NewGrpcServer(s.downloader) - if err != nil { - return nil, nil, nil, nil, fmt.Errorf("new server: %w", err) - } - - s.downloaderClient = direct.NewDownloaderClient(bittorrentServer) - } + if s.config.Snapshot.DownloaderAddr != "" { + // connect to external Downloader + s.downloaderClient, err = downloadergrpc.NewClient(ctx, s.config.Snapshot.DownloaderAddr) + } else { + // start embedded Downloader + s.downloader, err = downloader3.New(ctx, downloaderCfg) if err != nil { - return nil, nil, nil, nil, err + return err + } + s.downloader.MainLoopInBackground(ctx, true) + bittorrentServer, err := downloader3.NewGrpcServer(s.downloader) + if err != nil { + return fmt.Errorf("new server: %w", err) } - } - dir.MustExist(dirs.SnapHistory) - agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, s.chainDB, s.logger) - if err != nil { - return nil, nil, nil, nil, err + s.downloaderClient = direct.NewDownloaderClient(bittorrentServer) } - if err = agg.OpenFolder(); err != nil { - return nil, nil, nil, nil, err - } - agg.OnFreeze(func(frozenFileNames []string) { + s.agg.OnFreeze(func(frozenFileNames []string) { events := s.notifications.Events events.OnNewSnapshot() if s.downloaderClient != nil { @@ -910,7 +902,26 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo } } }) + return err +} +func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig ethconfig.Snapshot, notifications *shards.Events, transactionsV3 bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { + allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, logger) + var err error + if !snConfig.NoDownloader { + allSnapshots.OptimisticalyReopenWithDB(db) + } + blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3) + blockWriter := blockio.NewBlockWriter(transactionsV3) + + dir.MustExist(dirs.SnapHistory) + agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger) + if err != nil { + return nil, nil, nil, nil, err + } + if err = agg.OpenFolder(); err != nil { + return nil, nil, nil, nil, err + } return blockReader, blockWriter, allSnapshots, agg, nil } diff --git a/cmd/rpcdaemon/commands/daemon.go b/cmd/rpcdaemon/commands/daemon.go index 623e72a73..9902ab6f1 100644 --- a/cmd/rpcdaemon/commands/daemon.go +++ b/cmd/rpcdaemon/commands/daemon.go @@ -131,7 +131,7 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool. Version: "1.0", }) case "clique": - list = append(list, clique.NewCliqueAPI(db, engine)) + list = append(list, clique.NewCliqueAPI(db, engine, blockReader)) } } diff --git a/cmd/rpcdaemon/rpcservices/eth_backend.go b/cmd/rpcdaemon/rpcservices/eth_backend.go index 5e0bcc434..1aa5c74dd 100644 --- a/cmd/rpcdaemon/rpcservices/eth_backend.go +++ b/cmd/rpcdaemon/rpcservices/eth_backend.go @@ -14,6 +14,7 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" "google.golang.org/grpc/status" @@ -44,6 +45,31 @@ func NewRemoteBackend(client remote.ETHBACKENDClient, db kv.RoDB, blockReader se } } +func (back *RemoteBackend) CurrentBlock(db kv.Tx) (*types.Block, error) { + panic("not implemented") +} +func (back *RemoteBackend) RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) { + panic("not implemented") +} +func (back *RemoteBackend) BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) { + hash, err := rawdb.ReadCanonicalHash(db, number) + if err != nil { + return nil, fmt.Errorf("failed ReadCanonicalHash: %w", err) + } + if hash == (libcommon.Hash{}) { + return nil, nil + } + block, _, err := back.BlockWithSenders(ctx, db, hash, number) + return block, err +} +func (r *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash libcommon.Hash) (*types.Block, error) { + number := rawdb.ReadHeaderNumber(db, hash) + if number == nil { + return nil, nil + } + block, _, err := r.BlockWithSenders(ctx, db, hash, *number) + return block, err +} func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") } diff --git a/consensus/chain_reader.go b/consensus/chain_reader.go index 57eaa85c9..2323ff577 100644 --- a/consensus/chain_reader.go +++ b/consensus/chain_reader.go @@ -1,11 +1,13 @@ package consensus import ( + "context" "math/big" "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon/core/rawdb" @@ -14,8 +16,9 @@ import ( // Implements consensus.ChainReader type ChainReaderImpl struct { - Cfg chain.Config - Db kv.Getter + Cfg chain.Config + Db kv.Getter + BlockReader services.FullBlockReader } // Config retrieves the blockchain's chain configuration. @@ -27,12 +30,14 @@ func (cr ChainReaderImpl) Config() *chain.Config { func (cr ChainReaderImpl) CurrentHeader() *types.Header { hash := rawdb.ReadHeadHeaderHash(cr.Db) number := rawdb.ReadHeaderNumber(cr.Db, hash) - return rawdb.ReadHeader(cr.Db, hash, *number) + h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, *number) + return h } // GetHeader retrieves a block header from the database by hash and number. func (cr ChainReaderImpl) GetHeader(hash libcommon.Hash, number uint64) *types.Header { - return rawdb.ReadHeader(cr.Db, hash, number) + h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, number) + return h } // GetHeaderByNumber retrieves a block header from the database by number. @@ -42,23 +47,27 @@ func (cr ChainReaderImpl) GetHeaderByNumber(number uint64) *types.Header { log.Error("ReadCanonicalHash failed", "err", err) return nil } - return rawdb.ReadHeader(cr.Db, hash, number) + h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, number) + return h } // GetHeaderByHash retrieves a block header from the database by its hash. func (cr ChainReaderImpl) GetHeaderByHash(hash libcommon.Hash) *types.Header { number := rawdb.ReadHeaderNumber(cr.Db, hash) - return rawdb.ReadHeader(cr.Db, hash, *number) + h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, *number) + return h } // GetBlock retrieves a block from the database by hash and number. func (cr ChainReaderImpl) GetBlock(hash libcommon.Hash, number uint64) *types.Block { - return rawdb.ReadBlock(cr.Db, hash, number) + b, _, _ := cr.BlockReader.BlockWithSenders(context.Background(), cr.Db, hash, number) + return b } // HasBlock retrieves a block from the database by hash and number. func (cr ChainReaderImpl) HasBlock(hash libcommon.Hash, number uint64) bool { - return rawdb.HasBlock(cr.Db, hash, number) + b, _ := cr.BlockReader.BodyRlp(context.Background(), cr.Db, hash, number) + return b != nil } // GetTd retrieves the total difficulty from the database by hash and number. diff --git a/consensus/clique/api.go b/consensus/clique/api.go index dccbf6757..07ac51f18 100644 --- a/consensus/clique/api.go +++ b/consensus/clique/api.go @@ -25,6 +25,7 @@ import ( "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rpc" + "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" ) @@ -32,9 +33,10 @@ import ( // mechanisms of the proof-of-authority scheme. type API struct { // chain consensus.ChainHeaderReader - db kv.RoDB - clique *Clique - logger log.Logger + db kv.RoDB + clique *Clique + logger log.Logger + blockReader services.FullBlockReader } // GetSnapshot retrieves the state snapshot at a given block. @@ -44,7 +46,7 @@ func (api *API) GetSnapshot(ctx context.Context, number *rpc.BlockNumber) (*Snap return nil, err } defer tx.Rollback() - chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx} + chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader} // Retrieve the requested block number (or current if none requested) var header *types.Header @@ -73,7 +75,7 @@ func (api *API) GetSnapshotAtHash(ctx context.Context, hash libcommon.Hash) (*Sn return nil, err } defer tx.Rollback() - chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx} + chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader} header := chain.GetHeaderByHash(hash) if header == nil { @@ -95,7 +97,7 @@ func (api *API) GetSigners(ctx context.Context, number *rpc.BlockNumber) ([]libc return nil, err } defer tx.Rollback() - chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx} + chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader} // Retrieve the requested block number (or current if none requested) var header *types.Header @@ -123,7 +125,7 @@ func (api *API) GetSignersAtHash(ctx context.Context, hash libcommon.Hash) ([]li return nil, err } defer tx.Rollback() - chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx} + chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader} header := chain.GetHeaderByHash(hash) if header == nil { @@ -182,7 +184,7 @@ func (api *API) Status(ctx context.Context) (*status, error) { return nil, err } defer tx.Rollback() - chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx} + chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader} var ( numBlocks = uint64(64) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 6b784eaec..432d57cd6 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -30,6 +30,7 @@ import ( "github.com/goccy/go-json" lru "github.com/hashicorp/golang-lru/v2" + "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/chain" @@ -536,7 +537,7 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { } } -func NewCliqueAPI(db kv.RoDB, engine consensus.EngineReader) rpc.API { +func NewCliqueAPI(db kv.RoDB, engine consensus.EngineReader, blockReader services.FullBlockReader) rpc.API { var c *Clique if casted, ok := engine.(*Clique); ok { c = casted @@ -545,7 +546,7 @@ func NewCliqueAPI(db kv.RoDB, engine consensus.EngineReader) rpc.API { return rpc.API{ Namespace: "clique", Version: "1.0", - Service: &API{db: db, clique: c}, + Service: &API{db: db, clique: c, blockReader: blockReader}, Public: false, } } diff --git a/consensus/clique/clique_test.go b/consensus/clique/clique_test.go index a29cfb8c8..e3b5e3fc1 100644 --- a/consensus/clique/clique_test.go +++ b/consensus/clique/clique_test.go @@ -17,7 +17,6 @@ package clique_test import ( - "context" "math/big" "testing" @@ -60,12 +59,13 @@ func TestReimportMirroredState(t *testing.T) { } copy(genspec.ExtraData[clique.ExtraVanity:], addr[:]) m := stages.MockWithGenesisEngine(t, genspec, engine, false) + br, _ := m.NewBlocksIO() // Generate a batch of blocks, each properly signed getHeader := func(hash libcommon.Hash, number uint64) (h *types.Header) { - if err := m.DB.View(context.Background(), func(tx kv.Tx) error { - h = rawdb.ReadHeader(tx, hash, number) - return nil + if err := m.DB.View(m.Ctx, func(tx kv.Tx) (err error) { + h, err = br.Header(m.Ctx, tx, hash, number) + return err }); err != nil { panic(err) } @@ -109,8 +109,8 @@ func TestReimportMirroredState(t *testing.T) { if err := m.InsertChain(chain.Slice(0, 2)); err != nil { t.Fatalf("failed to insert initial blocks: %v", err) } - if err := m.DB.View(context.Background(), func(tx kv.Tx) error { - if head, err1 := rawdb.ReadBlockByHash(tx, rawdb.ReadHeadHeaderHash(tx)); err1 != nil { + if err := m.DB.View(m.Ctx, func(tx kv.Tx) error { + if head, err1 := br.BlockByHash(m.Ctx, tx, rawdb.ReadHeadHeaderHash(tx)); err1 != nil { t.Errorf("could not read chain head: %v", err1) } else if head.NumberU64() != 2 { t.Errorf("chain head mismatch: have %d, want %d", head.NumberU64(), 2) @@ -126,8 +126,8 @@ func TestReimportMirroredState(t *testing.T) { if err := m.InsertChain(chain.Slice(2, chain.Length())); err != nil { t.Fatalf("failed to insert final block: %v", err) } - if err := m.DB.View(context.Background(), func(tx kv.Tx) error { - if head, err1 := rawdb.ReadBlockByHash(tx, rawdb.ReadHeadHeaderHash(tx)); err1 != nil { + if err := m.DB.View(m.Ctx, func(tx kv.Tx) error { + if head, err1 := br.CurrentBlock(tx); err1 != nil { t.Errorf("could not read chain head: %v", err1) } else if head.NumberU64() != 3 { t.Errorf("chain head mismatch: have %d, want %d", head.NumberU64(), 3) diff --git a/core/genesis_test.go b/core/genesis_test.go index 0402814a9..4e0f3761d 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -25,7 +25,7 @@ import ( func TestGenesisBlockHashes(t *testing.T) { logger := log.New() - _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger) + _, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger) check := func(network string) { genesis := core.GenesisBlockByChainName(network) tx, err := db.BeginRw(context.Background()) @@ -74,7 +74,7 @@ func TestGenesisBlockRoots(t *testing.T) { func TestCommitGenesisIdempotency(t *testing.T) { logger := log.New() - _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger) + _, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger) tx, err := db.BeginRw(context.Background()) require.NoError(t, err) defer tx.Rollback() @@ -111,7 +111,7 @@ func TestAllocConstructor(t *testing.T) { }, } - historyV3, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger) + historyV3, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger) _, _, err := core.CommitGenesisBlock(db, genSpec, "", logger) require.NoError(err) diff --git a/core/rlp_test.go b/core/rlp_test.go index ce5dcadc7..55336b2a6 100644 --- a/core/rlp_test.go +++ b/core/rlp_test.go @@ -39,7 +39,7 @@ import ( func getBlock(tb testing.TB, transactions int, uncles int, dataSize int, tmpDir string) *types.Block { logger := log.New() - _, db, _ := temporal.NewTestDB(tb, context.Background(), datadir.New(tmpDir), nil, logger) + _, _, db, _ := temporal.NewTestDB(tb, context.Background(), datadir.New(tmpDir), nil, logger) var ( aa = libcommon.HexToAddress("0x000000000000000000000000000000000000aaaa") // Generate a canonical chain to act as the main dataset diff --git a/core/state/temporal/kv_temporal.go b/core/state/temporal/kv_temporal.go index 1edab4fc2..c93d3a9fb 100644 --- a/core/state/temporal/kv_temporal.go +++ b/core/state/temporal/kv_temporal.go @@ -473,8 +473,9 @@ func (tx *Tx) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limi } // TODO: need remove `gspec` param (move SystemContractCodeLookup feature somewhere) -func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *types.Genesis, logger log.Logger) (histV3 bool, db kv.RwDB, agg *state.AggregatorV3) { +func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *types.Genesis, logger log.Logger) (histV3, txsV3 bool, db kv.RwDB, agg *state.AggregatorV3) { HistoryV3 := ethconfig.EnableHistoryV3InTest + TxsV3 := ethconfig.EnableTxsV3InTest if tb != nil { db = memdb.NewTestDB(tb) @@ -483,6 +484,7 @@ func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *typ } _ = db.UpdateNosync(context.Background(), func(tx kv.RwTx) error { _, _ = kvcfg.HistoryV3.WriteOnce(tx, HistoryV3) + _, _ = kvcfg.TransactionsV3.WriteOnce(tx, TxsV3) return nil }) @@ -507,5 +509,5 @@ func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *typ panic(err) } } - return HistoryV3, db, agg + return HistoryV3, TxsV3, db, agg } diff --git a/eth/backend.go b/eth/backend.go index 4aa1168be..ae140532c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -34,6 +34,7 @@ import ( clcore "github.com/ledgerwatch/erigon/cl/phase1/core" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/core/rawdb/blockio" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" "github.com/holiman/uint256" "github.com/ledgerwatch/log/v3" @@ -107,7 +108,6 @@ import ( "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) @@ -211,7 +211,54 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil, err } - var currentBlock *types.Block + if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error { + if err = stagedsync.UpdateMetrics(tx); err != nil { + return err + } + + config.Prune, err = prune.EnsureNotChanged(tx, config.Prune) + if err != nil { + return err + } + + config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3) + if err != nil { + return err + } + + config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3) + if err != nil { + return err + } + + return nil + }); err != nil { + return nil, err + } + + ctx, ctxCancel := context.WithCancel(context.Background()) + + // kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput + backend := &Ethereum{ + sentryCtx: ctx, + sentryCancel: ctxCancel, + config: config, + chainDB: chainKv, + networkID: config.NetworkID, + etherbase: config.Miner.Etherbase, + waitForStageLoopStop: make(chan struct{}), + waitForMiningStop: make(chan struct{}), + notifications: &shards.Notifications{ + Events: shards.NewEvents(), + Accumulator: shards.NewAccumulator(), + }, + logger: logger, + } + blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config.Snapshot, backend.notifications.Events, config.TransactionsV3, logger) + if err != nil { + return nil, err + } + backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter // Check if we have an already initialized chain and fall back to // that if so. Otherwise we need to generate a new genesis spec. @@ -232,40 +279,10 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return genesisErr } - currentBlock = rawdb.ReadCurrentBlock(tx) - return nil - }); err != nil { - panic(err) - } - - config.Snapshot.Enabled = config.Sync.UseSnapshots - - logger.Info("Initialised chain configuration", "config", chainConfig, "genesis", genesis.Hash()) - - if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error { - if err = stagedsync.UpdateMetrics(tx); err != nil { - return err - } - - config.Prune, err = prune.EnsureNotChanged(tx, config.Prune) - if err != nil { - return err - } isCorrectSync, useSnapshots, err := snap.EnsureNotChanged(tx, config.Snapshot) if err != nil { return err } - - config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3) - if err != nil { - return err - } - - config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3) - if err != nil { - return err - } - // if we are in the incorrect syncmode then we change it to the appropriate one if !isCorrectSync { logger.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots) @@ -276,35 +293,20 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil }); err != nil { - return nil, err + panic(err) } - ctx, ctxCancel := context.WithCancel(context.Background()) + backend.chainConfig = chainConfig + backend.genesisBlock = genesis + backend.genesisHash = genesis.Hash() - // kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput - backend := &Ethereum{ - sentryCtx: ctx, - sentryCancel: ctxCancel, - config: config, - chainDB: chainKv, - networkID: config.NetworkID, - etherbase: config.Miner.Etherbase, - chainConfig: chainConfig, - genesisBlock: genesis, - genesisHash: genesis.Hash(), - waitForStageLoopStop: make(chan struct{}), - waitForMiningStop: make(chan struct{}), - notifications: &shards.Notifications{ - Events: shards.NewEvents(), - Accumulator: shards.NewAccumulator(), - }, - logger: logger, - } - blockReader, blockWriter, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, backend.notifications.Events, config.TransactionsV3) - if err != nil { + config.Snapshot.Enabled = config.Sync.UseSnapshots + + logger.Info("Initialised chain configuration", "config", chainConfig, "genesis", genesis.Hash()) + + if err := backend.setUpSnapDownloader(ctx, config.Downloader); err != nil { return nil, err } - backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter if config.HistoryV3 { backend.chainDB, err = temporal.New(backend.chainDB, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName]) @@ -441,6 +443,14 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } return nil } + var currentBlock *types.Block + if err := chainKv.View(context.Background(), func(tx kv.Tx) error { + currentBlock, err = blockReader.CurrentBlock(tx) + return err + }); err != nil { + panic(err) + } + currentBlockNumber := uint64(0) if currentBlock != nil { currentBlockNumber = currentBlock.NumberU64() @@ -460,7 +470,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } backend.engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallgRPCAddress, config.HeimdallURL, config.WithoutHeimdall, stack.DataDir(), false /* readonly */, logger) - backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir) + backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir, backend.blockReader) backend.sentriesClient, err = sentry.NewMultiClient( chainKv, @@ -985,56 +995,63 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) { } // sets up blockReader and client downloader -func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, notifications *shards.Events, transactionsV3 bool) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { - allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, s.logger) +func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downloadercfg.Cfg) error { + var err error + if s.config.Snapshot.NoDownloader { + return nil + } + if s.config.Snapshot.DownloaderAddr != "" { + // connect to external Downloader + s.downloaderClient, err = downloadergrpc.NewClient(ctx, s.config.Snapshot.DownloaderAddr) + } else { + // start embedded Downloader + s.downloader, err = downloader3.New(ctx, downloaderCfg) + if err != nil { + return err + } + s.downloader.MainLoopInBackground(ctx, true) + bittorrentServer, err := downloader3.NewGrpcServer(s.downloader) + if err != nil { + return fmt.Errorf("new server: %w", err) + } + + s.downloaderClient = direct.NewDownloaderClient(bittorrentServer) + } + s.agg.OnFreeze(func(frozenFileNames []string) { + events := s.notifications.Events + events.OnNewSnapshot() + if s.downloaderClient != nil { + req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(frozenFileNames))} + for _, fName := range frozenFileNames { + req.Items = append(req.Items, &proto_downloader.DownloadItem{ + Path: filepath.Join("history", fName), + }) + } + if _, err := s.downloaderClient.Download(ctx, req); err != nil { + s.logger.Warn("[snapshots] notify downloader", "err", err) + } + } + }) + return err +} + +func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig ethconfig.Snapshot, notifications *shards.Events, transactionsV3 bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) { + allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, logger) var err error if !snConfig.NoDownloader { - allSnapshots.OptimisticalyReopenWithDB(s.chainDB) + allSnapshots.OptimisticalyReopenWithDB(db) } blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3) blockWriter := blockio.NewBlockWriter(transactionsV3) - if !snConfig.NoDownloader { - if snConfig.DownloaderAddr != "" { - // connect to external Downloader - s.downloaderClient, err = downloadergrpc.NewClient(ctx, snConfig.DownloaderAddr) - } else { - // start embedded Downloader - s.downloader, err = downloader3.New(ctx, downloaderCfg) - if err != nil { - return nil, nil, nil, nil, err - } - s.downloader.MainLoopInBackground(ctx, true) - bittorrentServer, err := downloader3.NewGrpcServer(s.downloader) - if err != nil { - return nil, nil, nil, nil, fmt.Errorf("new server: %w", err) - } - - s.downloaderClient = direct.NewDownloaderClient(bittorrentServer) - } - if err != nil { - return nil, nil, nil, nil, err - } - } - dir.MustExist(dirs.SnapHistory) - agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, s.chainDB, s.logger) + agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger) if err != nil { return nil, nil, nil, nil, err } if err = agg.OpenFolder(); err != nil { return nil, nil, nil, nil, err } - agg.OnFreeze(func(frozenFileNames []string) { - notifications.OnNewSnapshot() - req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(frozenFileNames))} - for _, fName := range frozenFileNames { - req.Items = append(req.Items, &proto_downloader.DownloadItem{ - Path: filepath.Join("history", fName), - }) - } - }) - return blockReader, blockWriter, allSnapshots, agg, nil } diff --git a/eth/ethconfig/erigon3_test_disable.go b/eth/ethconfig/erigon3_test_disable.go index 3071e5f47..4c255f393 100644 --- a/eth/ethconfig/erigon3_test_disable.go +++ b/eth/ethconfig/erigon3_test_disable.go @@ -4,4 +4,4 @@ package ethconfig const EnableHistoryV3InTest = false const EnableHistoryV4InTest = false -const EnableTransactionsV3InTest = false +const EnableTxsV3InTest = false diff --git a/eth/ethconfig/erigon3_test_enable.go b/eth/ethconfig/erigon3_test_enable.go index 78678865f..27cbfa156 100644 --- a/eth/ethconfig/erigon3_test_enable.go +++ b/eth/ethconfig/erigon3_test_enable.go @@ -4,4 +4,4 @@ package ethconfig const EnableHistoryV3InTest = true const EnableHistoryV4InTest = false -const EnableTransactionsV3InTest = false +const EnableTxsV3InTest = false diff --git a/eth/ethconfig/erigon4_test_enable.go b/eth/ethconfig/erigon4_test_enable.go index 185530fd3..479b7b22d 100644 --- a/eth/ethconfig/erigon4_test_enable.go +++ b/eth/ethconfig/erigon4_test_enable.go @@ -4,4 +4,4 @@ package ethconfig const EnableHistoryV3InTest = true const EnableHistoryV4InTest = true -const EnableTransactionsV3InTest = false +const EnableTxsV3InTest = false diff --git a/turbo/engineapi/fork_validator.go b/turbo/engineapi/fork_validator.go index cb96b9a1e..afb5c2881 100644 --- a/turbo/engineapi/fork_validator.go +++ b/turbo/engineapi/fork_validator.go @@ -22,6 +22,7 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon/common/dbutils" @@ -49,9 +50,11 @@ type ForkValidator struct { extendingForkHeadHash libcommon.Hash // this is the function we use to perform payload validation. validatePayload validatePayloadFunc + blockReader services.FullBlockReader // this is the current point where we processed the chain so far. currentHeight uint64 tmpDir string + // we want fork validator to be thread safe so let lock sync.Mutex } @@ -63,12 +66,13 @@ func NewForkValidatorMock(currentHeight uint64) *ForkValidator { } } -func NewForkValidator(currentHeight uint64, validatePayload validatePayloadFunc, tmpDir string) *ForkValidator { +func NewForkValidator(currentHeight uint64, validatePayload validatePayloadFunc, tmpDir string, blockReader services.FullBlockReader) *ForkValidator { return &ForkValidator{ sideForksBlock: make(map[libcommon.Hash]types.RawBlock), validatePayload: validatePayload, currentHeight: currentHeight, tmpDir: tmpDir, + blockReader: blockReader, } } @@ -84,12 +88,12 @@ func (fv *ForkValidator) notifyTxPool(to uint64, accumulator *shards.Accumulator if err != nil { return fmt.Errorf("read canonical hash of unwind point: %w", err) } - header := rawdb.ReadHeader(fv.extendingFork, hash, to) + header, _ := fv.blockReader.Header(context.Background(), fv.extendingFork, hash, to) if header == nil { return fmt.Errorf("could not find header for block: %d", to) } - txs, err := rawdb.RawTransactionsRange(fv.extendingFork, to, to+1) + txs, err := fv.blockReader.RawTransactions(context.Background(), fv.extendingFork, to, to+1) if err != nil { return err } @@ -282,7 +286,7 @@ func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Heade // If we do not have the body we can recover it from the batch. if body == nil { var bodyFromDb *types.Body - bodyFromDb, criticalError = rawdb.ReadBodyWithTransactions(tx, header.Hash(), header.Number.Uint64()) + bodyFromDb, criticalError = fv.blockReader.BodyWithTransactions(context.Background(), tx, header.Hash(), header.Number.Uint64()) if criticalError != nil { return } diff --git a/turbo/services/interfaces.go b/turbo/services/interfaces.go index f33fc4547..fd9e73e22 100644 --- a/turbo/services/interfaces.go +++ b/turbo/services/interfaces.go @@ -3,7 +3,7 @@ package services import ( "context" - libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rlp" @@ -14,29 +14,33 @@ type All struct { } type BlockReader interface { - BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) + BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) + BlockByHash(ctx context.Context, db kv.Tx, hash common.Hash) (*types.Block, error) + CurrentBlock(db kv.Tx) (*types.Block, error) + BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) TxsV3Enabled() bool } type HeaderReader interface { - Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (*types.Header, error) + Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) - HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (*types.Header, error) + HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error) } type CanonicalReader interface { - CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (libcommon.Hash, error) + CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (common.Hash, error) } type BodyReader interface { - BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error) - BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) - Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) + BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) + BodyRlp(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) + Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) } type TxnReader interface { - TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) + TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) + RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) } type HeaderAndCanonicalReader interface { HeaderReader diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 5d6f87539..9e64158ff 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -22,6 +22,37 @@ type RemoteBlockReader struct { client remote.ETHBACKENDClient } +func (r *RemoteBlockReader) CurrentBlock(db kv.Tx) (*types.Block, error) { + headHash := rawdb.ReadHeadBlockHash(db) + headNumber := rawdb.ReadHeaderNumber(db, headHash) + if headNumber == nil { + return nil, nil + } + block, _, err := r.BlockWithSenders(context.Background(), db, headHash, *headNumber) + return block, err +} +func (r *RemoteBlockReader) RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) { + panic("not implemented") +} +func (r *RemoteBlockReader) BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) { + hash, err := rawdb.ReadCanonicalHash(db, number) + if err != nil { + return nil, fmt.Errorf("failed ReadCanonicalHash: %w", err) + } + if hash == (libcommon.Hash{}) { + return nil, nil + } + block, _, err := r.BlockWithSenders(ctx, db, hash, number) + return block, err +} +func (r *RemoteBlockReader) BlockByHash(ctx context.Context, db kv.Tx, hash libcommon.Hash) (*types.Block, error) { + number := rawdb.ReadHeaderNumber(db, hash) + if number == nil { + return nil, nil + } + block, _, err := r.BlockWithSenders(ctx, db, hash, *number) + return block, err +} func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) { canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) if err != nil { @@ -341,6 +372,13 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Has func (r *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) { if blockHeight >= r.sn.BlocksAvailable() { + if r.TransactionsV3 { + block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight) + if err != nil { + return nil, nil, err + } + return block, senders, nil + } canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) if err != nil { return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash) @@ -738,3 +776,34 @@ func (r *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) return nil } func (r *BlockReader) TxsV3Enabled() bool { return r.TransactionsV3 } +func (r *BlockReader) BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) { + hash, err := rawdb.ReadCanonicalHash(db, number) + if err != nil { + return nil, fmt.Errorf("failed ReadCanonicalHash: %w", err) + } + if hash == (libcommon.Hash{}) { + return nil, nil + } + block, _, err := r.BlockWithSenders(ctx, db, hash, number) + return block, err +} +func (r *BlockReader) BlockByHash(ctx context.Context, db kv.Tx, hash libcommon.Hash) (*types.Block, error) { + number := rawdb.ReadHeaderNumber(db, hash) + if number == nil { + return nil, nil + } + block, _, err := r.BlockWithSenders(ctx, db, hash, *number) + return block, err +} +func (r *BlockReader) CurrentBlock(db kv.Tx) (*types.Block, error) { + headHash := rawdb.ReadHeadBlockHash(db) + headNumber := rawdb.ReadHeaderNumber(db, headHash) + if headNumber == nil { + return nil, nil + } + block, _, err := r.BlockWithSenders(context.Background(), db, headHash, *headNumber) + return block, err +} +func (r *BlockReader) RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) { + return rawdb.RawTransactionsRange(tx, fromBlock, toBlock) +} diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index f540a79af..6c68a7f4c 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -232,8 +232,9 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK logger := log.New() ctx, ctxCancel := context.WithCancel(context.Background()) - histV3, db, agg := temporal.NewTestDB(tb, ctx, dirs, gspec, logger) + histV3, txsV3, db, agg := temporal.NewTestDB(tb, ctx, dirs, gspec, logger) cfg.HistoryV3 = histV3 + cfg.TransactionsV3 = txsV3 erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, logger) allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, logger) @@ -337,7 +338,8 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK } return nil } - forkValidator := engineapi.NewForkValidator(1, inMemoryExecution, dirs.Tmp) + br, _ := mock.NewBlocksIO() + forkValidator := engineapi.NewForkValidator(1, inMemoryExecution, dirs.Tmp, br) networkID := uint64(1) mock.sentriesClient, err = sentry.NewMultiClient( mock.DB,