Use BlockReader in ForkValidator, CliqueAPI (#7562)

This commit is contained in:
Alex Sharov 2023-05-23 14:49:17 +07:00 committed by GitHub
parent d279c43c81
commit 63afe65686
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 396 additions and 249 deletions

View File

@ -154,8 +154,9 @@ type Ethereum struct {
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
agg *libstate.AggregatorV3
logger log.Logger
agg *libstate.AggregatorV3
blockSnapshots *snapshotsync.RoSnapshots
logger log.Logger
}
// New creates a new Ethereum object (including the
@ -177,8 +178,56 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
if err != nil {
return nil, err
}
if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error {
if err = stagedsync.UpdateMetrics(tx); err != nil {
return err
}
var currentBlock *types.Block
config.Prune, err = prune.EnsureNotChanged(tx, config.Prune)
if err != nil {
return err
}
config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3)
if err != nil {
return err
}
config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
ctx, ctxCancel := context.WithCancel(context.Background())
// kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput
backend := &Ethereum{
sentryCtx: ctx,
sentryCancel: ctxCancel,
config: config,
chainDB: chainKv,
networkID: config.NetworkID,
etherbase: config.Miner.Etherbase,
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
},
logger: logger,
}
var (
allSnapshots *snapshotsync.RoSnapshots
)
blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config.Snapshot, backend.notifications.Events, config.TransactionsV3, logger)
if err != nil {
return nil, err
}
backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
@ -199,7 +248,18 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
return genesisErr
}
currentBlock = rawdb.ReadCurrentBlock(tx)
isCorrectSync, useSnapshots, err := snap.EnsureNotChanged(tx, config.Snapshot)
if err != nil {
return err
}
// if we are in the incorrect syncmode then we change it to the appropriate one
if !isCorrectSync {
logger.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots)
config.Sync.UseSnapshots = useSnapshots
config.Snapshot.Enabled = ethconfig.UseSnapshotsByChainName(chainConfig.ChainName) && useSnapshots
}
logger.Info("Effective", "prune_flags", config.Prune.String(), "snapshot_flags", config.Snapshot.String(), "history.v3", config.HistoryV3)
return nil
}); err != nil {
panic(err)
@ -208,70 +268,8 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
logger.Info("Initialised chain configuration", "config", chainConfig, "genesis", genesis.Hash())
if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error {
if err = stagedsync.UpdateMetrics(tx); err != nil {
return err
}
config.Prune, err = prune.EnsureNotChanged(tx, config.Prune)
if err != nil {
return err
}
isCorrectSync, useSnapshots, err := snap.EnsureNotChanged(tx, config.Snapshot)
if err != nil {
return err
}
config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3)
if err != nil {
return err
}
config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3)
if err != nil {
return err
}
// if we are in the incorrect syncmode then we change it to the appropriate one
if !isCorrectSync {
logger.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots)
config.Sync.UseSnapshots = useSnapshots
config.Snapshot.Enabled = useSnapshots
}
logger.Info("Effective", "prune_flags", config.Prune.String(), "snapshot_flags", config.Snapshot.String(), "history.v3", config.HistoryV3)
return nil
}); err != nil {
return nil, err
}
ctx, ctxCancel := context.WithCancel(context.Background())
// kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput
backend := &Ethereum{
sentryCtx: ctx,
sentryCancel: ctxCancel,
config: config,
chainDB: chainKv,
networkID: config.NetworkID,
etherbase: config.Miner.Etherbase,
chainConfig: chainConfig,
genesisHash: genesis.Hash(),
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
},
logger: logger,
}
var (
allSnapshots *snapshotsync.RoSnapshots
)
backend.blockReader, backend.blockWriter, allSnapshots, backend.agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, config.TransactionsV3)
if err != nil {
return nil, err
}
backend.genesisHash = genesis.Hash()
backend.chainConfig = chainConfig
if config.HistoryV3 {
backend.chainDB, err = temporal.New(backend.chainDB, backend.agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName])
@ -280,6 +278,9 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
}
chainKv = backend.chainDB
}
if err := backend.setUpSnapDownloader(ctx, config.Downloader); err != nil {
return nil, err
}
kvRPC := remotedbserver.NewKvServer(ctx, chainKv, allSnapshots, backend.agg, logger)
backend.notifications.StateChangesConsumer = kvRPC
@ -401,6 +402,15 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
}
return nil
}
var currentBlock *types.Block
if err := chainKv.View(context.Background(), func(tx kv.Tx) error {
currentBlock, err = backend.blockReader.CurrentBlock(tx)
return err
}); err != nil {
panic(err)
}
currentBlockNumber := uint64(0)
if currentBlock != nil {
currentBlockNumber = currentBlock.NumberU64()
@ -420,7 +430,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
}
backend.engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify,
config.HeimdallgRPCAddress, config.HeimdallURL, config.WithoutHeimdall, stack.DataDir(), false /* readonly */, logger)
backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir)
backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir, backend.blockReader)
if err != nil {
return nil, err
@ -855,47 +865,29 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
}
// sets up blockReader and client downloader
func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, transactionsV3 bool) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, s.logger)
func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downloadercfg.Cfg) error {
var err error
if !snConfig.NoDownloader {
allSnapshots.OptimisticalyReopenWithDB(s.chainDB)
if s.config.Snapshot.NoDownloader {
return nil
}
blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3)
blockWriter := blockio.NewBlockWriter(transactionsV3)
if !snConfig.NoDownloader {
if snConfig.DownloaderAddr != "" {
// connect to external Downloader
s.downloaderClient, err = downloadergrpc.NewClient(ctx, snConfig.DownloaderAddr)
} else {
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg)
if err != nil {
return nil, nil, nil, nil, err
}
s.downloader.MainLoopInBackground(ctx, true)
bittorrentServer, err := downloader3.NewGrpcServer(s.downloader)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("new server: %w", err)
}
s.downloaderClient = direct.NewDownloaderClient(bittorrentServer)
}
if s.config.Snapshot.DownloaderAddr != "" {
// connect to external Downloader
s.downloaderClient, err = downloadergrpc.NewClient(ctx, s.config.Snapshot.DownloaderAddr)
} else {
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg)
if err != nil {
return nil, nil, nil, nil, err
return err
}
s.downloader.MainLoopInBackground(ctx, true)
bittorrentServer, err := downloader3.NewGrpcServer(s.downloader)
if err != nil {
return fmt.Errorf("new server: %w", err)
}
}
dir.MustExist(dirs.SnapHistory)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, s.chainDB, s.logger)
if err != nil {
return nil, nil, nil, nil, err
s.downloaderClient = direct.NewDownloaderClient(bittorrentServer)
}
if err = agg.OpenFolder(); err != nil {
return nil, nil, nil, nil, err
}
agg.OnFreeze(func(frozenFileNames []string) {
s.agg.OnFreeze(func(frozenFileNames []string) {
events := s.notifications.Events
events.OnNewSnapshot()
if s.downloaderClient != nil {
@ -910,7 +902,26 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
}
}
})
return err
}
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig ethconfig.Snapshot, notifications *shards.Events, transactionsV3 bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, logger)
var err error
if !snConfig.NoDownloader {
allSnapshots.OptimisticalyReopenWithDB(db)
}
blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3)
blockWriter := blockio.NewBlockWriter(transactionsV3)
dir.MustExist(dirs.SnapHistory)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger)
if err != nil {
return nil, nil, nil, nil, err
}
if err = agg.OpenFolder(); err != nil {
return nil, nil, nil, nil, err
}
return blockReader, blockWriter, allSnapshots, agg, nil
}

