Introduce BlockWriter object (txsV3 step 0) (#7559)

This commit is contained in:
Alex Sharov 2023-05-22 15:49:21 +07:00 committed by GitHub
parent a8ec9eb471
commit 4d0dee6fb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 532 additions and 301 deletions

View File

@ -50,7 +50,6 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/event"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages"
)
@ -124,11 +123,12 @@ func NewTestSimulatedBackendWithConfig(t *testing.T, alloc types.GenesisAlloc, c
}
func (b *SimulatedBackend) DB() kv.RwDB { return b.m.DB }
func (b *SimulatedBackend) Agg() *state2.AggregatorV3 { return b.m.HistoryV3Components() }
func (b *SimulatedBackend) HistoryV3() bool { return b.m.HistoryV3 }
func (b *SimulatedBackend) Engine() consensus.Engine { return b.m.Engine }
func (b *SimulatedBackend) BlockReader() services.FullBlockReader {
return snapshotsync.NewBlockReader(b.m.BlockSnapshots, b.m.TransactionsV3)
br, _ := b.m.NewBlocksIO()
return br
}
func (b *SimulatedBackend) HistoryV3() bool { return b.m.HistoryV3 }
func (b *SimulatedBackend) Engine() consensus.Engine { return b.m.Engine }
// Close terminates the underlying blockchain's update loop.
func (b *SimulatedBackend) Close() {

View File

@ -5,18 +5,21 @@ import (
"net"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"google.golang.org/grpc"
"github.com/ledgerwatch/log/v3"
)
func main() {
datadir := flag.String("datadir", "", "non in-memory db for EL simulation")
datadirPtr := flag.String("datadir2", "", "non in-memory db for EL simulation")
flag.Parse()
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
lis, err := net.Listen("tcp", "127.0.0.1:8989")
@ -24,19 +27,24 @@ func main() {
log.Warn("[Exec] could not serve service", "reason", err)
}
maxReceiveSize := 500 * datasize.MB
dirs := datadir.New(*datadirPtr)
s := grpc.NewServer(grpc.MaxRecvMsgSize(int(maxReceiveSize)))
var db kv.RwDB
if *datadir == "" {
if *datadirPtr == "" {
db = memdb.New("")
} else {
db, err = mdbx.Open(*datadir, log.Root(), false)
db, err = mdbx.Open(dirs.DataDir, log.Root(), false)
if err != nil {
log.Error("Could not open database", "err", err)
return
}
}
execution.RegisterExecutionServer(s, NewEth1Execution(db))
blockSnapshots := snapshotsync.NewRoSnapshots(ethconfig.Snapshot{Enabled: false}, dirs.Snap, log.New())
transactionsV3 := false
blockReader := snapshotsync.NewBlockReader(blockSnapshots, transactionsV3)
blockWriter := blockio.NewBlockWriter(transactionsV3)
execution.RegisterExecutionServer(s, NewEth1Execution(db, blockReader, blockWriter))
log.Info("Serving mock Execution layer.")
if err := s.Serve(lis); err != nil {
log.Error("failed to serve", "err", err)

View File

@ -14,6 +14,8 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
@ -23,13 +25,17 @@ import (
type Eth1Execution struct {
execution.UnimplementedExecutionServer
db kv.RwDB
mu sync.Mutex
db kv.RwDB
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
mu sync.Mutex
}
func NewEth1Execution(db kv.RwDB) *Eth1Execution {
func NewEth1Execution(db kv.RwDB, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter) *Eth1Execution {
return &Eth1Execution{
db: db,
db: db,
blockReader: blockReader,
blockWriter: blockWriter,
}
}
@ -47,7 +53,9 @@ func (e *Eth1Execution) InsertHeaders(ctx context.Context, req *execution.Insert
if err != nil {
return nil, err
}
rawdb.WriteHeader(tx, h)
if err := e.blockWriter.WriteHeader(tx, h); err != nil {
return nil, err
}
}
return &execution.EmptyMessage{}, tx.Commit()
}
@ -80,7 +88,7 @@ func (e *Eth1Execution) InsertBodies(ctx context.Context, req *execution.InsertB
Amount: withdrawal.Amount,
})
}
if _, _, err := rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
if _, _, err := e.blockWriter.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
body.BlockNumber, &types.RawBody{
Transactions: body.Transactions,
Uncles: uncles,
@ -119,12 +127,21 @@ func (e *Eth1Execution) GetHeader(ctx context.Context, req *execution.GetSegment
var header *types.Header
if req.BlockHash != nil && req.BlockNumber != nil {
blockHash := gointerfaces.ConvertH256ToHash(req.BlockHash)
header = rawdb.ReadHeader(tx, blockHash, *req.BlockNumber)
header, err = e.blockReader.Header(ctx, tx, blockHash, *req.BlockNumber)
if err != nil {
return nil, err
}
} else if req.BlockHash != nil {
blockHash := gointerfaces.ConvertH256ToHash(req.BlockHash)
header, err = rawdb.ReadHeaderByHash(tx, blockHash)
header, err = e.blockReader.HeaderByHash(ctx, tx, blockHash)
if err != nil {
return nil, err
}
} else if req.BlockNumber != nil {
header = rawdb.ReadHeaderByNumber(tx, *req.BlockNumber)
header, err = e.blockReader.HeaderByNumber(ctx, tx, *req.BlockNumber)
if err != nil {
return nil, err
}
}
if err != nil {
return nil, err
@ -152,14 +169,28 @@ func (e *Eth1Execution) GetBody(ctx context.Context, req *execution.GetSegmentRe
var body *types.Body
if req.BlockHash != nil && req.BlockNumber != nil {
blockHash := gointerfaces.ConvertH256ToHash(req.BlockHash)
body = rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, *req.BlockNumber)
if ok, err := rawdb.IsCanonicalHash(tx, blockHash); err != nil {
return nil, err
} else if ok {
body, err = e.blockReader.BodyWithTransactions(ctx, tx, blockHash, *req.BlockNumber)
if err != nil {
return nil, err
}
}
} else if req.BlockHash != nil {
blockHash := gointerfaces.ConvertH256ToHash(req.BlockHash)
blockNumber := rawdb.ReadHeaderNumber(tx, blockHash)
if blockNumber == nil {
return nil, nil
if ok, err := rawdb.IsCanonicalHash(tx, blockHash); err != nil {
return nil, err
} else if ok {
blockNumber := rawdb.ReadHeaderNumber(tx, blockHash)
if blockNumber == nil {
return nil, nil
}
body, err = e.blockReader.BodyWithTransactions(ctx, tx, blockHash, *req.BlockNumber)
if err != nil {
return nil, err
}
}
body = rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, *blockNumber)
}
if err != nil {
return nil, err

View File

@ -16,6 +16,7 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
@ -151,6 +152,7 @@ type Ethereum struct {
forkValidator *engineapi.ForkValidator
downloader *downloader3.Downloader
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
agg *libstate.AggregatorV3
logger log.Logger
@ -265,23 +267,21 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
}
var (
allSnapshots *snapshotsync.RoSnapshots
agg *libstate.AggregatorV3
)
backend.blockReader, allSnapshots, agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, config.TransactionsV3)
backend.blockReader, backend.blockWriter, allSnapshots, backend.agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, config.TransactionsV3)
if err != nil {
return nil, err
}
backend.agg = agg
if config.HistoryV3 {
backend.chainDB, err = temporal.New(backend.chainDB, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName])
backend.chainDB, err = temporal.New(backend.chainDB, backend.agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName])
if err != nil {
return nil, err
}
chainKv = backend.chainDB
}
kvRPC := remotedbserver.NewKvServer(ctx, chainKv, allSnapshots, agg, logger)
kvRPC := remotedbserver.NewKvServer(ctx, chainKv, allSnapshots, backend.agg, logger)
backend.notifications.StateChangesConsumer = kvRPC
backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice)
@ -388,7 +388,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
return err
}
// We start the mining step
if err := stages2.StateStep(ctx, batch, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
if err := stages2.StateStep(ctx, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
logger.Warn("Could not validate block", "err", err)
return err
}
@ -840,13 +840,14 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
}
// sets up blockReader and client downloader
func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, transactionsV3 bool) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, transactionsV3 bool) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, s.logger)
var err error
if !snConfig.NoDownloader {
allSnapshots.OptimisticalyReopenWithDB(s.chainDB)
}
blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3)
blockWriter := blockio.NewBlockWriter(transactionsV3)
if !snConfig.NoDownloader {
if snConfig.DownloaderAddr != "" {
@ -856,28 +857,28 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
s.downloader.MainLoopInBackground(ctx, true)
bittorrentServer, err := downloader3.NewGrpcServer(s.downloader)
if err != nil {
return nil, nil, nil, fmt.Errorf("new server: %w", err)
return nil, nil, nil, nil, fmt.Errorf("new server: %w", err)
}
s.downloaderClient = direct.NewDownloaderClient(bittorrentServer)
}
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
}
dir.MustExist(dirs.SnapHistory)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, s.chainDB, s.logger)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err = agg.OpenFolder(); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
agg.OnFreeze(func(frozenFileNames []string) {
events := s.notifications.Events
@ -895,7 +896,7 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
}
})
return blockReader, allSnapshots, agg, nil
return blockReader, blockWriter, allSnapshots, agg, nil
}
func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) {
@ -936,7 +937,7 @@ func (s *Ethereum) Start() error {
}
maxReceiveSize := 500 * datasize.MB
server := grpc.NewServer(grpc.MaxRecvMsgSize(int(maxReceiveSize)))
execution.RegisterExecutionServer(server, eth1.NewEth1Execution(s.chainDB, s.blockReader, s.stagedSync))
execution.RegisterExecutionServer(server, eth1.NewEth1Execution(s.chainDB, s.blockReader, s.blockWriter, s.stagedSync))
s.logger.Info("Execution Module Server started!")
if err := server.Serve(lis); err != nil {
panic(err)

View File

@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
@ -30,14 +31,16 @@ type Eth1Execution struct {
db kv.RwDB
executionPipeline *stagedsync.Sync
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
mu sync.Mutex
}
func NewEth1Execution(db kv.RwDB, blockReader services.FullBlockReader, executionPipeline *stagedsync.Sync) *Eth1Execution {
func NewEth1Execution(db kv.RwDB, blockReader services.FullBlockReader, blockWriter *blockio.BlockWriter, executionPipeline *stagedsync.Sync) *Eth1Execution {
return &Eth1Execution{
db: db,
executionPipeline: executionPipeline,
blockReader: blockReader,
blockWriter: blockWriter,
}
}
@ -55,7 +58,9 @@ func (e *Eth1Execution) InsertHeaders(ctx context.Context, req *execution.Insert
if err != nil {
return nil, err
}
rawdb.WriteHeader(tx, h)
if err := e.blockWriter.WriteHeader(tx, h); err != nil {
return nil, err
}
}
return &execution.EmptyMessage{}, tx.Commit()
}
@ -79,7 +84,7 @@ func (e *Eth1Execution) InsertBodies(ctx context.Context, req *execution.InsertB
uncles = append(uncles, h)
}
// Withdrawals processing
if _, _, err := rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
if _, _, err := e.blockWriter.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(body.BlockHash),
body.BlockNumber, &types.RawBody{
Transactions: body.Transactions,
Uncles: uncles,

View File

@ -6,6 +6,7 @@ import (
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/consensus"
@ -48,6 +49,7 @@ func NewStagedSync(ctx context.Context,
dirs := cfg.Dirs
blockReader := snapshotsync.NewBlockReader(snapshots, transactionsV3)
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, snapshots, db, snapDownloader, notifications.Events, logger)
blockWriter := blockio.NewBlockWriter(transactionsV3)
// During Import we don't want other services like header requests, body requests etc. to be running.
// Hence we run it in the test mode.
@ -80,12 +82,13 @@ func NewStagedSync(ctx context.Context,
p2pCfg.NoDiscovery,
snapshots,
blockReader,
blockWriter,
dirs.Tmp,
notifications,
forkValidator,
),
stagedsync.StageCumulativeIndexCfg(db),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(
db,
controlServer.Bd,
@ -99,7 +102,7 @@ func NewStagedSync(ctx context.Context,
cfg.HistoryV3,
cfg.TransactionsV3,
),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, controlServer.Hd),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
@ -124,7 +127,7 @@ func NewStagedSync(ctx context.Context,
stagedsync.StageHistoryCfg(db, cfg.Prune, dirs.Tmp),
stagedsync.StageLogIndexCfg(db, cfg.Prune, dirs.Tmp),
stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, dirs.Tmp),
stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, snapshots, controlServer.ChainConfig.Bor),
stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, snapshots, controlServer.ChainConfig.Bor, blockReader),
stagedsync.StageFinishCfg(db, dirs.Tmp, forkValidator),
runInTestMode),
stagedsync.DefaultUnwindOrder,

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/secp256k1"
"github.com/spf13/cobra"
@ -694,7 +695,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
sn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer agg.Close()
br := getBlockReader(db, logger)
br, bw := blocksIO(db, logger)
engine, _, _, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig, _, _ := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)
@ -705,7 +706,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if reset {
dirs := datadir.New(datadirCli)
if err := reset2.ResetBlocks(tx, db, sn, agg, br, dirs, *chainConfig, engine, logger); err != nil {
if err := reset2.ResetBlocks(tx, db, sn, agg, br, bw, dirs, *chainConfig, engine, logger); err != nil {
return err
}
return nil
@ -730,7 +731,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return fmt.Errorf("re-read Headers progress: %w", err)
}
{ // hard-unwind stage_body also
if err := rawdb.TruncateBlocks(ctx, tx, progress+1); err != nil {
if err := bw.TruncateBlocks(ctx, tx, progress+1); err != nil {
return err
}
progressBodies, err := stages.GetStageProgress(tx, stages.Bodies)
@ -747,7 +748,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if err = rawdb.TruncateCanonicalHash(tx, progress+1, false); err != nil {
return err
}
if err = rawdb.TruncateTd(tx, progress+1); err != nil {
if err = bw.TruncateTd(tx, progress+1); err != nil {
return err
}
hash, err := rawdb.ReadCanonicalHash(tx, progress-1)
@ -779,7 +780,9 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber)
if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, getBlockReader(db, logger), historyV3, transactionsV3), ctx); err != nil {
br, _ := blocksIO(db, logger)
cfg := stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, br, historyV3, transactionsV3)
if err := stagedsync.UnwindBodiesStage(u, tx, cfg, ctx); err != nil {
return err
}
@ -809,7 +812,8 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
must(sync.SetCurrentStage(stages.Senders))
if reset {
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx) })
_, bw := blocksIO(db, logger)
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx, bw) })
}
tx, err := db.BeginRw(ctx)
@ -867,7 +871,8 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return err
}
cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, nil)
_, bw := blocksIO(db, logger)
cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, bw, nil)
if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
@ -930,9 +935,10 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
syncCfg.ReconWorkerCount = int(reconWorkers)
genesis := core.GenesisBlockByChainName(chain)
br, _ := blocksIO(db, logger)
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil,
/*stateStream=*/ false,
/*badBlockHalt=*/ false, historyV3, dirs, getBlockReader(db, logger), nil, genesis, syncCfg, agg)
/*badBlockHalt=*/ false, historyV3, dirs, br, nil, genesis, syncCfg, agg)
if unwind > 0 {
u := sync.NewUnwindState(stages.Execution, s.BlockNumber-unwind, s.BlockNumber)
err := stagedsync.UnwindExecutionStage(u, s, nil, ctx, cfg, true, logger)
@ -993,7 +999,8 @@ func stageTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error {
logger.Info("StageExec", "progress", execStage.BlockNumber)
logger.Info("StageTrie", "progress", s.BlockNumber)
cfg := stagedsync.StageTrieCfg(db, true /* checkRoot */, true /* saveHashesToDb */, false /* badBlockHalt */, dirs.Tmp, getBlockReader(db, logger), nil /* hd */, historyV3, agg)
br, _ := blocksIO(db, logger)
cfg := stagedsync.StageTrieCfg(db, true /* checkRoot */, true /* saveHashesToDb */, false /* badBlockHalt */, dirs.Tmp, br, nil /* hd */, historyV3, agg)
if unwind > 0 {
u := sync.NewUnwindState(stages.IntermediateHashes, s.BlockNumber-unwind, s.BlockNumber)
if err := stagedsync.UnwindIntermediateHashesStage(u, s, tx, cfg, ctx, logger); err != nil {
@ -1293,7 +1300,8 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)
cfg := stagedsync.StageTxLookupCfg(db, pm, dirs.Tmp, sn, chainConfig.Bor)
br, _ := blocksIO(db, logger)
cfg := stagedsync.StageTxLookupCfg(db, pm, dirs.Tmp, sn, chainConfig.Bor, br)
if unwind > 0 {
u := sync.NewUnwindState(stages.TxLookup, s.BlockNumber-unwind, s.BlockNumber)
err = stagedsync.UnwindTxLookup(u, s, tx, cfg, ctx, logger)
@ -1395,14 +1403,16 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*snapshot
var openBlockReaderOnce sync.Once
var _blockReaderSingleton services.FullBlockReader
var _blockWriterSingleton *blockio.BlockWriter
func getBlockReader(db kv.RoDB, logger log.Logger) (blockReader services.FullBlockReader) {
func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter) {
openBlockReaderOnce.Do(func() {
sn, _ := allSnapshots(context.Background(), db, logger)
transactionsV3 := kvcfg.TransactionsV3.FromDB(db)
_blockReaderSingleton = snapshotsync.NewBlockReader(sn, transactionsV3)
_blockWriterSingleton = blockio.NewBlockWriter(transactionsV3)
})
return _blockReaderSingleton
return _blockReaderSingleton, _blockWriterSingleton
}
var openDomainsOnce sync.Once
@ -1513,7 +1523,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
engine := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db, logger)
br := getBlockReader(db, logger)
br, _ := blocksIO(db, logger)
sentryControlServer, err := sentry.NewMultiClient(
db,
"",

View File

@ -302,13 +302,14 @@ func loopProcessDomains(chainDb, stateDb kv.RwDB, ctx context.Context, logger lo
}
aggWriter, aggReader := WrapAggregator(agg, stateTx)
br, _ := blocksIO(chainDb, logger)
proc := blockProcessor{
chainConfig: fromdb.ChainConfig(chainDb),
vmConfig: vm.Config{},
engine: engine,
reader: aggReader,
writer: aggWriter,
blockReader: getBlockReader(chainDb, logger),
blockReader: br,
stateTx: stateTx,
stateDb: stateDb,
blockNum: latestBlock,

View File

@ -231,8 +231,9 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
syncCfg.ExecWorkerCount = int(workers)
syncCfg.ReconWorkerCount = int(reconWorkers)
br, _ := blocksIO(db, logger1)
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, changesAcc, false, false, historyV3, dirs,
getBlockReader(db, logger1), nil, genesis, syncCfg, agg)
br, nil, genesis, syncCfg, agg)
execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx, logger log.Logger) error {
return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx, logger log.Logger) error {
@ -485,8 +486,9 @@ func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) e
}
_ = sync.SetCurrentStage(stages.IntermediateHashes)
u = &stagedsync.UnwindState{ID: stages.IntermediateHashes, UnwindPoint: to}
br, _ := blocksIO(db, logger)
if err = stagedsync.UnwindIntermediateHashesStage(u, stage(sync, tx, nil, stages.IntermediateHashes), tx, stagedsync.StageTrieCfg(db, true, true, false, dirs.Tmp,
getBlockReader(db, logger), nil, historyV3, agg), ctx, logger); err != nil {
br, nil, historyV3, agg), ctx, logger); err != nil {
return err
}
must(tx.Commit())
@ -564,9 +566,10 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger)
syncCfg.ReconWorkerCount = int(reconWorkers)
initialCycle := false
br, _ := blocksIO(db, logger)
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil,
/*stateStream=*/ false,
/*badBlockHalt=*/ false, historyV3, dirs, getBlockReader(db, logger), nil, genesis, syncCfg, agg)
/*badBlockHalt=*/ false, historyV3, dirs, br, nil, genesis, syncCfg, agg)
// set block limit of execute stage
sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx, logger log.Logger) error {

View File

@ -44,6 +44,9 @@ func NewRemoteBackend(client remote.ETHBACKENDClient, db kv.RoDB, blockReader se
}
}
// TxsV3Enabled is not supported on the remote backend.
// NOTE(review): panics unconditionally — callers must not invoke this
// on RemoteBackend until a remote implementation exists.
func (back *RemoteBackend) TxsV3Enabled() bool {
	panic("not implemented")
}
func (back *RemoteBackend) EnsureVersionCompatibility() bool {
versionReply, err := back.remoteEthBackend.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil {

View File

@ -27,6 +27,8 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
@ -246,10 +248,19 @@ func write(tx kv.RwTx, g *types.Genesis, tmpDir string) (*types.Block, *state.In
if err := config.CheckConfigForkOrder(); err != nil {
return nil, nil, err
}
if err := rawdb.WriteTd(tx, block.Hash(), block.NumberU64(), g.Difficulty); err != nil {
transactionV3, err := kvcfg.TransactionsV3.Enabled(tx)
if err != nil {
return nil, nil, err
}
if err := rawdb.WriteBlock(tx, block); err != nil {
blockWriter := blockio.NewBlockWriter(transactionV3)
if err := blockWriter.WriteHeader(tx, block.HeaderNoCopy()); err != nil {
return nil, nil, err
}
if err := blockWriter.WriteBody(tx, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return nil, nil, err
}
if err := blockWriter.WriteTd(tx, block.Hash(), block.NumberU64(), g.Difficulty); err != nil {
return nil, nil, err
}
if err := rawdbv3.TxNums.WriteForGenesis(tx, 1); err != nil {

View File

@ -320,23 +320,38 @@ func ReadHeadersByNumber(db kv.Tx, number uint64) ([]*types.Header, error) {
// WriteHeader stores a block header into the database and also stores the hash-
// to-number mapping.
func WriteHeader(db kv.Putter, header *types.Header) {
func WriteHeader(db kv.RwTx, header *types.Header) error {
var (
hash = header.Hash()
number = header.Number.Uint64()
encoded = hexutility.EncodeTs(number)
hash = header.Hash()
number = header.Number.Uint64()
encoded = hexutility.EncodeTs(number)
headerKey = dbutils.HeaderKey(number, hash)
)
if err := db.Put(kv.HeaderNumber, hash[:], encoded); err != nil {
log.Crit("Failed to store hash to number mapping", "err", err)
return fmt.Errorf("HeaderNumber mapping: %w", err)
}
// Write the encoded header
data, err := rlp.EncodeToBytes(header)
if err != nil {
log.Crit("Failed to RLP encode header", "err", err)
return fmt.Errorf("WriteHeader: %w", err)
}
if err := db.Put(kv.Headers, dbutils.HeaderKey(number, hash), data); err != nil {
log.Crit("Failed to store header", "err", err)
if err := db.Put(kv.Headers, headerKey, data); err != nil {
return fmt.Errorf("WriteHeader: %w", err)
}
return nil
}
func WriteHeaderRaw(db kv.StatelessRwTx, number uint64, hash libcommon.Hash, headerRlp []byte, skipIndexing bool) error {
if err := db.Put(kv.Headers, dbutils.HeaderKey(number, hash), headerRlp); err != nil {
return err
}
if skipIndexing {
return nil
}
if err := db.Put(kv.HeaderNumber, hash[:], hexutility.EncodeTs(number)); err != nil {
return err
}
return nil
}
// deleteHeader - dangerous, use DeleteAncientBlocks/TruncateBlocks methods
@ -1225,8 +1240,7 @@ func WriteBlock(db kv.RwTx, block *types.Block) error {
if err := WriteBody(db, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return err
}
WriteHeader(db, block.Header())
return nil
return WriteHeader(db, block.Header())
}
// DeleteAncientBlocks - delete [1, to) old blocks after moving it to snapshots.

View File

@ -0,0 +1,116 @@
package blockio
import (
"context"
"encoding/binary"
"math/big"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/backup"
"github.com/ledgerwatch/log/v3"
)
// BlockWriter can write blocks (headers, bodies, TD, etc.) into the db.
// txsV3 selects the transaction-storage layout: when true the v3 tables
// (kv.EthTxV3) are used instead of kv.EthTx — see TruncateBodies.
type BlockWriter struct {
	txsV3 bool // transactions-v3 storage layout flag, exposed via TxsV3Enabled
}

// NewBlockWriter returns a BlockWriter configured for the given
// transaction-storage layout.
func NewBlockWriter(txsV3 bool) *BlockWriter { return &BlockWriter{txsV3: txsV3} }

// TxsV3Enabled reports whether the v3 transaction-storage layout is in use.
func (w *BlockWriter) TxsV3Enabled() bool { return w.txsV3 }
// WriteHeader stores a block header together with its hash->number
// mapping. Thin delegate to rawdb.WriteHeader.
func (w *BlockWriter) WriteHeader(tx kv.RwTx, header *types.Header) error {
	return rawdb.WriteHeader(tx, header)
}
// WriteHeaderRaw stores an already-RLP-encoded header under its
// (number, hash) key; when skipIndexing is true the hash->number mapping
// is not written. Thin delegate to rawdb.WriteHeaderRaw.
func (w *BlockWriter) WriteHeaderRaw(tx kv.StatelessRwTx, number uint64, hash common.Hash, headerRlp []byte, skipIndexing bool) error {
	return rawdb.WriteHeaderRaw(tx, number, hash, headerRlp, skipIndexing)
}
// WriteCanonicalHash marks the given hash as the canonical block at
// `number`. Thin delegate to rawdb.WriteCanonicalHash.
func (w *BlockWriter) WriteCanonicalHash(tx kv.RwTx, hash common.Hash, number uint64) error {
	return rawdb.WriteCanonicalHash(tx, hash, number)
}
// WriteTd stores the total difficulty for the block identified by
// (hash, number). Thin delegate to rawdb.WriteTd.
func (w *BlockWriter) WriteTd(db kv.Putter, hash common.Hash, number uint64, td *big.Int) error {
	return rawdb.WriteTd(db, hash, number, td)
}
// FillHeaderNumberIndex rebuilds the HeaderNumber (hash -> block number)
// index from the Headers table for block numbers in the half-open range
// [from, to), using an etl.Transform pass.
func (w *BlockWriter) FillHeaderNumberIndex(logPrefix string, tx kv.RwTx, tmpDir string, from, to uint64, ctx context.Context, logger log.Logger) error {
	startKey := make([]byte, 8)
	binary.BigEndian.PutUint64(startKey, from)
	// etl.Transform treats ExtractEndKey as an exclusive bound; a HeaderKey
	// with the zero hash sorts before every real header at height `to`.
	endKey := dbutils.HeaderKey(to, common.Hash{})
	args := etl.TransformArgs{
		ExtractStartKey: startKey,
		ExtractEndKey:   endKey,
		Quit:            ctx.Done(),
	}
	return etl.Transform(logPrefix, tx, kv.Headers, kv.HeaderNumber, tmpDir,
		extractHeaders, etl.IdentityLoadFunc, args, logger)
}
// extractHeaders is the etl extract callback for FillHeaderNumberIndex.
// Entries in kv.Headers keyed by 8-byte block number + 32-byte header hash
// are emitted as hash -> number pairs; keys of any other length are skipped.
func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
	const headerKeyLen = 8 + 32 // block number prefix + header hash
	if len(k) != headerKeyLen {
		return nil
	}
	hash := common.Copy(k[8:])
	number := common.Copy(k[:8])
	return next(k, hash, number)
}
// WriteRawBodyIfNotExists stores a raw (undecoded-transactions) block body
// unless one is already present; it reports whether a write happened and
// the last assigned txn num. Thin delegate to rawdb.WriteRawBodyIfNotExists.
func (w *BlockWriter) WriteRawBodyIfNotExists(tx kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) (ok bool, lastTxnNum uint64, err error) {
	return rawdb.WriteRawBodyIfNotExists(tx, hash, number, body)
}
// WriteBody stores a block body for the block identified by (hash, number).
// Thin delegate to rawdb.WriteBody.
func (w *BlockWriter) WriteBody(tx kv.RwTx, hash common.Hash, number uint64, body *types.Body) error {
	return rawdb.WriteBody(tx, hash, number, body)
}
// TruncateBodies deletes all block bodies with block number >= from, then
// clears the transaction tables backing them (NonCanonicalTxs, the active
// EthTx table, MaxTxNum) and resets the tx-ID sequences to 0.
//
// Returns an error instead of panicking on a failed config read (the
// original panicked, which is wrong for a function that already returns
// error and runs inside a caller-owned transaction).
func (w *BlockWriter) TruncateBodies(db kv.RoDB, tx kv.RwTx, from uint64) error {
	fromB := hexutility.EncodeTs(from)
	if err := tx.ForEach(kv.BlockBody, fromB, func(k, _ []byte) error { return tx.Delete(kv.BlockBody, k) }); err != nil {
		return err
	}
	// Pick the tx table matching the layout recorded in the DB.
	// NOTE(review): this re-reads the TransactionsV3 flag from the DB rather
	// than using w.txsV3 — presumably so it matches the data actually on
	// disk; confirm the two can never disagree.
	ethtx := kv.EthTx
	transactionV3, err := kvcfg.TransactionsV3.Enabled(tx)
	if err != nil {
		return err // was panic(err)
	}
	if transactionV3 {
		ethtx = kv.EthTxV3
	}
	if err := backup.ClearTables(context.Background(), db, tx,
		kv.NonCanonicalTxs,
		ethtx,
		kv.MaxTxNum,
	); err != nil {
		return err
	}
	if err := rawdb.ResetSequence(tx, ethtx, 0); err != nil {
		return err
	}
	if err := rawdb.ResetSequence(tx, kv.NonCanonicalTxs, 0); err != nil {
		return err
	}
	return nil
}
// TruncateBlocks deletes whole blocks (headers, bodies and related data)
// with block number >= blockFrom. Thin delegate to rawdb.TruncateBlocks.
func (w *BlockWriter) TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
	return rawdb.TruncateBlocks(ctx, tx, blockFrom)
}
// TruncateTd deletes stored total-difficulty entries for blocks with
// number >= blockFrom. Thin delegate to rawdb.TruncateTd.
func (w *BlockWriter) TruncateTd(tx kv.RwTx, blockFrom uint64) error {
	return rawdb.TruncateTd(tx, blockFrom)
}
// ResetSenders clears the entire kv.Senders table (recovered tx senders),
// forcing the senders stage to recompute them.
func (w *BlockWriter) ResetSenders(ctx context.Context, db kv.RoDB, tx kv.RwTx) error {
	return backup.ClearTables(ctx, db, tx, kv.Senders)
}

View File

@ -6,13 +6,13 @@ import (
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/turbo/backup"
@ -52,9 +52,9 @@ func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string) er
}
func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, agg *state.AggregatorV3,
br services.FullBlockReader, dirs datadir.Dirs, cc chain.Config, engine consensus.Engine, logger log.Logger) error {
br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, cc chain.Config, engine consensus.Engine, logger log.Logger) error {
// keep Genesis
if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil {
if err := bw.TruncateBlocks(context.Background(), tx, 1); err != nil {
return err
}
if err := stages.SaveStageProgress(tx, stages.Bodies, 1); err != nil {
@ -71,7 +71,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
if err := rawdb.TruncateCanonicalHash(tx, 1, false); err != nil {
return err
}
if err := rawdb.TruncateTd(tx, 1); err != nil {
if err := bw.TruncateTd(tx, 1); err != nil {
return err
}
hash, err := rawdb.ReadCanonicalHash(tx, 0)
@ -83,29 +83,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
}
// ensure no garbage records left (it may happen if db is inconsistent)
if err := tx.ForEach(kv.BlockBody, hexutility.EncodeTs(2), func(k, _ []byte) error { return tx.Delete(kv.BlockBody, k) }); err != nil {
return err
}
ethtx := kv.EthTx
transactionV3, err := kvcfg.TransactionsV3.Enabled(tx)
if err != nil {
panic(err)
}
if transactionV3 {
ethtx = kv.EthTxV3
}
if err := backup.ClearTables(context.Background(), db, tx,
kv.NonCanonicalTxs,
ethtx,
kv.MaxTxNum,
); err != nil {
return err
}
if err := rawdb.ResetSequence(tx, ethtx, 0); err != nil {
return err
}
if err := rawdb.ResetSequence(tx, kv.NonCanonicalTxs, 0); err != nil {
if err := bw.TruncateBodies(db, tx, 2); err != nil {
return err
}
@ -121,8 +99,8 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, snapshots *snapshotsync.RoSnapshots, ag
return nil
}
func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
if err := backup.ClearTables(ctx, db, tx, kv.Senders); err != nil {
func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx, bw *blockio.BlockWriter) error {
if err := bw.ResetSenders(ctx, db, tx); err != nil {
return nil
}
return clearStageProgress(tx, stages.Senders)

View File

@ -33,6 +33,7 @@ import (
clcore "github.com/ledgerwatch/erigon/cl/phase1/core"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
@ -175,6 +176,7 @@ type Ethereum struct {
agg *libstate.AggregatorV3
blockSnapshots *snapshotsync.RoSnapshots
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
kvRPC *remotedbserver.KvServer
logger log.Logger
}
@ -298,11 +300,11 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
},
logger: logger,
}
blockReader, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, backend.notifications.Events, config.TransactionsV3)
blockReader, blockWriter, allSnapshots, agg, err := backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, backend.notifications.Events, config.TransactionsV3)
if err != nil {
return nil, err
}
backend.agg, backend.blockSnapshots, backend.blockReader = agg, allSnapshots, blockReader
backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
if config.HistoryV3 {
backend.chainDB, err = temporal.New(backend.chainDB, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[chainConfig.ChainName])
@ -426,7 +428,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return err
}
// We start the mining step
if err := stages2.StateStep(ctx, batch, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
if err := stages2.StateStep(ctx, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
logger.Warn("Could not validate block", "err", err)
return err
}
@ -968,13 +970,14 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
}
// sets up blockReader and client downloader
func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, notifications *shards.Events, transactionsV3 bool) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, notifications *shards.Events, transactionsV3 bool) (services.FullBlockReader, *blockio.BlockWriter, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap, s.logger)
var err error
if !snConfig.NoDownloader {
allSnapshots.OptimisticalyReopenWithDB(s.chainDB)
}
blockReader := snapshotsync.NewBlockReader(allSnapshots, transactionsV3)
blockWriter := blockio.NewBlockWriter(transactionsV3)
if !snConfig.NoDownloader {
if snConfig.DownloaderAddr != "" {
@ -984,28 +987,28 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
// start embedded Downloader
s.downloader, err = downloader3.New(ctx, downloaderCfg)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
s.downloader.MainLoopInBackground(ctx, true)
bittorrentServer, err := downloader3.NewGrpcServer(s.downloader)
if err != nil {
return nil, nil, nil, fmt.Errorf("new server: %w", err)
return nil, nil, nil, nil, fmt.Errorf("new server: %w", err)
}
s.downloaderClient = direct.NewDownloaderClient(bittorrentServer)
}
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
}
dir.MustExist(dirs.SnapHistory)
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, s.chainDB, s.logger)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err = agg.OpenFolder(); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
agg.OnFreeze(func(frozenFileNames []string) {
notifications.OnNewSnapshot()
@ -1017,7 +1020,7 @@ func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snCo
}
})
return blockReader, allSnapshots, agg, nil
return blockReader, blockWriter, allSnapshots, agg, nil
}
func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) {

View File

@ -2,38 +2,29 @@ package stagedsync
import (
"context"
"encoding/binary"
"fmt"
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/log/v3"
)
// extractHeaders is an etl extract function: for each well-formed header key
// (8-byte block number + 32-byte header hash = 40 bytes) it emits the hash as
// the new key and the block number as the value; all other keys are skipped.
func extractHeaders(key []byte, _ []byte, next etl.ExtractNextFunc) error {
	const headerKeyLen = 40 // 8-byte block number + 32-byte header hash
	if len(key) != headerKeyLen {
		return nil
	}
	blockNum, headerHash := key[:8], key[8:]
	return next(key, libcommon.Copy(headerHash), libcommon.Copy(blockNum))
}
type BlockHashesCfg struct {
db kv.RwDB
tmpDir string
cc *chain.Config
headerWriter *blockio.BlockWriter
}
func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config) BlockHashesCfg {
func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config, headerWriter *blockio.BlockWriter) BlockHashesCfg {
return BlockHashesCfg{
db: db,
tmpDir: tmpDir,
cc: cc,
db: db,
tmpDir: tmpDir,
cc: cc,
headerWriter: headerWriter,
}
}
@ -46,7 +37,6 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
}
defer tx.Rollback()
}
quit := ctx.Done()
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return fmt.Errorf("getting headers progress: %w", err)
@ -55,29 +45,11 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
return nil
}
startKey := make([]byte, 8)
binary.BigEndian.PutUint64(startKey, s.BlockNumber)
endKey := dbutils.HeaderKey(headNumber+1, libcommon.Hash{}) // etl.Tranform uses ExractEndKey as exclusive bound, therefore +1
// TODO: do we need non-canonical headers?
logPrefix := s.LogPrefix()
if err := etl.Transform(
logPrefix,
tx,
kv.Headers,
kv.HeaderNumber,
cfg.tmpDir,
extractHeaders,
etl.IdentityLoadFunc,
etl.TransformArgs{
ExtractStartKey: startKey,
ExtractEndKey: endKey,
Quit: quit,
},
logger,
); err != nil {
// etl.Transform uses ExtractEndKey as an exclusive bound, therefore +1
if err := cfg.headerWriter.FillHeaderNumberIndex(s.LogPrefix(), tx, cfg.tmpDir, s.BlockNumber, headNumber+1, ctx, logger); err != nil {
return err
}
if err = s.Update(tx, headNumber); err != nil {
return err
}

View File

@ -16,6 +16,7 @@ import (
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/common"
@ -50,6 +51,7 @@ type HeadersCfg struct {
snapshots *snapshotsync.RoSnapshots
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
forkValidator *engineapi.ForkValidator
notifications *shards.Notifications
}
@ -66,6 +68,7 @@ func StageHeadersCfg(
noP2PDiscovery bool,
snapshots *snapshotsync.RoSnapshots,
blockReader services.FullBlockReader,
blockWriter *blockio.BlockWriter,
tmpdir string,
notifications *shards.Notifications,
forkValidator *engineapi.ForkValidator) HeadersCfg {
@ -82,6 +85,7 @@ func StageHeadersCfg(
noP2PDiscovery: noP2PDiscovery,
snapshots: snapshots,
blockReader: blockReader,
blockWriter: blockWriter,
forkValidator: forkValidator,
notifications: notifications,
}
@ -178,7 +182,7 @@ func HeadersPOS(
interrupt, requestId, requestWithStatus := cfg.hd.BeaconRequestList.WaitForRequest(syncing, test)
cfg.hd.SetHeaderReader(&ChainReaderImpl{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})
headerInserter := headerdownload.NewHeaderInserter(s.LogPrefix(), nil, s.BlockNumber, cfg.blockReader)
headerInserter := headerdownload.NewHeaderInserter(s.LogPrefix(), nil, s.BlockNumber, cfg.blockReader, cfg.blockWriter)
interrupted, err := handleInterrupt(interrupt, cfg, tx, headerInserter, useExternalTx, logger)
if err != nil {
@ -802,7 +806,7 @@ func HeadersPOW(
if localTd == nil {
return fmt.Errorf("localTD is nil: %d, %x", headerProgress, hash)
}
headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress, cfg.blockReader)
headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress, cfg.blockReader, cfg.blockWriter)
cfg.hd.SetHeaderReader(&ChainReaderImpl{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})
stopped := false

View File

@ -16,6 +16,8 @@ import (
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/secp256k1"
@ -42,9 +44,11 @@ type SendersCfg struct {
chainConfig *chain.Config
blockRetire *snapshotsync.BlockRetire
hd *headerdownload.HeaderDownload
blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
}
func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpdir string, prune prune.Mode, br *snapshotsync.BlockRetire, hd *headerdownload.HeaderDownload) SendersCfg {
func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpdir string, prune prune.Mode, br *snapshotsync.BlockRetire, blockWriter *blockio.BlockWriter, hd *headerdownload.HeaderDownload) SendersCfg {
const sendersBatchSize = 10000
const sendersBlockSize = 4096
@ -61,6 +65,8 @@ func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpd
prune: prune,
blockRetire: br,
hd: hd,
blockWriter: blockWriter,
}
}
@ -68,6 +74,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
if cfg.blockRetire != nil && cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled && s.BlockNumber < cfg.blockRetire.Snapshots().BlocksAvailable() {
s.BlockNumber = cfg.blockRetire.Snapshots().BlocksAvailable()
}
txsV3Enabled := cfg.blockWriter.TxsV3Enabled() // allow stor senders for non-canonical blocks
quitCh := ctx.Done()
useExternalTx := tx != nil
@ -110,25 +117,27 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
currentHeaderIdx := uint64(0)
canonical := make([]libcommon.Hash, to-s.BlockNumber)
for k, v, err := canonicalC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, v, err = canonicalC.Next() {
if err != nil {
return err
}
if err := libcommon.Stopped(quitCh); err != nil {
return err
}
if !txsV3Enabled {
for k, v, err := canonicalC.Seek(hexutility.EncodeTs(startFrom)); k != nil; k, v, err = canonicalC.Next() {
if err != nil {
return err
}
if err := libcommon.Stopped(quitCh); err != nil {
return err
}
if currentHeaderIdx >= to-s.BlockNumber { // if header stage is ehead of body stage
break
}
if currentHeaderIdx >= to-s.BlockNumber { // if header stage is ehead of body stage
break
}
copy(canonical[currentHeaderIdx][:], v)
currentHeaderIdx++
copy(canonical[currentHeaderIdx][:], v)
currentHeaderIdx++
select {
default:
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] Preload headers", logPrefix), "block_number", binary.BigEndian.Uint64(k))
select {
default:
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] Preload headers", logPrefix), "block_number", binary.BigEndian.Uint64(k))
}
}
}
logger.Trace(fmt.Sprintf("[%s] Read canonical hashes", logPrefix), "amount", len(canonical))
@ -226,15 +235,25 @@ Loop:
break
}
if canonical[blockNumber-s.BlockNumber-1] != blockHash {
// non-canonical case
continue
}
body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blockNumber)
if body == nil {
logger.Warn(fmt.Sprintf("[%s] ReadCanonicalBodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash)
continue
var body *types.Body
if txsV3Enabled {
if body, err = cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber); err != nil {
return err
}
if body == nil {
logger.Warn(fmt.Sprintf("[%s] blockReader.BodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash)
continue
}
} else {
if canonical[blockNumber-s.BlockNumber-1] != blockHash {
// non-canonical case
continue
}
body = rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blockNumber)
if body == nil {
logger.Warn(fmt.Sprintf("[%s] ReadCanonicalBodyWithTransactions can't find block", logPrefix), "num", blockNumber, "hash", blockHash)
continue
}
}
select {

View File

@ -6,6 +6,7 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -25,6 +26,9 @@ func TestSenders(t *testing.T) {
ctx := context.Background()
db, tx := memdb.NewTestTx(t)
require := require.New(t)
//allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, logger)
bw := blockio.NewBlockWriter(false)
var testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddr := crypto.PubkeyToAddress(testKey.PublicKey)
@ -37,7 +41,7 @@ func TestSenders(t *testing.T) {
// prepare tx so it works with our test
signer1 := types.MakeSigner(params.TestChainConfig, params.TestChainConfig.BerlinBlock.Uint64())
require.NoError(rawdb.WriteBody(tx, libcommon.HexToHash("01"), 1, &types.Body{
require.NoError(bw.WriteBody(tx, libcommon.HexToHash("01"), 1, &types.Body{
Transactions: []types.Transaction{
mustSign(&types.AccessListTx{
LegacyTx: types.LegacyTx{
@ -66,7 +70,7 @@ func TestSenders(t *testing.T) {
require.NoError(rawdb.WriteCanonicalHash(tx, libcommon.HexToHash("01"), 1))
signer2 := types.MakeSigner(params.TestChainConfig, params.TestChainConfig.BerlinBlock.Uint64())
require.NoError(rawdb.WriteBody(tx, libcommon.HexToHash("02"), 2, &types.Body{
require.NoError(bw.WriteBody(tx, libcommon.HexToHash("02"), 2, &types.Body{
Transactions: []types.Transaction{
mustSign(&types.AccessListTx{
LegacyTx: types.LegacyTx{
@ -103,18 +107,22 @@ func TestSenders(t *testing.T) {
}, *signer2),
},
}))
require.NoError(rawdb.WriteCanonicalHash(tx, libcommon.HexToHash("02"), 2))
require.NoError(rawdb.WriteBody(tx, libcommon.HexToHash("03"), 3, &types.Body{
err := bw.WriteBody(tx, libcommon.HexToHash("03"), 3, &types.Body{
Transactions: []types.Transaction{}, Uncles: []*types.Header{{GasLimit: 3}},
}))
})
require.NoError(err)
require.NoError(rawdb.WriteCanonicalHash(tx, libcommon.HexToHash("03"), 3))
require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
cfg := StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil, nil, logger), nil)
err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx, log.New())
assert.NoError(t, err)
br := snapshotsync.NewBlockRetire(1, "", nil, db, nil, nil, logger)
cfg := StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, br, bw, nil)
err = SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx, log.New())
require.NoError(err)
{
found := rawdb.ReadCanonicalBodyWithTransactions(tx, libcommon.HexToHash("01"), 1)

View File

@ -12,6 +12,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/core/rawdb"
@ -21,11 +22,12 @@ import (
)
type TxLookupCfg struct {
db kv.RwDB
prune prune.Mode
tmpdir string
snapshots *snapshotsync.RoSnapshots
borConfig *chain.BorConfig
db kv.RwDB
prune prune.Mode
tmpdir string
snapshots *snapshotsync.RoSnapshots
borConfig *chain.BorConfig
blockReader services.FullBlockReader
}
func StageTxLookupCfg(
@ -34,18 +36,19 @@ func StageTxLookupCfg(
tmpdir string,
snapshots *snapshotsync.RoSnapshots,
borConfig *chain.BorConfig,
blockReader services.FullBlockReader,
) TxLookupCfg {
return TxLookupCfg{
db: db,
prune: prune,
tmpdir: tmpdir,
snapshots: snapshots,
borConfig: borConfig,
db: db,
prune: prune,
tmpdir: tmpdir,
snapshots: snapshots,
borConfig: borConfig,
blockReader: blockReader,
}
}
func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, ctx context.Context, logger log.Logger) (err error) {
quitCh := ctx.Done()
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
@ -90,12 +93,12 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
startBlock++
}
// etl.Transform uses ExtractEndKey as exclusive bound, therefore endBlock + 1
if err = txnLookupTransform(logPrefix, tx, startBlock, endBlock+1, quitCh, cfg, logger); err != nil {
if err = txnLookupTransform(logPrefix, tx, startBlock, endBlock+1, ctx, cfg, logger); err != nil {
return fmt.Errorf("txnLookupTransform: %w", err)
}
if cfg.borConfig != nil {
if err = borTxnLookupTransform(logPrefix, tx, startBlock, endBlock+1, quitCh, cfg, logger); err != nil {
if err = borTxnLookupTransform(logPrefix, tx, startBlock, endBlock+1, ctx.Done(), cfg, logger); err != nil {
return fmt.Errorf("borTxnLookupTransform: %w", err)
}
}
@ -113,8 +116,13 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
}
// txnLookupTransform - [startKey, endKey)
func txnLookupTransform(logPrefix string, tx kv.RwTx, blockFrom, blockTo uint64, quitCh <-chan struct{}, cfg TxLookupCfg, logger log.Logger) error {
func txnLookupTransform(logPrefix string, tx kv.RwTx, blockFrom, blockTo uint64, ctx context.Context, cfg TxLookupCfg, logger log.Logger) (err error) {
bigNum := new(big.Int)
txsV3Enabled := cfg.blockReader.TxsV3Enabled()
if txsV3Enabled {
panic("implement me. need iterate by kv.BlockID instead of kv.HeaderCanonical")
}
return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error {
blocknum, blockHash := binary.BigEndian.Uint64(k), libcommon.CastToHash(v)
body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum)
@ -131,7 +139,7 @@ func txnLookupTransform(logPrefix string, tx kv.RwTx, blockFrom, blockTo uint64,
return nil
}, etl.IdentityLoadFunc, etl.TransformArgs{
Quit: quitCh,
Quit: ctx.Done(),
ExtractStartKey: hexutility.EncodeTs(blockFrom),
ExtractEndKey: hexutility.EncodeTs(blockTo),
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {

View File

@ -5,7 +5,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
)
@ -16,6 +15,7 @@ type All struct {
type BlockReader interface {
BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error)
TxsV3Enabled() bool
}
type HeaderReader interface {
@ -56,3 +56,19 @@ type FullBlockReader interface {
TxnReader
CanonicalReader
}
/*
type HeaderWriter interface {
WriteHeader(tx kv.RwTx, header *types.Header) error
WriteHeaderRaw(tx kv.StatelessRwTx, number uint64, hash libcommon.Hash, headerRlp []byte, skipIndexing bool) error
WriteCanonicalHash(tx kv.RwTx, hash libcommon.Hash, number uint64) error
WriteTd(db kv.Putter, hash libcommon.Hash, number uint64, td *big.Int) error
// [from,to)
FillHeaderNumberIndex(logPrefix string, tx kv.RwTx, tmpDir string, from, to uint64, ctx context.Context, logger log.Logger) error
}
type BlockWriter interface {
HeaderWriter
WriteRawBodyIfNotExists(tx kv.RwTx, hash libcommon.Hash, number uint64, body *types.RawBody) (ok bool, lastTxnNum uint64, err error)
WriteBody(tx kv.RwTx, hash libcommon.Hash, number uint64, body *types.Body) error
}
*/

View File

@ -22,42 +22,45 @@ type RemoteBlockReader struct {
client remote.ETHBACKENDClient
}
func (back *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) {
func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
return nil, err
}
block, _, err := back.BlockWithSenders(ctx, tx, canonicalHash, blockHeight)
block, _, err := r.BlockWithSenders(ctx, tx, canonicalHash, blockHeight)
if err != nil {
return nil, err
}
return block.Header(), nil
}
func (back *RemoteBlockReader) Snapshots() *RoSnapshots { return nil }
func (r *RemoteBlockReader) Snapshots() *RoSnapshots { return nil }
func (back *RemoteBlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (*types.Header, error) {
func (r *RemoteBlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (*types.Header, error) {
blockNum := rawdb.ReadHeaderNumber(tx, hash)
if blockNum == nil {
return nil, nil
}
block, _, err := back.BlockWithSenders(ctx, tx, hash, *blockNum)
block, _, err := r.BlockWithSenders(ctx, tx, hash, *blockNum)
if err != nil {
return nil, err
}
return block.Header(), nil
}
func (back *RemoteBlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (libcommon.Hash, error) {
func (r *RemoteBlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (libcommon.Hash, error) {
return rawdb.ReadCanonicalHash(tx, blockHeight)
}
func NewRemoteBlockReader(client remote.ETHBACKENDClient) *RemoteBlockReader {
return &RemoteBlockReader{client}
}
func (r *RemoteBlockReader) TxsV3Enabled() bool {
panic("not implemented")
}
func (back *RemoteBlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) {
reply, err := back.client.TxnLookup(ctx, &remote.TxnLookupRequest{TxnHash: gointerfaces.ConvertHashToH256(txnHash)})
func (r *RemoteBlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) {
reply, err := r.client.TxnLookup(ctx, &remote.TxnLookupRequest{TxnHash: gointerfaces.ConvertHashToH256(txnHash)})
if err != nil {
return 0, false, err
}
@ -67,12 +70,12 @@ func (back *RemoteBlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnH
return reply.BlockNumber, true, nil
}
func (back *RemoteBlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
func (r *RemoteBlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return nil, err
}
b, err := back.BodyWithTransactions(ctx, tx, canonicalHash, blockNum)
b, err := r.BodyWithTransactions(ctx, tx, canonicalHash, blockNum)
if err != nil {
return nil, err
}
@ -88,8 +91,8 @@ func (back *RemoteBlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter
return b.Transactions[i], nil
}
func (back *RemoteBlockReader) BlockWithSenders(ctx context.Context, _ kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
reply, err := back.client.Block(ctx, &remote.BlockRequest{BlockHash: gointerfaces.ConvertHashToH256(hash), BlockHeight: blockHeight})
func (r *RemoteBlockReader) BlockWithSenders(ctx context.Context, _ kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
reply, err := r.client.Block(ctx, &remote.BlockRequest{BlockHash: gointerfaces.ConvertHashToH256(hash), BlockHeight: blockHeight})
if err != nil {
return nil, nil, err
}
@ -109,8 +112,8 @@ func (back *RemoteBlockReader) BlockWithSenders(ctx context.Context, _ kv.Getter
return block, senders, nil
}
func (back *RemoteBlockReader) Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (*types.Header, error) {
block, _, err := back.BlockWithSenders(ctx, tx, hash, blockHeight)
func (r *RemoteBlockReader) Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (*types.Header, error) {
block, _, err := r.BlockWithSenders(ctx, tx, hash, blockHeight)
if err != nil {
return nil, err
}
@ -119,8 +122,8 @@ func (back *RemoteBlockReader) Header(ctx context.Context, tx kv.Getter, hash li
}
return block.Header(), nil
}
func (back *RemoteBlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) {
block, _, err := back.BlockWithSenders(ctx, tx, hash, blockHeight)
func (r *RemoteBlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) {
block, _, err := r.BlockWithSenders(ctx, tx, hash, blockHeight)
if err != nil {
return nil, 0, err
}
@ -129,8 +132,8 @@ func (back *RemoteBlockReader) Body(ctx context.Context, tx kv.Getter, hash libc
}
return block.Body(), uint32(len(block.Body().Transactions)), nil
}
func (back *RemoteBlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error) {
block, _, err := back.BlockWithSenders(ctx, tx, hash, blockHeight)
func (r *RemoteBlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error) {
block, _, err := r.BlockWithSenders(ctx, tx, hash, blockHeight)
if err != nil {
return nil, err
}
@ -140,8 +143,8 @@ func (back *RemoteBlockReader) BodyWithTransactions(ctx context.Context, tx kv.G
return block.Body(), nil
}
func (back *RemoteBlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) {
body, err := back.BodyWithTransactions(ctx, tx, hash, blockHeight)
func (r *RemoteBlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) {
body, err := r.BodyWithTransactions(ctx, tx, hash, blockHeight)
if err != nil {
return nil, err
}
@ -162,22 +165,22 @@ func NewBlockReader(snapshots *RoSnapshots, transactionsV3 bool) *BlockReader {
return &BlockReader{sn: snapshots, TransactionsV3: transactionsV3}
}
func (back *BlockReader) Snapshots() *RoSnapshots { return back.sn }
func (r *BlockReader) Snapshots() *RoSnapshots { return r.sn }
func (back *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) {
func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) {
h = rawdb.ReadHeaderByNumber(tx, blockHeight)
if h != nil {
return h, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
if !ok {
return
}
h, _, err = back.headerFromSnapshot(blockHeight, seg, nil)
h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
return nil, err
}
@ -185,7 +188,7 @@ func (back *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, block
}
// HeaderByHash - will search header in all snapshots starting from recent
func (back *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (h *types.Header, err error) {
func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash libcommon.Hash) (h *types.Header, err error) {
h, err = rawdb.ReadHeaderByHash(tx, hash)
if err != nil {
return nil, err
@ -194,7 +197,7 @@ func (back *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash li
return h, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
segments := view.Headers()
@ -204,7 +207,7 @@ func (back *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash li
continue
}
h, err = back.headerFromSnapshotByHash(hash, segments[i], buf)
h, err = r.headerFromSnapshotByHash(hash, segments[i], buf)
if err != nil {
return nil, err
}
@ -217,7 +220,7 @@ func (back *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash li
var emptyHash = libcommon.Hash{}
func (back *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (h libcommon.Hash, err error) {
func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (h libcommon.Hash, err error) {
h, err = rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
return h, err
@ -226,14 +229,14 @@ func (back *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockH
return h, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
if !ok {
return
}
header, _, err := back.headerFromSnapshot(blockHeight, seg, nil)
header, _, err := r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
return h, err
}
@ -244,26 +247,26 @@ func (back *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockH
return h, nil
}
func (back *BlockReader) Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (h *types.Header, err error) {
func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (h *types.Header, err error) {
h = rawdb.ReadHeader(tx, hash, blockHeight)
if h != nil {
return h, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
if !ok {
return
}
h, _, err = back.headerFromSnapshot(blockHeight, seg, nil)
h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
return h, err
}
return h, nil
}
func (back *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error) {
func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, err error) {
body, err = rawdb.ReadBodyWithTransactions(tx, hash, blockHeight)
if err != nil {
return nil, err
@ -272,7 +275,7 @@ func (back *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter,
return body, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
var baseTxnID uint64
@ -282,7 +285,7 @@ func (back *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter,
if !ok {
return nil, nil
}
body, baseTxnID, txsAmount, buf, err = back.bodyFromSnapshot(blockHeight, seg, buf)
body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
if err != nil {
return nil, err
}
@ -293,7 +296,7 @@ func (back *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter,
if !ok {
return nil, nil
}
txs, senders, err := back.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf)
txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf)
if err != nil {
return nil, err
}
@ -305,8 +308,8 @@ func (back *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter,
return body, nil
}
func (back *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) {
body, err := back.BodyWithTransactions(ctx, tx, hash, blockHeight)
func (r *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (bodyRlp rlp.RawValue, err error) {
body, err := r.BodyWithTransactions(ctx, tx, hash, blockHeight)
if err != nil {
return nil, err
}
@ -317,27 +320,27 @@ func (back *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash libcomm
return bodyRlp, nil
}
func (back *BlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) {
if blockHeight >= back.sn.BlocksAvailable() {
func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) {
if blockHeight >= r.sn.BlocksAvailable() {
body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight)
return body, txAmount, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
seg, ok := view.BodiesSegment(blockHeight)
if !ok {
return
}
body, _, txAmount, _, err = back.bodyFromSnapshot(blockHeight, seg, nil)
body, _, txAmount, _, err = r.bodyFromSnapshot(blockHeight, seg, nil)
if err != nil {
return nil, 0, err
}
return body, txAmount, nil
}
func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
if blockHeight >= back.sn.BlocksAvailable() {
func (r *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, hash libcommon.Hash, blockHeight uint64) (block *types.Block, senders []libcommon.Address, err error) {
if blockHeight >= r.sn.BlocksAvailable() {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash)
@ -352,7 +355,7 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, has
return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight)
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
if !ok {
@ -360,7 +363,7 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, has
}
var buf []byte
h, buf, err := back.headerFromSnapshot(blockHeight, seg, buf)
h, buf, err := r.headerFromSnapshot(blockHeight, seg, buf)
if err != nil {
return nil, nil, err
}
@ -375,7 +378,7 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, has
if !ok {
return
}
b, baseTxnId, txsAmount, buf, err = back.bodyFromSnapshot(blockHeight, bodySeg, buf)
b, baseTxnId, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, bodySeg, buf)
if err != nil {
return nil, nil, err
}
@ -396,7 +399,7 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, has
return
}
var txs []types.Transaction
txs, senders, err = back.txsFromSnapshot(baseTxnId, txsAmount, txnSeg, buf)
txs, senders, err = r.txsFromSnapshot(baseTxnId, txsAmount, txnSeg, buf)
if err != nil {
return nil, nil, err
}
@ -411,7 +414,7 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Getter, has
return block, senders, nil
}
func (back *BlockReader) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, buf []byte) (*types.Header, []byte, error) {
func (r *BlockReader) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, buf []byte) (*types.Header, []byte, error) {
if sn.idxHeaderHash == nil {
return nil, buf, nil
}
@ -436,7 +439,7 @@ func (back *BlockReader) headerFromSnapshot(blockHeight uint64, sn *HeaderSegmen
// because HeaderByHash method will search header in all snapshots - and may request header which doesn't exists
// but because our indices are based on PerfectHashMap, no way to know is given key exists or not, only way -
// to make sure is to fetch it and compare hash
func (back *BlockReader) headerFromSnapshotByHash(hash libcommon.Hash, sn *HeaderSegment, buf []byte) (*types.Header, error) {
func (r *BlockReader) headerFromSnapshotByHash(hash libcommon.Hash, sn *HeaderSegment, buf []byte) (*types.Header, error) {
defer func() {
if rec := recover(); rec != nil {
panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.ranges.from, sn.ranges.to, dbg.Stack()))
@ -469,8 +472,8 @@ func (back *BlockReader) headerFromSnapshotByHash(hash libcommon.Hash, sn *Heade
return h, nil
}
func (back *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, []byte, error) {
b, buf, err := back.bodyForStorageFromSnapshot(blockHeight, sn, buf)
func (r *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, []byte, error) {
b, buf, err := r.bodyForStorageFromSnapshot(blockHeight, sn, buf)
if err != nil {
return nil, 0, 0, buf, err
}
@ -485,7 +488,7 @@ func (back *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, b
return body, b.BaseTxId + 1, txsAmount, buf, nil // empty txs in the beginning and end of block
}
func (back *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.BodyForStorage, []byte, error) {
func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.BodyForStorage, []byte, error) {
defer func() {
if rec := recover(); rec != nil {
panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.ranges.from, sn.ranges.to, dbg.Stack()))
@ -518,7 +521,7 @@ func (back *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *Body
return b, buf, nil
}
func (back *BlockReader) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) (txs []types.Transaction, senders []libcommon.Address, err error) {
func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) (txs []types.Transaction, senders []libcommon.Address, err error) {
defer func() {
if rec := recover(); rec != nil {
panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, txsSeg.ranges.from, txsSeg.ranges.to, dbg.Stack()))
@ -560,7 +563,7 @@ func (back *BlockReader) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txs
return txs, senders, nil
}
func (back *BlockReader) txnByID(txnID uint64, sn *TxnSegment, buf []byte) (txn types.Transaction, err error) {
func (r *BlockReader) txnByID(txnID uint64, sn *TxnSegment, buf []byte) (txn types.Transaction, err error) {
offset := sn.IdxTxnHash.OrdinalLookup(txnID - sn.IdxTxnHash.BaseDataID())
gg := sn.Seg.MakeGetter()
gg.Reset(offset)
@ -578,7 +581,7 @@ func (back *BlockReader) txnByID(txnID uint64, sn *TxnSegment, buf []byte) (txn
return
}
func (back *BlockReader) txnByHash(txnHash libcommon.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) {
func (r *BlockReader) txnByHash(txnHash libcommon.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) {
for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.IdxTxnHash == nil || sn.IdxTxnHash2BlockNum == nil {
@ -618,8 +621,8 @@ func (back *BlockReader) txnByHash(txnHash libcommon.Hash, segments []*TxnSegmen
// TxnByIdxInBlock - doesn't include system-transactions in the begin/end of block
// return nil if 0 < i < body.TxAmount
func (back *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
if blockNum >= back.sn.BlocksAvailable() {
func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (txn types.Transaction, err error) {
if blockNum >= r.sn.BlocksAvailable() {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return nil, err
@ -635,14 +638,14 @@ func (back *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, bloc
return nil, nil
}
txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i), canonicalHash, back.TransactionsV3)
txn, err = rawdb.CanonicalTxnByID(tx, b.BaseTxId+1+uint64(i), canonicalHash, r.TransactionsV3)
if err != nil {
return nil, err
}
return txn, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
seg, ok := view.BodiesSegment(blockNum)
if !ok {
@ -650,7 +653,7 @@ func (back *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, bloc
}
var b *types.BodyForStorage
b, _, err = back.bodyForStorageFromSnapshot(blockNum, seg, nil)
b, _, err = r.bodyForStorageFromSnapshot(blockNum, seg, nil)
if err != nil {
return nil, err
}
@ -668,11 +671,11 @@ func (back *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, bloc
return
}
// +1 because block has system-txn in the beginning of block
return back.txnByID(b.BaseTxId+1+uint64(i), txnSeg, nil)
return r.txnByID(b.BaseTxId+1+uint64(i), txnSeg, nil)
}
// TxnLookup - find blockNumber and txnID by txnHash
func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) {
func (r *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash libcommon.Hash) (uint64, bool, error) {
n, err := rawdb.ReadTxLookupEntry(tx, txnHash)
if err != nil {
return 0, false, err
@ -681,12 +684,12 @@ func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash li
return *n, true, nil
}
view := back.sn.View()
view := r.sn.View()
defer view.Close()
var txn types.Transaction
var blockNum uint64
txn, blockNum, _, err = back.txnByHash(txnHash, view.Txs(), nil)
txn, blockNum, _, err = r.txnByHash(txnHash, view.Txs(), nil)
if err != nil {
return 0, false, err
}
@ -696,8 +699,8 @@ func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash li
return blockNum, true, nil
}
func (back *BlockReader) LastTxNumInSnapshot(blockNum uint64) (uint64, bool, error) {
view := back.sn.View()
func (r *BlockReader) LastTxNumInSnapshot(blockNum uint64) (uint64, bool, error) {
view := r.sn.View()
defer view.Close()
sn, ok := view.TxsSegment(blockNum)
@ -709,8 +712,8 @@ func (back *BlockReader) LastTxNumInSnapshot(blockNum uint64) (uint64, bool, err
return lastTxnID, true, nil
}
func (back *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error {
view := back.sn.View()
func (r *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint64) error) error {
view := r.sn.View()
defer view.Close()
for _, sn := range view.Bodies() {
@ -734,3 +737,4 @@ func (back *BlockReader) IterateBodies(f func(blockNum, baseTxNum, txAmount uint
}
return nil
}
func (r *BlockReader) TxsV3Enabled() bool { return r.TransactionsV3 }

View File

@ -40,7 +40,8 @@ func TestInserter1(t *testing.T) {
t.Fatal(err)
}
defer tx.Rollback()
hi := headerdownload.NewHeaderInserter("headers", big.NewInt(0), 0, snapshotsync.NewBlockReader(m.BlockSnapshots, m.TransactionsV3))
br, bw := m.NewBlocksIO()
hi := headerdownload.NewHeaderInserter("headers", big.NewInt(0), 0, br, bw)
h1 := types.Header{
Number: big.NewInt(1),
Difficulty: big.NewInt(10),

View File

@ -892,19 +892,19 @@ func (hi *HeaderInserter) FeedHeaderPoW(db kv.StatelessRwTx, headerReader servic
// This makes sure we end up choosing the chain with the max total difficulty
hi.localTd.Set(td)
}
if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil {
if err = hi.headerWriter.WriteTd(db, hash, blockHeight, td); err != nil {
return nil, fmt.Errorf("[%s] failed to WriteTd: %w", hi.logPrefix, err)
}
if err = db.Put(kv.Headers, dbutils.HeaderKey(blockHeight, hash), headerRaw); err != nil {
return nil, fmt.Errorf("[%s] failed to store header: %w", hi.logPrefix, err)
// skipIndexing=true - because next stages will build indices in-batch (for example StageBlockHash)
if err = hi.headerWriter.WriteHeaderRaw(db, blockHeight, hash, headerRaw, true); err != nil {
return nil, fmt.Errorf("[%s] failed to WriteTd: %w", hi.logPrefix, err)
}
hi.prevHash = hash
return td, nil
}
func (hi *HeaderInserter) FeedHeaderPoS(db kv.GetPut, header *types.Header, hash libcommon.Hash) error {
func (hi *HeaderInserter) FeedHeaderPoS(db kv.RwTx, header *types.Header, hash libcommon.Hash) error {
blockHeight := header.Number.Uint64()
// TODO(yperbasis): do we need to check if the header is already inserted (oldH)?
@ -913,10 +913,12 @@ func (hi *HeaderInserter) FeedHeaderPoS(db kv.GetPut, header *types.Header, hash
return fmt.Errorf("[%s] parent's total difficulty not found with hash %x and height %d for header %x %d: %v", hi.logPrefix, header.ParentHash, blockHeight-1, hash, blockHeight, err)
}
td := new(big.Int).Add(parentTd, header.Difficulty)
if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil {
if err = hi.headerWriter.WriteHeader(db, header); err != nil {
return fmt.Errorf("[%s] failed to WriteHeader: %w", hi.logPrefix, err)
}
if err = hi.headerWriter.WriteTd(db, hash, blockHeight, td); err != nil {
return fmt.Errorf("[%s] failed to WriteTd: %w", hi.logPrefix, err)
}
rawdb.WriteHeader(db, header)
hi.highest = blockHeight
hi.highestHash = hash

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
@ -384,14 +385,16 @@ type HeaderInserter struct {
highestTimestamp uint64
canonicalCache *lru.Cache[uint64, common.Hash]
headerReader services.HeaderAndCanonicalReader
headerWriter *blockio.BlockWriter
}
func NewHeaderInserter(logPrefix string, localTd *big.Int, headerProgress uint64, headerReader services.HeaderAndCanonicalReader) *HeaderInserter {
func NewHeaderInserter(logPrefix string, localTd *big.Int, headerProgress uint64, headerReader services.HeaderAndCanonicalReader, headerWriter *blockio.BlockWriter) *HeaderInserter {
hi := &HeaderInserter{
logPrefix: logPrefix,
localTd: localTd,
unwindPoint: headerProgress,
headerReader: headerReader,
headerWriter: headerWriter,
}
hi.canonicalCache, _ = lru.New[uint64, common.Hash](1000)
return hi

View File

@ -11,6 +11,8 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
@ -259,7 +261,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
if tb != nil {
tb.Cleanup(mock.Close)
}
blockReader := snapshotsync.NewBlockReader(mock.BlockSnapshots, mock.TransactionsV3)
blockReader, blockWriter := mock.NewBlocksIO()
mock.Address = crypto.PubkeyToAddress(mock.Key.PublicKey)
@ -322,7 +324,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
return err
}
// We start the mining step
if err := StateStep(ctx, batch, stateSync, mock.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
if err := StateStep(ctx, batch, blockWriter, stateSync, mock.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
logger.Warn("Could not validate block", "err", err)
return err
}
@ -392,12 +394,13 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
false,
mock.BlockSnapshots,
blockReader,
blockWriter,
dirs.Tmp,
mock.Notifications,
engineapi.NewForkValidatorMock(1),
),
stagedsync.StageCumulativeIndexCfg(mock.DB),
stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig),
stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(mock.DB,
mock.sentriesClient.Bd,
sendBodyRequest,
@ -410,7 +413,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
cfg.HistoryV3,
cfg.TransactionsV3,
),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, blockRetire, mock.sentriesClient.Hd),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, blockRetire, blockWriter, mock.sentriesClient.Hd),
stagedsync.StageExecuteBlocksCfg(
mock.DB,
prune,
@ -435,7 +438,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
stagedsync.StageHistoryCfg(mock.DB, prune, dirs.Tmp),
stagedsync.StageLogIndexCfg(mock.DB, prune, dirs.Tmp),
stagedsync.StageCallTracesCfg(mock.DB, prune, 0, dirs.Tmp),
stagedsync.StageTxLookupCfg(mock.DB, prune, dirs.Tmp, mock.BlockSnapshots, mock.ChainConfig.Bor),
stagedsync.StageTxLookupCfg(mock.DB, prune, dirs.Tmp, mock.BlockSnapshots, mock.ChainConfig.Bor, blockReader),
stagedsync.StageFinishCfg(mock.DB, dirs.Tmp, forkValidator),
!withPosDownloader),
stagedsync.DefaultUnwindOrder,
@ -760,3 +763,7 @@ func (ms *MockSentry) CalcStateRoot(tx kv.Tx) libcommon.Hash {
func (ms *MockSentry) HistoryV3Components() *libstate.AggregatorV3 {
return ms.agg
}
func (ms *MockSentry) NewBlocksIO() (services.FullBlockReader, *blockio.BlockWriter) {
return snapshotsync.NewBlockReader(ms.BlockSnapshots, ms.TransactionsV3), blockio.NewBlockWriter(ms.TransactionsV3)
}

View File

@ -17,6 +17,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/consensus"
@ -302,7 +303,7 @@ func MiningStep(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync, tmpDir
return nil
}
func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, Bd *bodydownload.BodyDownload, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) (err error) {
func StateStep(ctx context.Context, batch kv.RwTx, blockWriter *blockio.BlockWriter, stateSync *stagedsync.Sync, Bd *bodydownload.BodyDownload, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack())
@ -325,11 +326,10 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, B
currentHash := headersChain[i].Hash()
// Prepare memory state for block execution
Bd.AddToPrefetch(currentHeader, currentBody)
rawdb.WriteHeader(batch, currentHeader)
if err = rawdb.WriteHeaderNumber(batch, currentHash, currentHeight); err != nil {
if err := blockWriter.WriteHeader(batch, currentHeader); err != nil {
return err
}
if err = rawdb.WriteCanonicalHash(batch, currentHash, currentHeight); err != nil {
if err := blockWriter.WriteCanonicalHash(batch, currentHash, currentHeight); err != nil {
return err
}
}
@ -342,12 +342,10 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, B
height := header.Number.Uint64()
hash := header.Hash()
// Prepare memory state for block execution
rawdb.WriteHeader(batch, header)
if err = rawdb.WriteHeaderNumber(batch, hash, height); err != nil {
if err = blockWriter.WriteHeader(batch, header); err != nil {
return err
}
if err = rawdb.WriteCanonicalHash(batch, hash, height); err != nil {
if err = blockWriter.WriteCanonicalHash(batch, hash, height); err != nil {
return err
}
@ -382,7 +380,7 @@ func NewDefaultStages(ctx context.Context,
logger log.Logger,
) []*stagedsync.Stage {
dirs := cfg.Dirs
blockReader := snapshotsync.NewBlockReader(snapshots, cfg.TransactionsV3)
blockReader, blockWriter := snapshotsync.NewBlockReader(snapshots, cfg.TransactionsV3), blockio.NewBlockWriter(cfg.TransactionsV3)
blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, snapshots, db, snapDownloader, notifications.Events, logger)
// During Import we don't want other services like header requests, body requests etc. to be running.
@ -413,12 +411,13 @@ func NewDefaultStages(ctx context.Context,
p2pCfg.NoDiscovery,
snapshots,
blockReader,
blockWriter,
dirs.Tmp,
notifications,
forkValidator,
),
stagedsync.StageCumulativeIndexCfg(db),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(
db,
controlServer.Bd,
@ -432,7 +431,7 @@ func NewDefaultStages(ctx context.Context,
cfg.HistoryV3,
cfg.TransactionsV3,
),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, controlServer.Hd),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, blockWriter, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
@ -457,7 +456,7 @@ func NewDefaultStages(ctx context.Context,
stagedsync.StageHistoryCfg(db, cfg.Prune, dirs.Tmp),
stagedsync.StageLogIndexCfg(db, cfg.Prune, dirs.Tmp),
stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, dirs.Tmp),
stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, snapshots, controlServer.ChainConfig.Bor),
stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, snapshots, controlServer.ChainConfig.Bor, blockReader),
stagedsync.StageFinishCfg(db, dirs.Tmp, forkValidator),
runInTestMode)
}
@ -465,7 +464,7 @@ func NewDefaultStages(ctx context.Context,
func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config, controlServer *sentry.MultiClient,
dirs datadir.Dirs, notifications *shards.Notifications, snapshots *snapshotsync.RoSnapshots, agg *state.AggregatorV3,
logger log.Logger) (*stagedsync.Sync, error) {
blockReader := snapshotsync.NewBlockReader(snapshots, cfg.TransactionsV3)
blockReader, blockWriter := snapshotsync.NewBlockReader(snapshots, cfg.TransactionsV3), blockio.NewBlockWriter(cfg.TransactionsV3)
return stagedsync.New(
stagedsync.StateStages(ctx,
@ -481,12 +480,13 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
false,
snapshots,
blockReader,
blockWriter,
dirs.Tmp,
nil, nil,
),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, cfg.TransactionsV3),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, nil, controlServer.Hd),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, nil, blockWriter, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,