View File

@ -131,7 +131,7 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.
Version: "1.0",
})
case "clique":
list = append(list, clique.NewCliqueAPI(db, engine))
list = append(list, clique.NewCliqueAPI(db, engine, blockReader))
}
}

View File

@ -14,6 +14,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
@ -44,6 +45,31 @@ func NewRemoteBackend(client remote.ETHBACKENDClient, db kv.RoDB, blockReader se
}
}
func (back *RemoteBackend) CurrentBlock(db kv.Tx) (*types.Block, error) {
panic("not implemented")
}
func (back *RemoteBackend) RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) {
panic("not implemented")
}
func (back *RemoteBackend) BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) {
hash, err := rawdb.ReadCanonicalHash(db, number)
if err != nil {
return nil, fmt.Errorf("failed ReadCanonicalHash: %w", err)
}
if hash == (libcommon.Hash{}) {
return nil, nil
}
block, _, err := back.BlockWithSenders(ctx, db, hash, number)
return block, err
}
func (r *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash libcommon.Hash) (*types.Block, error) {
number := rawdb.ReadHeaderNumber(db, hash)
if number == nil {
return nil, nil
}
block, _, err := r.BlockWithSenders(ctx, db, hash, *number)
return block, err
}
func (back *RemoteBackend) TxsV3Enabled() bool {
panic("not implemented")
}

View File

@ -1,11 +1,13 @@
package consensus
import (
"context"
"math/big"
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/core/rawdb"
@ -14,8 +16,9 @@ import (
// Implements consensus.ChainReader
type ChainReaderImpl struct {
Cfg chain.Config
Db kv.Getter
Cfg chain.Config
Db kv.Getter
BlockReader services.FullBlockReader
}
// Config retrieves the blockchain's chain configuration.
@ -27,12 +30,14 @@ func (cr ChainReaderImpl) Config() *chain.Config {
func (cr ChainReaderImpl) CurrentHeader() *types.Header {
hash := rawdb.ReadHeadHeaderHash(cr.Db)
number := rawdb.ReadHeaderNumber(cr.Db, hash)
return rawdb.ReadHeader(cr.Db, hash, *number)
h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, *number)
return h
}
// GetHeader retrieves a block header from the database by hash and number.
func (cr ChainReaderImpl) GetHeader(hash libcommon.Hash, number uint64) *types.Header {
return rawdb.ReadHeader(cr.Db, hash, number)
h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, number)
return h
}
// GetHeaderByNumber retrieves a block header from the database by number.
@ -42,23 +47,27 @@ func (cr ChainReaderImpl) GetHeaderByNumber(number uint64) *types.Header {
log.Error("ReadCanonicalHash failed", "err", err)
return nil
}
return rawdb.ReadHeader(cr.Db, hash, number)
h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, number)
return h
}
// GetHeaderByHash retrieves a block header from the database by its hash.
func (cr ChainReaderImpl) GetHeaderByHash(hash libcommon.Hash) *types.Header {
number := rawdb.ReadHeaderNumber(cr.Db, hash)
return rawdb.ReadHeader(cr.Db, hash, *number)
h, _ := cr.BlockReader.Header(context.Background(), cr.Db, hash, *number)
return h
}
// GetBlock retrieves a block from the database by hash and number.
func (cr ChainReaderImpl) GetBlock(hash libcommon.Hash, number uint64) *types.Block {
return rawdb.ReadBlock(cr.Db, hash, number)
b, _, _ := cr.BlockReader.BlockWithSenders(context.Background(), cr.Db, hash, number)
return b
}
// HasBlock retrieves a block from the database by hash and number.
func (cr ChainReaderImpl) HasBlock(hash libcommon.Hash, number uint64) bool {
return rawdb.HasBlock(cr.Db, hash, number)
b, _ := cr.BlockReader.BodyRlp(context.Background(), cr.Db, hash, number)
return b != nil
}
// GetTd retrieves the total difficulty from the database by hash and number.

View File

@ -25,6 +25,7 @@ import (
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
)
@ -32,9 +33,10 @@ import (
// mechanisms of the proof-of-authority scheme.
type API struct {
// chain consensus.ChainHeaderReader
db kv.RoDB
clique *Clique
logger log.Logger
db kv.RoDB
clique *Clique
logger log.Logger
blockReader services.FullBlockReader
}
// GetSnapshot retrieves the state snapshot at a given block.
@ -44,7 +46,7 @@ func (api *API) GetSnapshot(ctx context.Context, number *rpc.BlockNumber) (*Snap
return nil, err
}
defer tx.Rollback()
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx}
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader}
// Retrieve the requested block number (or current if none requested)
var header *types.Header
@ -73,7 +75,7 @@ func (api *API) GetSnapshotAtHash(ctx context.Context, hash libcommon.Hash) (*Sn
return nil, err
}
defer tx.Rollback()
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx}
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader}
header := chain.GetHeaderByHash(hash)
if header == nil {
@ -95,7 +97,7 @@ func (api *API) GetSigners(ctx context.Context, number *rpc.BlockNumber) ([]libc
return nil, err
}
defer tx.Rollback()
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx}
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader}
// Retrieve the requested block number (or current if none requested)
var header *types.Header
@ -123,7 +125,7 @@ func (api *API) GetSignersAtHash(ctx context.Context, hash libcommon.Hash) ([]li
return nil, err
}
defer tx.Rollback()
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx}
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader}
header := chain.GetHeaderByHash(hash)
if header == nil {
@ -182,7 +184,7 @@ func (api *API) Status(ctx context.Context) (*status, error) {
return nil, err
}
defer tx.Rollback()
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx}
chain := consensus.ChainReaderImpl{Cfg: *api.clique.ChainConfig, Db: tx, BlockReader: api.blockReader}
var (
numBlocks = uint64(64)

View File

@ -30,6 +30,7 @@ import (
"github.com/goccy/go-json"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/chain"
@ -536,7 +537,7 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API {
}
}
func NewCliqueAPI(db kv.RoDB, engine consensus.EngineReader) rpc.API {
func NewCliqueAPI(db kv.RoDB, engine consensus.EngineReader, blockReader services.FullBlockReader) rpc.API {
var c *Clique
if casted, ok := engine.(*Clique); ok {
c = casted
@ -545,7 +546,7 @@ func NewCliqueAPI(db kv.RoDB, engine consensus.EngineReader) rpc.API {
return rpc.API{
Namespace: "clique",
Version: "1.0",
Service: &API{db: db, clique: c},
Service: &API{db: db, clique: c, blockReader: blockReader},
Public: false,
}
}

View File

@ -17,7 +17,6 @@
package clique_test
import (
"context"
"math/big"
"testing"
@ -60,12 +59,13 @@ func TestReimportMirroredState(t *testing.T) {
}
copy(genspec.ExtraData[clique.ExtraVanity:], addr[:])
m := stages.MockWithGenesisEngine(t, genspec, engine, false)
br, _ := m.NewBlocksIO()
// Generate a batch of blocks, each properly signed
getHeader := func(hash libcommon.Hash, number uint64) (h *types.Header) {
if err := m.DB.View(context.Background(), func(tx kv.Tx) error {
h = rawdb.ReadHeader(tx, hash, number)
return nil
if err := m.DB.View(m.Ctx, func(tx kv.Tx) (err error) {
h, err = br.Header(m.Ctx, tx, hash, number)
return err
}); err != nil {
panic(err)
}
@ -109,8 +109,8 @@ func TestReimportMirroredState(t *testing.T) {
if err := m.InsertChain(chain.Slice(0, 2)); err != nil {
t.Fatalf("failed to insert initial blocks: %v", err)
}
if err := m.DB.View(context.Background(), func(tx kv.Tx) error {
if head, err1 := rawdb.ReadBlockByHash(tx, rawdb.ReadHeadHeaderHash(tx)); err1 != nil {
if err := m.DB.View(m.Ctx, func(tx kv.Tx) error {
if head, err1 := br.BlockByHash(m.Ctx, tx, rawdb.ReadHeadHeaderHash(tx)); err1 != nil {
t.Errorf("could not read chain head: %v", err1)
} else if head.NumberU64() != 2 {
t.Errorf("chain head mismatch: have %d, want %d", head.NumberU64(), 2)
@ -126,8 +126,8 @@ func TestReimportMirroredState(t *testing.T) {
if err := m.InsertChain(chain.Slice(2, chain.Length())); err != nil {
t.Fatalf("failed to insert final block: %v", err)
}
if err := m.DB.View(context.Background(), func(tx kv.Tx) error {
if head, err1 := rawdb.ReadBlockByHash(tx, rawdb.ReadHeadHeaderHash(tx)); err1 != nil {
if err := m.DB.View(m.Ctx, func(tx kv.Tx) error {
if head, err1 := br.CurrentBlock(tx); err1 != nil {
t.Errorf("could not read chain head: %v", err1)
} else if head.NumberU64() != 3 {
t.Errorf("chain head mismatch: have %d, want %d", head.NumberU64(), 3)

View File

@ -25,7 +25,7 @@ import (
func TestGenesisBlockHashes(t *testing.T) {
logger := log.New()
_, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger)
_, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger)
check := func(network string) {
genesis := core.GenesisBlockByChainName(network)
tx, err := db.BeginRw(context.Background())
@ -74,7 +74,7 @@ func TestGenesisBlockRoots(t *testing.T) {
func TestCommitGenesisIdempotency(t *testing.T) {
logger := log.New()
_, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger)
_, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger)
tx, err := db.BeginRw(context.Background())
require.NoError(t, err)
defer tx.Rollback()
@ -111,7 +111,7 @@ func TestAllocConstructor(t *testing.T) {
},
}
historyV3, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger)
historyV3, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(t.TempDir()), nil, logger)
_, _, err := core.CommitGenesisBlock(db, genSpec, "", logger)
require.NoError(err)

View File

@ -39,7 +39,7 @@ import (
func getBlock(tb testing.TB, transactions int, uncles int, dataSize int, tmpDir string) *types.Block {
logger := log.New()
_, db, _ := temporal.NewTestDB(tb, context.Background(), datadir.New(tmpDir), nil, logger)
_, _, db, _ := temporal.NewTestDB(tb, context.Background(), datadir.New(tmpDir), nil, logger)
var (
aa = libcommon.HexToAddress("0x000000000000000000000000000000000000aaaa")
// Generate a canonical chain to act as the main dataset

View File

@ -473,8 +473,9 @@ func (tx *Tx) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limi
}
// TODO: need remove `gspec` param (move SystemContractCodeLookup feature somewhere)
func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *types.Genesis, logger log.Logger) (histV3 bool, db kv.RwDB, agg *state.AggregatorV3) {
func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *types.Genesis, logger log.Logger) (histV3, txsV3 bool, db kv.RwDB, agg *state.AggregatorV3) {
HistoryV3 := ethconfig.EnableHistoryV3InTest
TxsV3 := ethconfig.EnableTxsV3InTest
if tb != nil {
db = memdb.NewTestDB(tb)
@ -483,6 +484,7 @@ func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *typ
}
_ = db.UpdateNosync(context.Background(), func(tx kv.RwTx) error {
_, _ = kvcfg.HistoryV3.WriteOnce(tx, HistoryV3)
_, _ = kvcfg.TransactionsV3.WriteOnce(tx, TxsV3)
return nil
})
@ -507,5 +509,5 @@ func NewTestDB(tb testing.TB, ctx context.Context, dirs datadir.Dirs, gspec *typ
panic(err)
}
}
return HistoryV3, db, agg
return HistoryV3, TxsV3, db, agg
}

View File

@ -34,6 +34,7 @@ import (
clcore "github.com/ledgerwatch/erigon/cl/phase1/core"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
@ -107,7 +108,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
@ -211,7 +211,54 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return nil, err
}
var currentBlock *types.Block
if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error {
if err = stagedsync.UpdateMetrics(tx); err != nil {
return err
}
config.Prune, err = prune.EnsureNotChanged(tx, config.Prune)
if err != nil {
return err
}
config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3)
if err != nil {
return err
}
config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
ctx, ctxCancel := context.WithCancel(context.Background())
// kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput
backend := &Ethereum{
sentryCtx: ctx,
sentryCancel: ctxCancel,
config: config,
chainDB: chainKv,
networkID: config.NetworkID,
etherbase: config.Miner.Etherbase,
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
},
logger: logger,
}
blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config.Snapshot, backend.notifications.Events, config.TransactionsV3, logger)
if err != nil {
return nil, err
}
backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
@ -232,40 +279,10 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return genesisErr
}
currentBlock = rawdb.ReadCurrentBlock(tx)
return nil
}); err != nil {
panic(err)
}
config.Snapshot.Enabled = config.Sync.UseSnapshots
logger.Info("Initialised chain configuration", "config", chainConfig, "genesis", genesis.Hash())
if err := chainKv.Update(context.Background(), func(tx kv.RwTx) error {
if err = stagedsync.UpdateMetrics(tx); err != nil {
return err
}
config.Prune, err = prune.EnsureNotChanged(tx, config.Prune)
if err != nil {
return err
}
isCorrectSync, useSnapshots, err := snap.EnsureNotChanged(tx, config.Snapshot)
if err != nil {
return err
}
config.HistoryV3, err = kvcfg.HistoryV3.WriteOnce(tx, config.HistoryV3)
if err != nil {
return err
}
config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3)
if err != nil {
return err
}
// if we are in the incorrect syncmode then we change it to the appropriate one
if !isCorrectSync {
logger.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots)
@ -276,35 +293,20 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return nil
}); err != nil {
return nil, err
panic(err)
}
ctx, ctxCancel := context.WithCancel(context.Background())
backend.chainConfig = chainConfig
backend.genesisBlock = genesis
backend.genesisHash = genesis.Hash()
// kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput
backend := &Ethereum{
sentryCtx: ctx,
sentryCancel: ctxCancel,
config: config,
chainDB: chainKv,
networkID: config.NetworkID,
etherbase: config.Miner.Etherbase,
chainConfig: chainConfig,
genesisBlock: genesis,
genesisHash: genesis.Hash(),
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
},
logger: logger,
}
blockReader, blockWriter, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, backend.notifications.Events, config.TransactionsV3)
if err != nil {
config.Snapshot.Enabled = config.Sync.UseSnapshots
logger.Info("Initialised chain configuration", "config", chainConfig, "genesis", genesis.Hash())
if err := backend.setUpSnapDownloader(ctx, config.Downloader); err != nil {
return nil, err
}
backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
if config.HistoryV3 {
backend.chainDB, err = temporal.New(backend.chainDB, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName])
@ -441,6 +443,14 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
}
return nil
}
var currentBlock *types.Block
if err := chainKv.View(context.Background(), func(tx kv.Tx) error {
currentBlock, err = blockReader.CurrentBlock(tx)
return err
}); err != nil {
panic(err)
}
currentBlockNumber := uint64(0)
if currentBlock != nil {
currentBlockNumber = currentBlock.NumberU64()
@ -460,7 +470,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
}
backend.engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallgRPCAddress, config.HeimdallURL,
config.WithoutHeimdall, stack.DataDir(), false /* readonly */, logger)
backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir)
backend.forkValidator = engineapi.NewForkValidator(currentBlockNumber, inMemoryExecution, tmpdir, backend.blockReader)
backend.sentriesClient, err = sentry.NewMultiClient(
chainKv,
@ -985,56 +995,63 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
}
// sets up blockReader and client downloader
func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, notifications *shards.Events, transactionsV3 bool) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, s.logger)
func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downloadercfg.Cfg) error {
var err error
if s.config.Snapshot.NoDownloader {
return nil
}
if s.config.Snapshot.DownloaderAddr != "" {
// connect to external Downloader
s.downloaderClient, err = downloadergrpc.NewClient(ctx, s.config.Snapshot.DownloaderAddr)
} else {
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg)
if err != nil {
return err
}
s.downloader.MainLoopInBackground(ctx, true)
bittorrentServer, err := downloader3.NewGrpcServer(s.downloader)
if err != nil {
return fmt.Errorf("new server: %w", err)
}
s.downloaderClient = direct.NewDownloaderClient(bittorrentServer)
}
s.agg.OnFreeze(func(frozenFileNames []string) {
events := s.notifications.Events
events.OnNewSnapshot()
if s.downloaderClient != nil {
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(frozenFileNames))}
for _, fName := range frozenFileNames {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: filepath.Join("history", fName),
})
}
if _, err := s.downloaderClient.Download(ctx, req); err != nil {
s.logger.Warn("[snapshots] notify downloader", "err", err)
}
}
})
return err
}
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig ethconfig.Snapshot, notifications *shards.Events, transactionsV3 bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, logger)
var err error
if !snConfig.NoDownloader {
allSnapshots.OptimisticalyReopenWithDB(s.chainDB)
allSnapshots.OptimisticalyReopenWithDB(db)
}
blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3)
blockWriter := blockio.NewBlockWriter(transactionsV3)
if !snConfig.NoDownloader {
if snConfig.DownloaderAddr != "" {
// connect to external Downloader
s.downloaderClient, err = downloadergrpc.NewClient(ctx, snConfig.DownloaderAddr)
} else {
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg)
if err != nil {
return nil, nil, nil, nil, err
}
s.downloader.MainLoopInBackground(ctx, true)
bittorrentServer, err := downloader3.NewGrpcServer(s.downloader)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("new server: %w", err)
}
s.downloaderClient = direct.NewDownloaderClient(bittorrentServer)
}
if err != nil {
return nil, nil, nil, nil, err
}
}
dir.MustExist(dirs.SnapHistory)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, s.chainDB, s.logger)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger)
if err != nil {
return nil, nil, nil, nil, err
}
if err = agg.OpenFolder(); err != nil {
return nil, nil, nil, nil, err
}
agg.OnFreeze(func(frozenFileNames []string) {
notifications.OnNewSnapshot()
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(frozenFileNames))}
for _, fName := range frozenFileNames {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: filepath.Join("history", fName),
})
}
})
return blockReader, blockWriter, allSnapshots, agg, nil
}

View File

@ -4,4 +4,4 @@ package ethconfig
const EnableHistoryV3InTest = false
const EnableHistoryV4InTest = false
const EnableTransactionsV3InTest = false
const EnableTxsV3InTest = false

View File

@ -4,4 +4,4 @@ package ethconfig
const EnableHistoryV3InTest = true
const EnableHistoryV4InTest = false
const EnableTransactionsV3InTest = false
const EnableTxsV3InTest = false

View File

@ -4,4 +4,4 @@ package ethconfig
const EnableHistoryV3InTest = true
const EnableHistoryV4InTest = true
const EnableTransactionsV3InTest = false
const EnableTxsV3InTest = false

View File

@ -22,6 +22,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/common/dbutils"
@ -49,9 +50,11 @@ type ForkValidator struct {
extendingForkHeadHash libcommon.Hash
// this is the function we use to perform payload validation.
validatePayload validatePayloadFunc
blockReader services.FullBlockReader
// this is the current point where we processed the chain so far.
currentHeight uint64
tmpDir string
// we want fork validator to be thread safe so let
lock sync.Mutex
}
@ -63,12 +66,13 @@ func NewForkValidatorMock(currentHeight uint64) *ForkValidator {
}
}
func NewForkValidator(currentHeight uint64, validatePayload validatePayloadFunc, tmpDir string) *ForkValidator {
func NewForkValidator(currentHeight uint64, validatePayload validatePayloadFunc, tmpDir string, blockReader services.FullBlockReader) *ForkValidator {
return &ForkValidator{
sideForksBlock: make(map[libcommon.Hash]types.RawBlock),
validatePayload: validatePayload,
currentHeight: currentHeight,
tmpDir: tmpDir,
blockReader: blockReader,
}
}
@ -84,12 +88,12 @@ func (fv *ForkValidator) notifyTxPool(to uint64, accumulator *shards.Accumulator
if err != nil {
return fmt.Errorf("read canonical hash of unwind point: %w", err)
}
header := rawdb.ReadHeader(fv.extendingFork, hash, to)
header, _ := fv.blockReader.Header(context.Background(), fv.extendingFork, hash, to)
if header == nil {
return fmt.Errorf("could not find header for block: %d", to)
}
txs, err := rawdb.RawTransactionsRange(fv.extendingFork, to, to+1)
txs, err := fv.blockReader.RawTransactions(context.Background(), fv.extendingFork, to, to+1)
if err != nil {
return err
}
@ -282,7 +286,7 @@ func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Heade
// If we do not have the body we can recover it from the batch.
if body == nil {
var bodyFromDb *types.Body
bodyFromDb, criticalError = rawdb.ReadBodyWithTransactions(tx, header.Hash(), header.Number.Uint64())
bodyFromDb, criticalError = fv.blockReader.BodyWithTransactions(context.Background(), tx, header.Hash(), header.Number.Uint64())
if criticalError != nil {
return
}

View File

@ -3,7 +3,7 @@ package services
import (
"context"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
@ -14,29 +14,33 @@ type All struct {
}
type BlockReader interface {
BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error)
BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error)
BlockByHash(ctx context.Context, db kv.Tx, hash common.Hash) (*types.Block, error)
CurrentBlock(db kv.Tx) (*types.Block, error)
BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error)
TxsV3Enabled() bool
}
type HeaderReader interface {
Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (*types.Header, error)
Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error)
HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error)
HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (*types.Header, error)
HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error)
}
type CanonicalReader interface {
CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (libcommon.Hash, error)
CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (common.Hash, error)
}
type BodyReader interface {
BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error)
BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error)
Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error)
BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error)
BodyRlp(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error)
Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error)
}
type TxnReader interface {
TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error)
TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error)
TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error)
RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error)
}
type HeaderAndCanonicalReader interface {
HeaderReader

View File

@ -22,6 +22,37 @@ type RemoteBlockReader struct {
client remote.ETHBACKENDClient
}
func (r *RemoteBlockReader) CurrentBlock(db kv.Tx) (*types.Block, error) {
headHash := rawdb.ReadHeadBlockHash(db)
headNumber := rawdb.ReadHeaderNumber(db, headHash)
if headNumber == nil {
return nil, nil
}
block, _, err := r.BlockWithSenders(context.Background(), db, headHash, *headNumber)
return block, err
}
func (r *RemoteBlockReader) RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) {
panic("not implemented")
}
func (r *RemoteBlockReader) BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) {
hash, err := rawdb.ReadCanonicalHash(db, number)
if err != nil {
return nil, fmt.Errorf("failed ReadCanonicalHash: %w", err)
}
if hash == (libcommon.Hash{}) {
return nil, nil
}
block, _, err := r.BlockWithSenders(ctx, db, hash, number)
return block, err
}
func (r *RemoteBlockReader) BlockByHash(ctx context.Context, db kv.Tx, hash libcommon.Hash) (*types.Block, error) {
number := rawdb.ReadHeaderNumber(db, hash)
if number == nil {
return nil, nil
}
block, _, err := r.BlockWithSenders(ctx, db, hash, *number)
return block, err
}
func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
@ -341,6 +372,13 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Has
func (r *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
if blockHeight >= r.sn.BlocksAvailable() {
if r.TransactionsV3 {
block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight)
if err != nil {
return nil, nil, err
}
return block, senders, nil
}
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash)
@ -738,3 +776,34 @@ func (r *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint64)
return nil
}
func (r *BlockReader) TxsV3Enabled() bool { return r.TransactionsV3 }
func (r *BlockReader) BlockByNumber(ctx context.Context, db kv.Tx, number uint64) (*types.Block, error) {
hash, err := rawdb.ReadCanonicalHash(db, number)
if err != nil {
return nil, fmt.Errorf("failed ReadCanonicalHash: %w", err)
}
if hash == (libcommon.Hash{}) {
return nil, nil
}
block, _, err := r.BlockWithSenders(ctx, db, hash, number)
return block, err
}
func (r *BlockReader) BlockByHash(ctx context.Context, db kv.Tx, hash libcommon.Hash) (*types.Block, error) {
number := rawdb.ReadHeaderNumber(db, hash)
if number == nil {
return nil, nil
}
block, _, err := r.BlockWithSenders(ctx, db, hash, *number)
return block, err
}
func (r *BlockReader) CurrentBlock(db kv.Tx) (*types.Block, error) {
headHash := rawdb.ReadHeadBlockHash(db)
headNumber := rawdb.ReadHeaderNumber(db, headHash)
if headNumber == nil {
return nil, nil
}
block, _, err := r.BlockWithSenders(context.Background(), db, headHash, *headNumber)
return block, err
}
func (r *BlockReader) RawTransactions(ctx context.Context, tx kv.Getter, fromBlock, toBlock uint64) (txs [][]byte, err error) {
return rawdb.RawTransactionsRange(tx, fromBlock, toBlock)
}

View File

@ -232,8 +232,9 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
logger := log.New()
ctx, ctxCancel := context.WithCancel(context.Background())
histV3, db, agg := temporal.NewTestDB(tb, ctx, dirs, gspec, logger)
histV3, txsV3, db, agg := temporal.NewTestDB(tb, ctx, dirs, gspec, logger)
cfg.HistoryV3 = histV3
cfg.TransactionsV3 = txsV3
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, logger)
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, logger)
@ -337,7 +338,8 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
}
return nil
}
forkValidator := engineapi.NewForkValidator(1, inMemoryExecution, dirs.Tmp)
br, _ := mock.NewBlocksIO()
forkValidator := engineapi.NewForkValidator(1, inMemoryExecution, dirs.Tmp, br)
networkID := uint64(1)
mock.sentriesClient, err = sentry.NewMultiClient(
mock.DB,