Snapshots of Bor events (#7901)

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
Co-authored-by: Alex Sharp <alexsharp@alexs-mbp-2.home>
This commit is contained in:
ledgerwatch 2023-08-18 17:10:35 +01:00 committed by GitHub
parent 384c6ba9ee
commit 6b6c0caad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 2445 additions and 360 deletions

View File

@ -138,7 +138,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) {
}); err != nil {
panic(err)
}
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), nil /* BorSnapshots */)
bw := blockio.NewBlockWriter(histV3)
return br, bw
}
@ -1330,11 +1330,13 @@ func readSeg(chaindata string) error {
g := vDecomp.MakeGetter()
var buf []byte
var count int
var offset, nextPos uint64
for g.HasNext() {
g.Next(buf[:0])
buf, nextPos = g.Next(buf[:0])
fmt.Printf("offset: %d, val: %x\n", offset, buf)
offset = nextPos
count++
}
fmt.Printf("count=%d\n", count)
return nil
}

View File

@ -36,8 +36,9 @@ var cmdResetState = &cobra.Command{
}
ctx, _ := common.RootContext()
defer db.Close()
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
if err := db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, agg) }); err != nil {

View File

@ -110,7 +110,7 @@ func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (kv.RwDB
return nil, err
}
if h3 {
_, agg := allSnapshots(context.Background(), db, logger)
_, _, agg := allSnapshots(context.Background(), db, logger)
tdb, err := temporal.New(db, agg, systemcontracts.SystemContractCodeLookup[chain])
if err != nil {
return nil, err

View File

@ -11,6 +11,9 @@ import (
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
"github.com/ledgerwatch/erigon/consensus/bor/heimdallgrpc"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/turbo/builder"
@ -96,6 +99,27 @@ var cmdStageHeaders = &cobra.Command{
},
}
var cmdStageBorHeimdall = &cobra.Command{
Use: "stage_bor_heimdall",
Short: "",
Run: func(cmd *cobra.Command, args []string) {
logger := debug.SetupCobra(cmd, "integration")
db, err := openDB(dbCfg(kv.ChainDB, chaindata), true, logger)
if err != nil {
logger.Error("Opening DB", "error", err)
return
}
defer db.Close()
if err := stageBorHeimdall(db, cmd.Context(), logger); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
return
}
},
}
var cmdStageBodies = &cobra.Command{
Use: "stage_bodies",
Short: "",
@ -393,8 +417,9 @@ var cmdSetSnap = &cobra.Command{
return
}
defer db.Close()
sn, agg := allSnapshots(cmd.Context(), db, logger)
sn, borSn, agg := allSnapshots(cmd.Context(), db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
if err := db.Update(context.Background(), func(tx kv.RwTx) error {
@ -460,6 +485,13 @@ func init() {
withHeimdall(cmdStageHeaders)
rootCmd.AddCommand(cmdStageHeaders)
withConfig(cmdStageBorHeimdall)
withDataDir(cmdStageBorHeimdall)
withReset(cmdStageBorHeimdall)
withChain(cmdStageBorHeimdall)
withHeimdall(cmdStageBorHeimdall)
rootCmd.AddCommand(cmdStageBorHeimdall)
withConfig(cmdStageBodies)
withDataDir(cmdStageBodies)
withUnwind(cmdStageBodies)
@ -603,8 +635,9 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
br, bw := blocksIO(db, logger)
engine, _, _, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
@ -675,9 +708,22 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
})
}
func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return db.Update(ctx, func(tx kv.RwTx) error {
if reset {
if err := reset2.ResetBorHeimdall(ctx, tx); err != nil {
return err
}
return nil
}
return nil
})
}
func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db)
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
@ -715,8 +761,9 @@ func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
tmpdir := datadir.New(datadirCli).Tmp
chainConfig := fromdb.ChainConfig(db)
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
@ -808,8 +855,9 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
dirs := datadir.New(datadirCli)
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.Execution))
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
if warmup {
@ -889,8 +937,9 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.IntermediateHashes))
@ -946,8 +995,9 @@ func stageTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageHashState(db kv.RwDB, ctx context.Context, logger log.Logger) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.HashState))
@ -1123,8 +1173,9 @@ func stageHistory(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if historyV3 {
return fmt.Errorf("this stage is disable in --history.v3=true")
}
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.AccountHistoryIndex))
@ -1198,8 +1249,9 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)
must(sync.SetCurrentStage(stages.TxLookup))
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
if reset {
@ -1247,8 +1299,9 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
func printAllStages(db kv.RoDB, ctx context.Context, logger log.Logger) error {
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
return db.View(ctx, func(tx kv.Tx) error { return printStages(tx, sn, agg) })
}
@ -1279,9 +1332,10 @@ func removeMigration(db kv.RwDB, ctx context.Context) error {
var openSnapshotOnce sync.Once
var _allSnapshotsSingleton *freezeblocks.RoSnapshots
var _allBorSnapshotsSingleton *freezeblocks.BorRoSnapshots
var _aggSingleton *libstate.AggregatorV3
func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *libstate.AggregatorV3) {
func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *libstate.AggregatorV3) {
openSnapshotOnce.Do(func() {
var useSnapshots bool
_ = db.View(context.Background(), func(tx kv.Tx) error {
@ -1293,6 +1347,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true)
_allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, logger)
_allBorSnapshotsSingleton = freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, logger)
var err error
_aggSingleton, err = libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger)
@ -1309,6 +1364,10 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
panic(err)
}
_allSnapshotsSingleton.LogStat()
if err := _allBorSnapshotsSingleton.ReopenFolder(); err != nil {
panic(err)
}
_allBorSnapshotsSingleton.LogStat()
db.View(context.Background(), func(tx kv.Tx) error {
_aggSingleton.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
@ -1318,7 +1377,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
})
}
})
return _allSnapshotsSingleton, _aggSingleton
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton
}
var openBlockReaderOnce sync.Once
@ -1327,9 +1386,9 @@ var _blockWriterSingleton *blockio.BlockWriter
func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter) {
openBlockReaderOnce.Do(func() {
sn, _ := allSnapshots(context.Background(), db, logger)
sn, borSn, _ := allSnapshots(context.Background(), db, logger)
histV3 := kvcfg.HistoryV3.FromDB(db)
_blockReaderSingleton = freezeblocks.NewBlockReader(sn)
_blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn)
_blockWriterSingleton = blockio.NewBlockWriter(histV3)
})
return _blockReaderSingleton, _blockWriterSingleton
@ -1407,7 +1466,8 @@ func newDomains(ctx context.Context, db kv.RwDB, stepSize uint64, mode libstate.
allSn, agg := allDomains(ctx, db, stepSize, mode, trie, logger)
cfg.Snapshot = allSn.Cfg()
engine := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db, logger)
blockReader, _ := blocksIO(db, logger)
engine, _ := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db, blockReader, logger)
return engine, cfg, allSn, agg
}
@ -1440,12 +1500,12 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
cfg.Miner = *miningConfig
}
cfg.Dirs = datadir.New(datadirCli)
allSn, agg := allSnapshots(ctx, db, logger)
allSn, _, agg := allSnapshots(ctx, db, logger)
cfg.Snapshot = allSn.Cfg()
engine := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db, logger)
blockReader, blockWriter := blocksIO(db, logger)
engine, heimdallClient := initConsensusEngine(chainConfig, cfg.Dirs.DataDir, db, blockReader, logger)
sentryControlServer, err := sentry.NewMultiClient(
db,
"",
@ -1469,7 +1529,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
notifications := &shards.Notifications{}
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, notifications.Events, logger)
stages := stages2.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, logger)
stages := stages2.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, heimdallClient, logger)
sync := stagedsync.New(stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
miner := stagedsync.NewMiningState(&cfg.Miner)
@ -1531,20 +1591,26 @@ func overrideStorageMode(db kv.RwDB, logger log.Logger) error {
})
}
func initConsensusEngine(cc *chain2.Config, dir string, db kv.RwDB, logger log.Logger) (engine consensus.Engine) {
func initConsensusEngine(cc *chain2.Config, dir string, db kv.RwDB, blockReader services.FullBlockReader, logger log.Logger) (engine consensus.Engine, heimdallClient bor.IHeimdallClient) {
config := ethconfig.Defaults
var consensusConfig interface{}
if cc.Clique != nil {
consensusConfig = params.CliqueSnapshot
} else if cc.Aura != nil {
consensusConfig = &config.Aura
} else if cc.Bor != nil {
consensusConfig = &config.Bor
if !config.WithoutHeimdall {
if config.HeimdallgRPCAddress != "" {
heimdallClient = heimdallgrpc.NewHeimdallGRPCClient(config.HeimdallgRPCAddress, logger)
} else {
heimdallClient = heimdall.NewHeimdallClient(config.HeimdallURL, logger)
}
}
} else {
consensusConfig = &config.Ethash
}
return ethconsensusconfig.CreateConsensusEngine(&nodecfg.Config{Dirs: datadir.New(dir)}, cc, consensusConfig, config.Miner.Notify, config.Miner.Noverify,
HeimdallgRPCAddress, HeimdallURL, config.WithoutHeimdall, db.ReadOnly(), logger)
heimdallClient, config.WithoutHeimdall, blockReader, db.ReadOnly(), logger), heimdallClient
}

View File

@ -169,8 +169,9 @@ func init() {
func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.Context, logger1 log.Logger) error {
dirs := datadir.New(datadirCli)
sn, agg := allSnapshots(ctx, db, logger1)
sn, borSn, agg := allSnapshots(ctx, db, logger1)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
engine, vmConfig, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig, logger1)
chainConfig, historyV3, pm := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)
@ -445,8 +446,9 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) {
}
func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
dirs := datadir.New(datadirCli)
@ -518,8 +520,9 @@ func loopIh(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) e
func loopExec(db kv.RwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
sn, agg := allSnapshots(ctx, db, logger)
sn, borSn, agg := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil, logger)

View File

@ -295,6 +295,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
// Configure DB first
var allSnapshots *freezeblocks.RoSnapshots
var allBorSnapshots *freezeblocks.BorRoSnapshots
onNewSnapshot := func() {}
if cfg.WithDatadir {
var rwKv kv.RwDB
@ -338,10 +339,13 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticReopenWithDB(db)
allBorSnapshots.OptimisticalyReopenWithDB(db)
allSnapshots.LogStat()
allBorSnapshots.LogStat()
if agg, err = libstate.NewAggregatorV3(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
@ -367,6 +371,11 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
} else {
allSnapshots.LogStat()
}
if err := allBorSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
logger.Error("[bor snapshots] reopen", "err", err)
} else {
allSnapshots.LogStat()
}
_ = reply.HistoryFiles
@ -384,7 +393,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
}()
}
onNewSnapshot()
blockReader = freezeblocks.NewBlockReader(allSnapshots)
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
var histV3Enabled bool
_ = db.View(ctx, func(tx kv.Tx) error {

View File

@ -79,10 +79,12 @@ func (back *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash commo
block, _, err := back.BlockWithSenders(ctx, db, hash, *number)
return block, err
}
func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") }
func (back *RemoteBackend) Snapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) FrozenBlocks() uint64 { return back.blockReader.FrozenBlocks() }
func (back *RemoteBackend) FrozenFiles() (list []string) { return back.blockReader.FrozenFiles() }
func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") }
func (back *RemoteBackend) Snapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) BorSnapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) FrozenBlocks() uint64 { return back.blockReader.FrozenBlocks() }
func (back *RemoteBackend) FrozenBorBlocks() uint64 { return back.blockReader.FrozenBorBlocks() }
func (back *RemoteBackend) FrozenFiles() (list []string) { return back.blockReader.FrozenFiles() }
func (back *RemoteBackend) FreezingCfg() ethconfig.BlocksFreezing {
return back.blockReader.FreezingCfg()
}
@ -263,6 +265,12 @@ func (back *RemoteBackend) CanonicalHash(ctx context.Context, tx kv.Getter, bloc
func (back *RemoteBackend) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNum uint64, i int) (types.Transaction, error) {
return back.blockReader.TxnByIdxInBlock(ctx, tx, blockNum, i)
}
func (back *RemoteBackend) EventLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
return back.blockReader.EventLookup(ctx, tx, txnHash)
}
func (back *RemoteBackend) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) ([]rlp.RawValue, error) {
return back.blockReader.EventsByBlock(ctx, tx, hash, blockNum)
}
func (back *RemoteBackend) NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error) {
nodes, err := back.remoteEthBackend.NodeInfo(ctx, &remote.NodesInfoRequest{Limit: limit})

View File

@ -78,7 +78,7 @@ func CheckChangeSets(genesis *types.Genesis, blockNum uint64, chaindata string,
if err := allSnapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader := freezeblocks.NewBlockReader(allSnapshots)
blockReader := freezeblocks.NewBlockReader(allSnapshots, nil /* BorSnapshots */)
chainDb := db
defer chainDb.Close()
@ -116,7 +116,7 @@ func CheckChangeSets(genesis *types.Genesis, blockNum uint64, chaindata string,
commitEvery := time.NewTicker(30 * time.Second)
defer commitEvery.Stop()
engine := initConsensusEngine(chainConfig, allSnapshots, logger)
engine := initConsensusEngine(chainConfig, allSnapshots, blockReader, logger)
for !interrupt {

View File

@ -30,6 +30,9 @@ import (
"github.com/ledgerwatch/erigon/cmd/state/exec3"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
"github.com/ledgerwatch/erigon/consensus/bor/heimdallgrpc"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
@ -232,12 +235,17 @@ func Erigon4(genesis *types.Genesis, chainConfig *chain2.Config, logger log.Logg
var blockReader services.FullBlockReader
var allSnapshots = freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadirCli, "snapshots"), logger)
var allBorSnapshots = freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadirCli, "snapshots"), logger)
defer allSnapshots.Close()
defer allBorSnapshots.Close()
if err := allSnapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = freezeblocks.NewBlockReader(allSnapshots)
engine := initConsensusEngine(chainConfig, allSnapshots, logger)
if err := allBorSnapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen bor snapshot segments: %w", err)
}
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
engine := initConsensusEngine(chainConfig, allSnapshots, blockReader, logger)
getHeader := func(hash libcommon.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
@ -600,10 +608,11 @@ func (ww *StateWriterV4) CreateContract(address libcommon.Address) error {
return nil
}
func initConsensusEngine(cc *chain2.Config, snapshots *freezeblocks.RoSnapshots, logger log.Logger) (engine consensus.Engine) {
func initConsensusEngine(cc *chain2.Config, snapshots *freezeblocks.RoSnapshots, blockReader services.FullBlockReader, logger log.Logger) (engine consensus.Engine) {
config := ethconfig.Defaults
var consensusConfig interface{}
var heimdallClient bor.IHeimdallClient
if cc.Clique != nil {
consensusConfig = params.CliqueSnapshot
@ -611,9 +620,15 @@ func initConsensusEngine(cc *chain2.Config, snapshots *freezeblocks.RoSnapshots,
consensusConfig = &config.Aura
} else if cc.Bor != nil {
consensusConfig = &config.Bor
if !config.WithoutHeimdall {
if config.HeimdallgRPCAddress != "" {
heimdallClient = heimdallgrpc.NewHeimdallGRPCClient(config.HeimdallgRPCAddress, logger)
} else {
heimdallClient = heimdall.NewHeimdallClient(config.HeimdallURL, logger)
}
}
} else {
consensusConfig = &config.Ethash
}
return ethconsensusconfig.CreateConsensusEngine(&nodecfg.Config{Dirs: datadir.New(datadirCli)}, cc, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallgRPCAddress,
config.HeimdallURL, config.WithoutHeimdall, true /* readonly */, logger)
return ethconsensusconfig.CreateConsensusEngine(&nodecfg.Config{Dirs: datadir.New(datadirCli)}, cc, consensusConfig, config.Miner.Notify, config.Miner.Noverify, heimdallClient, config.WithoutHeimdall, blockReader, true /* readonly */, logger)
}

View File

@ -431,7 +431,7 @@ func OpcodeTracer(genesis *types.Genesis, blockNum uint64, chaindata string, num
}
return nil
})
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), nil /* BorSnapshots */)
chainConfig := genesis.Config
vmConfig := vm.Config{Tracer: ot, Debug: true}

View File

@ -55,7 +55,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) {
}); err != nil {
panic(err)
}
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), nil /* BorSnapshots */)
bw := blockio.NewBlockWriter(histV3)
return br, bw
}

View File

@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/core/vm/evmtypes"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
)
@ -281,6 +282,15 @@ func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int {
func (cr ChainReader) FrozenBlocks() uint64 {
return cr.blockReader.FrozenBlocks()
}
func (cr ChainReader) GetBlock(hash libcommon.Hash, number uint64) *types.Block {
panic("")
}
func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool {
panic("")
}
func (cr ChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
panic("")
}
func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *exec22.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, rws *exec22.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

View File

@ -28,7 +28,7 @@ func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) {
}); err != nil {
panic(err)
}
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), nil /* BorSnapshots */)
bw := blockio.NewBlockWriter(histV3)
return br, bw
}

View File

@ -701,7 +701,7 @@ func (c *AuRa) applyRewards(header *types.Header, state *state.IntraBlockState,
// word `signal epoch` == word `pending epoch`
func (c *AuRa) Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState, txs types.Transactions,
uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error) {
if err := c.applyRewards(header, state, syscall); err != nil {
return nil, nil, err
@ -839,7 +839,7 @@ func allHeadersUntil(chain consensus.ChainHeaderReader, from *types.Header, to l
//}
// FinalizeAndAssemble implements consensus.Engine
func (c *AuRa) FinalizeAndAssemble(config *chain.Config, header *types.Header, state *state.IntraBlockState, txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal, chain consensus.ChainHeaderReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger) (*types.Block, types.Transactions, types.Receipts, error) {
func (c *AuRa) FinalizeAndAssemble(config *chain.Config, header *types.Header, state *state.IntraBlockState, txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal, chain consensus.ChainReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger) (*types.Block, types.Transactions, types.Receipts, error) {
outTxs, outReceipts, err := c.Finalize(config, header, state, txs, uncles, receipts, withdrawals, chain, syscall, logger)
if err != nil {
return nil, nil, nil, err

View File

@ -25,7 +25,6 @@ import (
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor/clerk"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/span"
"github.com/ledgerwatch/erigon/consensus/bor/statefull"
"github.com/ledgerwatch/erigon/consensus/bor/valset"
@ -37,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon/crypto/cryptopool"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/services"
)
const (
@ -234,6 +234,7 @@ type Bor struct {
chainConfig *chain.Config // Chain config
config *chain.BorConfig // Consensus engine configuration parameters for bor consensus
DB kv.RwDB // Database to store and retrieve snapshot checkpoints
blockReader services.FullBlockReader
recents *lru.ARCCache[libcommon.Hash, *Snapshot] // Snapshots for recent block to speed up reorgs
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] // Signatures of recent blocks to speed up mining
@ -353,6 +354,7 @@ func CalculateSprint(config *chain.BorConfig, number uint64) uint64 {
func New(
chainConfig *chain.Config,
db kv.RwDB,
blockReader services.FullBlockReader,
spanner Spanner,
heimdallClient IHeimdallClient,
genesisContracts GenesisContract,
@ -373,6 +375,7 @@ func New(
chainConfig: chainConfig,
config: borConfig,
DB: db,
blockReader: blockReader,
recents: recents,
signatures: signatures,
spanner: spanner,
@ -684,7 +687,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
headers = append(headers, header)
number, hash = number-1, header.ParentHash
if number <= chain.FrozenBlocks() {
if number < chain.FrozenBlocks() {
break
}
@ -758,7 +761,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li
return nil, err
}
c.logger.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
c.logger.Info("Stored proposer snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
return snap, err
@ -917,7 +920,7 @@ func (c *Bor) CalculateRewards(config *chain.Config, header *types.Header, uncle
// rewards given.
func (c *Bor) Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, r types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error) {
var err error
@ -931,7 +934,7 @@ func (c *Bor) Finalize(config *chain.Config, header *types.Header, state *state.
return nil, types.Receipts{}, err
}
if c.HeimdallClient != nil {
if c.blockReader != nil {
// commit states
if err = c.CommitStates(state, header, cx, syscall); err != nil {
c.logger.Error("Error while committing states", "err", err)
@ -992,7 +995,7 @@ func (c *Bor) changeContractCodeIfNeeded(headerNumber uint64, state *state.Intra
// nor block rewards given, and returns the final block.
func (c *Bor) FinalizeAndAssemble(chainConfig *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
) (*types.Block, types.Transactions, types.Receipts, error) {
// stateSyncData := []*types.StateSyncData{}
@ -1329,85 +1332,12 @@ func (c *Bor) CommitStates(
chain statefull.ChainContext,
syscall consensus.SystemCall,
) error {
fetchStart := time.Now()
number := header.Number.Uint64()
var (
lastStateIDBig *big.Int
from uint64
to time.Time
err error
)
// Explicit condition for Indore fork won't be needed for fetching this
// as erigon already performs this call on the IBS (Intra block state) of
// the incoming chain.
lastStateIDBig, err = c.GenesisContractsClient.LastStateId(syscall)
if err != nil {
return err
}
if c.config.IsIndore(number) {
stateSyncDelay := c.config.CalculateStateSyncDelay(number)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
} else {
to = time.Unix(int64(chain.Chain.GetHeaderByNumber(number-c.config.CalculateSprint(number)).Time), 0)
}
lastStateID := lastStateIDBig.Uint64()
from = lastStateID + 1
c.logger.Info(
"Fetching state updates from Heimdall",
"fromID", from,
"to", to.Format(time.RFC3339),
)
eventRecords, err := c.HeimdallClient.StateSyncEvents(c.execCtx, lastStateID+1, to.Unix())
if err != nil {
return err
}
if c.config.OverrideStateSyncRecords != nil {
if val, ok := c.config.OverrideStateSyncRecords[strconv.FormatUint(number, 10)]; ok {
eventRecords = eventRecords[0:val]
}
}
fetchTime := time.Since(fetchStart)
processStart := time.Now()
chainID := c.chainConfig.ChainID.String()
for _, eventRecord := range eventRecords {
if eventRecord.ID <= lastStateID {
continue
}
if err := validateEventRecord(eventRecord, number, to, lastStateID, chainID); err != nil {
c.logger.Error("while validating event record", "block", number, "to", to, "stateID", lastStateID+1, "error", err.Error())
break
}
if err := c.GenesisContractsClient.CommitState(eventRecord, syscall); err != nil {
events := chain.Chain.BorEventsByBlock(header.Hash(), header.Number.Uint64())
for _, event := range events {
if err := c.GenesisContractsClient.CommitState(event, syscall); err != nil {
return err
}
lastStateID++
}
processTime := time.Since(processStart)
c.logger.Info("StateSyncData", "number", number, "lastStateID", lastStateID, "total records", len(eventRecords), "fetch time", fetchTime, "process time", processTime)
return nil
}
func validateEventRecord(eventRecord *clerk.EventRecordWithTime, number uint64, to time.Time, lastStateID uint64, chainID string) error {
// event id should be sequential and event.Time should lie in the range [from, to)
if lastStateID+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return &InvalidStateReceivedError{number, lastStateID, &to, eventRecord}
}
return nil
}

View File

@ -107,7 +107,7 @@ func (h test_heimdall) Close() {}
type test_genesisContract struct {
}
func (g test_genesisContract) CommitState(event *clerk.EventRecordWithTime, syscall consensus.SystemCall) error {
func (g test_genesisContract) CommitState(event rlp.RawValue, syscall consensus.SystemCall) error {
return nil
}
@ -230,6 +230,7 @@ func newValidator(t *testing.T, heimdall *test_heimdall, blocks map[uint64]*type
bor := bor.New(
heimdall.chainConfig,
memdb.New(""),
nil, /* blockReader */
&spanner{span.NewChainSpanner(contract.ValidatorSet(), heimdall.chainConfig, false, logger), span.Span{}},
heimdall,
test_genesisContract{},

View File

@ -8,7 +8,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor/clerk"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/log/v3"
)
@ -56,27 +55,8 @@ func NewGenesisContractsClient(
}
}
func (gc *GenesisContractsClient) CommitState(event *clerk.EventRecordWithTime, syscall consensus.SystemCall) error {
eventRecord := event.BuildEventRecord()
recordBytes, err := rlp.EncodeToBytes(eventRecord)
if err != nil {
return err
}
const method = "commitState"
t := event.Time.Unix()
data, err := gc.stateReceiverABI.Pack(method, big.NewInt(0).SetInt64(t), recordBytes)
if err != nil {
gc.logger.Error("Unable to pack tx for commitState", "err", err)
return err
}
gc.logger.Info("→ committing new state", "eventRecord", event.String())
_, err = syscall(gc.StateReceiverContract, data)
func (gc *GenesisContractsClient) CommitState(event rlp.RawValue, syscall consensus.SystemCall) error {
_, err := syscall(gc.StateReceiverContract, event)
return err
}

View File

@ -24,7 +24,7 @@ func NewFaker() *FakeBor {
func (f *FakeBor) Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, r types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error) {
systemcontracts.UpgradeBuildInSystemContract(config, header.Number, state, logger)
return f.FakeEthash.Finalize(config, header, state, txs, uncles, r, withdrawals, chain, syscall, logger)

View File

@ -4,11 +4,11 @@ import (
"math/big"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor/clerk"
"github.com/ledgerwatch/erigon/rlp"
)
//go:generate mockgen -destination=./genesis_contract_mock.go -package=bor . GenesisContract
type GenesisContract interface {
CommitState(event *clerk.EventRecordWithTime, syscall consensus.SystemCall) error
CommitState(event rlp.RawValue, syscall consensus.SystemCall) error
LastStateId(syscall consensus.SystemCall) (*big.Int, error)
}

View File

@ -11,7 +11,7 @@ import (
)
type ChainContext struct {
Chain consensus.ChainHeaderReader
Chain consensus.ChainReader
Bor consensus.Engine
}

View File

@ -378,7 +378,7 @@ func (c *Clique) CalculateRewards(config *chain.Config, header *types.Header, un
// rewards given.
func (c *Clique) Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, r types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error) {
// No block rewards in PoA, so the state remains as is and uncles are dropped
header.UncleHash = types.CalcUncleHash(nil)
@ -389,7 +389,7 @@ func (c *Clique) Finalize(config *chain.Config, header *types.Header, state *sta
// nor block rewards given, and returns the final block.
func (c *Clique) FinalizeAndAssemble(chainConfig *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
) (*types.Block, types.Transactions, types.Receipts, error) {
// No block rewards in PoA, so the state remains as is and uncles are dropped
header.UncleHash = types.CalcUncleHash(nil)

View File

@ -27,6 +27,7 @@ import (
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3"
)
@ -63,9 +64,10 @@ type ChainReader interface {
// GetBlock retrieves a block from the database by hash and number.
GetBlock(hash libcommon.Hash, number uint64) *types.Block
GetHeader(hash libcommon.Hash, number uint64) *types.Header
HasBlock(hash libcommon.Hash, number uint64) bool
BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue
}
type SystemCall func(contract libcommon.Address, data []byte) ([]byte, error)
@ -145,7 +147,7 @@ type EngineWriter interface {
// consensus rules that happen at finalization (e.g. block rewards).
Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal,
chain ChainHeaderReader, syscall SystemCall, logger log.Logger,
chain ChainReader, syscall SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error)
// FinalizeAndAssemble runs any post-transaction state modifications (e.g. block
@ -155,7 +157,7 @@ type EngineWriter interface {
// consensus rules that happen at finalization (e.g. block rewards).
FinalizeAndAssemble(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal,
chain ChainHeaderReader, syscall SystemCall, call Call, logger log.Logger,
chain ChainReader, syscall SystemCall, call Call, logger log.Logger,
) (*types.Block, types.Transactions, types.Receipts, error)
// Seal generates a new sealing request for the given input block and pushes

View File

@ -557,7 +557,7 @@ func (ethash *Ethash) Initialize(config *chain.Config, chain consensus.ChainHead
// setting the final state on the header
func (ethash *Ethash) Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, r types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error) {
// Accumulate any block and uncle rewards and commit the final state root
accumulateRewards(config, state, header, uncles)
@ -568,7 +568,7 @@ func (ethash *Ethash) Finalize(config *chain.Config, header *types.Header, state
// uncle rewards, setting the final state and assembling the block.
func (ethash *Ethash) FinalizeAndAssemble(chainConfig *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, r types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
) (*types.Block, types.Transactions, types.Receipts, error) {
// Finalize block

View File

@ -132,7 +132,7 @@ func (s *Merge) CalculateRewards(config *chain.Config, header *types.Header, unc
func (s *Merge) Finalize(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, r types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, logger log.Logger,
) (types.Transactions, types.Receipts, error) {
if !misc.IsPoSHeader(header) {
return s.eth1Engine.Finalize(config, header, state, txs, uncles, r, withdrawals, chain, syscall, logger)
@ -164,7 +164,7 @@ func (s *Merge) Finalize(config *chain.Config, header *types.Header, state *stat
func (s *Merge) FinalizeAndAssemble(config *chain.Config, header *types.Header, state *state.IntraBlockState,
txs types.Transactions, uncles []*types.Header, receipts types.Receipts, withdrawals []*types.Withdrawal,
chain consensus.ChainHeaderReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
chain consensus.ChainReader, syscall consensus.SystemCall, call consensus.Call, logger log.Logger,
) (*types.Block, types.Transactions, types.Receipts, error) {
if !misc.IsPoSHeader(header) {
return s.eth1Engine.FinalizeAndAssemble(config, header, state, txs, uncles, receipts, withdrawals, chain, syscall, call, logger)

View File

@ -78,7 +78,7 @@ func ExecuteBlockEphemerally(
blockHashFunc func(n uint64) libcommon.Hash,
engine consensus.Engine, block *types.Block,
stateReader state.StateReader, stateWriter state.WriterWithChangeSets,
chainReader consensus.ChainHeaderReader, getTracer func(txIndex int, txHash libcommon.Hash) (vm.EVMLogger, error),
chainReader consensus.ChainReader, getTracer func(txIndex int, txHash libcommon.Hash) (vm.EVMLogger, error),
logger log.Logger,
) (*EphemeralExecResult, error) {
@ -285,7 +285,7 @@ func FinalizeBlockExecution(
header *types.Header, txs types.Transactions, uncles []*types.Header,
stateWriter state.WriterWithChangeSets, cc *chain.Config,
ibs *state.IntraBlockState, receipts types.Receipts,
withdrawals []*types.Withdrawal, headerReader consensus.ChainHeaderReader,
withdrawals []*types.Withdrawal, chainReader consensus.ChainReader,
isMining bool,
logger log.Logger,
) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
@ -293,9 +293,9 @@ func FinalizeBlockExecution(
return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */)
}
if isMining {
newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, headerReader, syscall, nil, logger)
newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, chainReader, syscall, nil, logger)
} else {
_, _, err = engine.Finalize(cc, header, ibs, txs, uncles, receipts, withdrawals, headerReader, syscall, logger)
_, _, err = engine.Finalize(cc, header, ibs, txs, uncles, receipts, withdrawals, chainReader, syscall, logger)
}
if err != nil {
return nil, nil, nil, err

View File

@ -38,6 +38,7 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/trie"
)
@ -652,3 +653,6 @@ func (cr *FakeChainReader) GetBlock(hash libcommon.Hash, number uint64) *types.B
func (cr *FakeChainReader) HasBlock(hash libcommon.Hash, number uint64) bool { return false }
func (cr *FakeChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int { return nil }
func (cr *FakeChainReader) FrozenBlocks() uint64 { return 0 }
func (cr *FakeChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
return nil
}

View File

@ -44,6 +44,11 @@ import (
"github.com/ledgerwatch/erigon/rlp"
)
const (
spanLength = 6400 // Number of blocks in a span
zerothSpanEnd = 255 // End block of 0th span
)
// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db kv.Getter, number uint64) (libcommon.Hash, error) {
data, err := db.GetOne(kv.HeaderCanonical, hexutility.EncodeTs(number))
@ -1066,6 +1071,68 @@ func PruneBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
return nil
}
// PruneBorBlocks - delete [1, to) old blocks after moving it to snapshots.
// keeps genesis in db: [1, to)
// doesn't change sequences of kv.EthTx and kv.NonCanonicalTxs
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
func PruneBorBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
c, err := tx.Cursor(kv.BorEventNums)
if err != nil {
return err
}
defer c.Close()
var blockNumBytes [8]byte
binary.BigEndian.PutUint64(blockNumBytes[:], blockTo)
k, v, err := c.Seek(blockNumBytes[:])
if err != nil {
return err
}
var eventIdTo uint64 = math.MaxUint64
if k != nil {
eventIdTo = binary.BigEndian.Uint64(v)
}
c1, err := tx.RwCursor(kv.BorEvents)
if err != nil {
return err
}
defer c1.Close()
counter := blocksDeleteLimit
for k, _, err = c1.First(); err == nil && k != nil && counter > 0; k, _, err = c1.Next() {
eventId := binary.BigEndian.Uint64(k)
if eventId >= eventIdTo {
break
}
if err = c1.DeleteCurrent(); err != nil {
return err
}
counter--
}
if err != nil {
return err
}
var firstSpanToKeep uint64
if blockTo > zerothSpanEnd {
firstSpanToKeep = 1 + (blockTo-zerothSpanEnd-1)/spanLength
}
c2, err := tx.RwCursor(kv.BorSpans)
if err != nil {
return err
}
defer c2.Close()
counter = blocksDeleteLimit
for k, _, err := c2.First(); err == nil && k != nil && counter > 0; k, _, err = c2.Next() {
spanId := binary.BigEndian.Uint64(k)
if spanId >= firstSpanToKeep {
break
}
if err = c2.DeleteCurrent(); err != nil {
return err
}
counter--
}
return nil
}
// TruncateBlocks - delete block >= blockFrom
// does decrement sequences of kv.EthTx and kv.NonCanonicalTxs
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty

View File

@ -109,3 +109,11 @@ func (w *BlockWriter) TruncateBodies(db kv.RoDB, tx kv.RwTx, from uint64) error
func (w *BlockWriter) PruneBlocks(ctx context.Context, tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
return rawdb.PruneBlocks(tx, blockTo, blocksDeleteLimit)
}
// PruneBorBlocks - [1, to) old blocks after moving it to snapshots.
// keeps genesis in db
// doesn't change sequences of kv.EthTx and kv.NonCanonicalTxs
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
func (w *BlockWriter) PruneBorBlocks(ctx context.Context, tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
return rawdb.PruneBorBlocks(tx, blockTo, blocksDeleteLimit)
}

View File

@ -164,7 +164,7 @@ func ReadBorTransaction(db kv.Tx, borTxHash libcommon.Hash) (types.Transaction,
return borTx, err
}
func ReadBorTxLookupEntry(db kv.Tx, borTxHash libcommon.Hash) (*uint64, error) {
func ReadBorTxLookupEntry(db kv.Getter, borTxHash libcommon.Hash) (*uint64, error) {
blockNumBytes, err := db.GetOne(kv.BorTxLookup, borTxHash.Bytes())
if err != nil {
return nil, err

View File

@ -98,6 +98,18 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.AggregatorV3,
return nil
}
func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error {
if err := tx.ClearBucket(kv.BorEventNums); err != nil {
return err
}
if err := tx.ClearBucket(kv.BorEvents); err != nil {
return err
}
if err := tx.ClearBucket(kv.BorSpans); err != nil {
return err
}
return clearStageProgress(tx, stages.BorHeimdall)
}
func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
if err := backup.ClearTables(ctx, db, tx, kv.Senders); err != nil {
return nil

View File

@ -88,6 +88,8 @@ import (
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
"github.com/ledgerwatch/erigon/consensus/bor/heimdallgrpc"
"github.com/ledgerwatch/erigon/consensus/clique"
"github.com/ledgerwatch/erigon/consensus/ethash"
"github.com/ledgerwatch/erigon/consensus/merge"
@ -273,12 +275,6 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
logger: logger,
}
blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config.Snapshot, config.HistoryV3, logger)
if err != nil {
return nil, err
}
backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
var chainConfig *chain.Config
@ -303,6 +299,12 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
panic(err)
}
blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config.Snapshot, config.HistoryV3, chainConfig.Bor != nil, logger)
if err != nil {
return nil, err
}
backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
backend.chainConfig = chainConfig
backend.genesisBlock = genesis
backend.genesisHash = genesis.Hash()
@ -451,9 +453,15 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
} else {
consensusConfig = &config.Ethash
}
backend.engine = ethconsensusconfig.CreateConsensusEngine(stack.Config(), chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify, config.HeimdallgRPCAddress, config.HeimdallURL,
config.WithoutHeimdall, false /* readonly */, logger)
var heimdallClient bor.IHeimdallClient
if chainConfig.Bor != nil && !config.WithoutHeimdall {
if config.HeimdallgRPCAddress != "" {
heimdallClient = heimdallgrpc.NewHeimdallGRPCClient(config.HeimdallgRPCAddress, logger)
} else {
heimdallClient = heimdall.NewHeimdallClient(config.HeimdallURL, logger)
}
}
backend.engine = ethconsensusconfig.CreateConsensusEngine(stack.Config(), chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify, heimdallClient, config.WithoutHeimdall, blockReader, false /* readonly */, logger)
inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
notifications *shards.Notifications) error {
@ -463,7 +471,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if err != nil {
return err
}
chainReader := stagedsync.NewChainReaderImpl(chainConfig, batch, blockReader)
chainReader := stagedsync.NewChainReaderImpl(chainConfig, batch, blockReader, logger)
// We start the mining step
if err := stages2.StateStep(ctx, chainReader, backend.engine, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil {
logger.Warn("Could not validate block", "err", err)
@ -675,7 +683,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.ethBackendRPC, backend.miningRPC, backend.stateChangesClient = ethBackendRPC, miningRPC, stateDiffClient
backend.syncStages = stages2.NewDefaultStages(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.forkValidator, logger)
backend.syncStages = stages2.NewDefaultStages(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient,
blockReader, blockRetire, backend.agg, backend.forkValidator, heimdallClient, logger)
backend.syncUnwindOrder = stagedsync.DefaultUnwindOrder
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
backend.stagedSync = stagedsync.New(backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
@ -1081,13 +1090,20 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downl
return err
}
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig ethconfig.BlocksFreezing, histV3 bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *libstate.AggregatorV3, error) {
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig ethconfig.BlocksFreezing, histV3 bool, isBor bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *libstate.AggregatorV3, error) {
allSnapshots := freezeblocks.NewRoSnapshots(snConfig, dirs.Snap, logger)
var allBorSnapshots *freezeblocks.BorRoSnapshots
if isBor {
allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig, dirs.Snap, logger)
}
var err error
if !snConfig.NoDownloader {
allSnapshots.OptimisticalyReopenWithDB(db)
if isBor {
allBorSnapshots.OptimisticalyReopenWithDB(db)
}
}
blockReader := freezeblocks.NewBlockReader(allSnapshots)
blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
blockWriter := blockio.NewBlockWriter(histV3)
dir.MustExist(dirs.SnapHistory)
@ -1133,13 +1149,16 @@ func (s *Ethereum) Start() error {
var currentTD *big.Int
if err := s.chainDB.View(s.sentryCtx, func(tx kv.Tx) error {
h := rawdb.ReadCurrentHeader(tx)
if h == nil {
return nil
}
var err error
currentTD, err = rawdb.ReadTd(tx, h.Hash(), h.Number.Uint64())
return err
}); err != nil {
return err
}
if isChainPoS(s.chainConfig, currentTD) {
if currentTD != nil && isChainPoS(s.chainConfig, currentTD) {
go s.eth1ExecutionServer.Start(s.sentryCtx)
} else {
go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.waitForStageLoopStop, s.config.Sync.LoopThrottle, s.logger, s.blockReader, hook)

View File

@ -13,9 +13,7 @@ import (
"github.com/ledgerwatch/erigon/consensus/aura"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/contract"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/span"
"github.com/ledgerwatch/erigon/consensus/bor/heimdallgrpc"
"github.com/ledgerwatch/erigon/consensus/clique"
"github.com/ledgerwatch/erigon/consensus/ethash"
"github.com/ledgerwatch/erigon/consensus/ethash/ethashcfg"
@ -23,10 +21,11 @@ import (
"github.com/ledgerwatch/erigon/node"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/services"
)
func CreateConsensusEngine(nodeConfig *nodecfg.Config, chainConfig *chain.Config, config interface{}, notify []string, noVerify bool,
heimdallGrpcAddress string, heimdallUrl string, withoutHeimdall bool, readonly bool,
heimdallClient bor.IHeimdallClient, withoutHeimdall bool, blockReader services.FullBlockReader, readonly bool,
logger log.Logger,
) consensus.Engine {
var eng consensus.Engine
@ -112,17 +111,7 @@ func CreateConsensusEngine(nodeConfig *nodecfg.Config, chainConfig *chain.Config
panic(err)
}
var heimdallClient bor.IHeimdallClient
if withoutHeimdall {
return bor.New(chainConfig, db, spanner, nil, genesisContractsClient, logger)
} else {
if heimdallGrpcAddress != "" {
heimdallClient = heimdallgrpc.NewHeimdallGRPCClient(heimdallGrpcAddress, logger)
} else {
heimdallClient = heimdall.NewHeimdallClient(heimdallUrl, logger)
}
eng = bor.New(chainConfig, db, spanner, heimdallClient, genesisContractsClient, logger)
}
eng = bor.New(chainConfig, db, blockReader, spanner, heimdallClient, genesisContractsClient, logger)
}
}
@ -153,5 +142,5 @@ func CreateConsensusEngineBareBones(chainConfig *chain.Config, logger log.Logger
}
return CreateConsensusEngine(&nodecfg.Config{}, chainConfig, consensusConfig, nil /* notify */, true, /* noVerify */
"" /* heimdallGrpcAddress */, "" /* heimdallUrl */, true /* withoutHeimdall */, false /* readonly */, logger)
nil /* heimdallClient */, true /* withoutHeimdall */, nil /* blockReader */, false /* readonly */, logger)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
@ -79,3 +80,7 @@ func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int {
func (cr ChainReader) FrozenBlocks() uint64 {
return cr.BlockReader.FrozenBlocks()
}
func (cr ChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
panic("")
}

View File

@ -9,7 +9,22 @@ import (
"github.com/ledgerwatch/log/v3"
)
func DefaultStages(ctx context.Context, snapshots SnapshotsCfg, headers HeadersCfg, blockHashCfg BlockHashesCfg, bodies BodiesCfg, senders SendersCfg, exec ExecuteBlockCfg, hashState HashStateCfg, trieCfg TrieCfg, history HistoryCfg, logIndex LogIndexCfg, callTraces CallTracesCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage {
func DefaultStages(ctx context.Context,
snapshots SnapshotsCfg,
headers HeadersCfg,
borHeimdallCfg BorHeimdallCfg,
blockHashCfg BlockHashesCfg,
bodies BodiesCfg,
senders SendersCfg,
exec ExecuteBlockCfg,
hashState HashStateCfg,
trieCfg TrieCfg,
history HistoryCfg,
logIndex LogIndexCfg,
callTraces CallTracesCfg,
txLookup TxLookupCfg,
finish FinishCfg,
test bool) []*Stage {
return []*Stage{
{
ID: stages.Snapshots,
@ -40,7 +55,23 @@ func DefaultStages(ctx context.Context, snapshots SnapshotsCfg, headers HeadersC
return HeadersUnwind(u, s, tx, headers, test)
},
Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx, logger log.Logger) error {
return HeadersPrune(p, tx, headers, ctx)
return nil
},
},
{
ID: stages.BorHeimdall,
Description: "Download Bor-specific data from Heimdall",
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx, logger log.Logger) error {
if badBlockUnwind {
return nil
}
return BorHeimdallForward(s, u, ctx, tx, borHeimdallCfg, logger)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx, logger log.Logger) error {
return BorHeimdallUnwind(u, ctx, s, tx, borHeimdallCfg)
},
Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx, logger log.Logger) error {
return BorHeimdallPrune(p, ctx, tx, borHeimdallCfg)
},
},
{
@ -66,7 +97,7 @@ func DefaultStages(ctx context.Context, snapshots SnapshotsCfg, headers HeadersC
return UnwindBodiesStage(u, tx, bodies, ctx)
},
Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx, logger log.Logger) error {
return PruneBodiesStage(p, tx, bodies, ctx)
return nil
},
},
{
@ -476,6 +507,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc
var DefaultForwardOrder = UnwindOrder{
stages.Snapshots,
stages.Headers,
stages.BorHeimdall,
stages.BlockHashes,
stages.Bodies,
@ -516,6 +548,7 @@ var DefaultUnwindOrder = UnwindOrder{
stages.Bodies,
stages.BlockHashes,
stages.BorHeimdall,
stages.Headers,
}
@ -544,6 +577,7 @@ var StateUnwindOrder = UnwindOrder{
stages.Execution,
stages.Senders,
stages.Bodies,
stages.BorHeimdall,
stages.BlockHashes,
stages.Headers,
}
@ -556,7 +590,7 @@ var DefaultPruneOrder = PruneOrder{
stages.AccountHistoryIndex,
stages.CallTraces,
// Unwinding of IHashes needs to happen after unwinding HashState
// Pruning of IHashes needs to happen after pruning HashState
stages.HashState,
stages.IntermediateHashes,
@ -565,6 +599,7 @@ var DefaultPruneOrder = PruneOrder{
stages.Bodies,
stages.BlockHashes,
stages.BorHeimdall,
stages.Headers,
stages.Snapshots,
}

View File

@ -349,21 +349,3 @@ func UnwindBodiesStage(u *UnwindState, tx kv.RwTx, cfg BodiesCfg, ctx context.Co
}
return nil
}
func PruneBodiesStage(s *PruneState, tx kv.RwTx, cfg BodiesCfg, ctx context.Context) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,349 @@
package stagedsync
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math/big"
"strconv"
"time"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/bor/contract"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
)
const (
spanLength = 6400 // Number of blocks in a span
zerothSpanEnd = 255 // End block of 0th span
)
type BorHeimdallCfg struct {
db kv.RwDB
chainConfig chain.Config
heimdallClient bor.IHeimdallClient
blockReader services.FullBlockReader
stateReceiverABI abi.ABI
}
func StageBorHeimdallCfg(
db kv.RwDB,
chainConfig chain.Config,
heimdallClient bor.IHeimdallClient,
blockReader services.FullBlockReader,
) BorHeimdallCfg {
return BorHeimdallCfg{
db: db,
chainConfig: chainConfig,
heimdallClient: heimdallClient,
blockReader: blockReader,
stateReceiverABI: contract.StateReceiver(),
}
}
func BorHeimdallForward(
s *StageState,
u Unwinder,
ctx context.Context,
tx kv.RwTx,
cfg BorHeimdallCfg,
logger log.Logger,
) (err error) {
if cfg.chainConfig.Bor == nil {
return
}
if cfg.heimdallClient == nil {
return
}
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return fmt.Errorf("getting headers progress: %w", err)
}
if s.BlockNumber == headNumber {
return nil
}
// Find out the latest event Id
cursor, err := tx.Cursor(kv.BorEvents)
if err != nil {
return err
}
defer cursor.Close()
k, _, err := cursor.Last()
if err != nil {
return err
}
var lastEventId uint64
if k != nil {
lastEventId = binary.BigEndian.Uint64(k)
}
type LastFrozenEvent interface {
LastFrozenEventID() uint64
}
snapshotLastEventId := cfg.blockReader.(LastFrozenEvent).LastFrozenEventID()
if snapshotLastEventId > lastEventId {
lastEventId = snapshotLastEventId
}
lastBlockNum := s.BlockNumber
if cfg.blockReader.FrozenBorBlocks() > lastBlockNum {
lastBlockNum = cfg.blockReader.FrozenBorBlocks()
}
for blockNum := lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
if blockNum%cfg.chainConfig.Bor.CalculateSprint(blockNum) == 0 {
if lastEventId, err = fetchAndWriteBorEvents(ctx, cfg.blockReader, cfg.chainConfig.Bor, blockNum, lastEventId, cfg.chainConfig.ChainID.String(), tx, cfg.heimdallClient, cfg.stateReceiverABI, s.LogPrefix(), logger); err != nil {
return err
}
}
if blockNum == 1 || (blockNum > zerothSpanEnd && ((blockNum-zerothSpanEnd-1)%spanLength) == 0) {
if err = fetchAndWriteSpans(ctx, blockNum, tx, cfg.heimdallClient, s.LogPrefix(), logger); err != nil {
return err
}
}
}
if err = s.Update(tx, headNumber); err != nil {
return err
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return
}
func fetchAndWriteBorEvents(
ctx context.Context,
blockReader services.FullBlockReader,
config *chain.BorConfig,
blockNum uint64,
lastEventId uint64,
chainID string,
tx kv.RwTx,
heimdallClient bor.IHeimdallClient,
stateReceiverABI abi.ABI,
logPrefix string,
logger log.Logger,
) (uint64, error) {
fetchStart := time.Now()
header, err := blockReader.HeaderByNumber(ctx, tx, blockNum)
if err != nil {
return lastEventId, err
}
// Find out the latest eventId
var (
from uint64
to time.Time
)
if config.IsIndore(blockNum) {
stateSyncDelay := config.CalculateStateSyncDelay(blockNum)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
} else {
pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprint(blockNum))
if err != nil {
return lastEventId, err
}
to = time.Unix(int64(pHeader.Time), 0)
}
from = lastEventId + 1
logger.Info(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
)
eventRecords, err := heimdallClient.StateSyncEvents(ctx, from, to.Unix())
if err != nil {
return lastEventId, err
}
if config.OverrideStateSyncRecords != nil {
if val, ok := config.OverrideStateSyncRecords[strconv.FormatUint(blockNum, 10)]; ok {
eventRecords = eventRecords[0:val]
}
}
fetchTime := time.Since(fetchStart)
processStart := time.Now()
if len(eventRecords) > 0 {
var key, val [8]byte
binary.BigEndian.PutUint64(key[:], blockNum)
binary.BigEndian.PutUint64(val[:], lastEventId+1)
}
const method = "commitState"
wroteIndex := false
for _, eventRecord := range eventRecords {
if eventRecord.ID <= lastEventId {
continue
}
if lastEventId+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return lastEventId, fmt.Errorf("invalid event record received blockNum=%d, eventId=%d (exp %d), chainId=%s (exp %s), time=%s (exp to %s)", blockNum, eventRecord.ID, lastEventId+1, eventRecord.ChainID, chainID, eventRecord.Time, to)
}
eventRecordWithoutTime := eventRecord.BuildEventRecord()
recordBytes, err := rlp.EncodeToBytes(eventRecordWithoutTime)
if err != nil {
return lastEventId, err
}
data, err := stateReceiverABI.Pack(method, big.NewInt(eventRecord.Time.Unix()), recordBytes)
if err != nil {
logger.Error(fmt.Sprintf("[%s] Unable to pack tx for commitState", logPrefix), "err", err)
return lastEventId, err
}
var eventIdBuf [8]byte
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEvents, eventIdBuf[:], data); err != nil {
return lastEventId, err
}
if !wroteIndex {
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], blockNum)
binary.BigEndian.PutUint64(eventIdBuf[:], eventRecord.ID)
if err = tx.Put(kv.BorEventNums, blockNumBuf[:], eventIdBuf[:]); err != nil {
return lastEventId, err
}
wroteIndex = true
}
lastEventId++
}
processTime := time.Since(processStart)
logger.Info(fmt.Sprintf("[%s] StateSyncData", logPrefix), "number", blockNum, "lastEventID", lastEventId, "total records", len(eventRecords), "fetch time", fetchTime, "process time", processTime)
return lastEventId, nil
}
func fetchAndWriteSpans(
ctx context.Context,
blockNum uint64,
tx kv.RwTx,
heimdallClient bor.IHeimdallClient,
logPrefix string,
logger log.Logger,
) error {
var spanID uint64
if blockNum > zerothSpanEnd {
spanID = 1 + (blockNum-zerothSpanEnd-1)/spanLength
}
logger.Info(fmt.Sprintf("[%s] Fetching span", logPrefix), "id", spanID)
response, err := heimdallClient.Span(ctx, spanID)
if err != nil {
return err
}
spanBytes, err := json.Marshal(response)
if err != nil {
return err
}
var spanIDBytes [8]byte
binary.BigEndian.PutUint64(spanIDBytes[:], spanID)
if err = tx.Put(kv.BorSpans, spanIDBytes[:], spanBytes); err != nil {
return err
}
return nil
}
func BorHeimdallUnwind(u *UnwindState, ctx context.Context, s *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
if cfg.chainConfig.Bor == nil {
return
}
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
cursor, err := tx.RwCursor(kv.BorEventNums)
if err != nil {
return err
}
defer cursor.Close()
var blockNumBuf [8]byte
binary.BigEndian.PutUint64(blockNumBuf[:], u.UnwindPoint+1)
k, v, err := cursor.Seek(blockNumBuf[:])
if err != nil {
return err
}
if k != nil {
// v is the encoding of the first eventId to be removed
eventCursor, err := tx.RwCursor(kv.BorEvents)
if err != nil {
return err
}
defer eventCursor.Close()
for v, _, err = eventCursor.Seek(v); err == nil && v != nil; v, _, err = eventCursor.Next() {
if err = eventCursor.DeleteCurrent(); err != nil {
return err
}
}
if err != nil {
return err
}
}
for ; err == nil && k != nil; k, _, err = cursor.Next() {
if err = cursor.DeleteCurrent(); err != nil {
return err
}
}
if err != nil {
return err
}
// Removing spans
spanCursor, err := tx.RwCursor(kv.BorSpans)
if err != nil {
return err
}
defer spanCursor.Close()
var lastSpanToKeep uint64
if u.UnwindPoint > zerothSpanEnd {
lastSpanToKeep = 1 + (u.UnwindPoint-zerothSpanEnd-1)/spanLength
}
var spanIdBytes [8]byte
binary.BigEndian.PutUint64(spanIdBytes[:], lastSpanToKeep+1)
for k, _, err = spanCursor.Seek(spanIdBytes[:]); err == nil && k != nil; k, _, err = spanCursor.Next() {
if err = spanCursor.DeleteCurrent(); err != nil {
return err
}
}
if err = u.Done(tx); err != nil {
return err
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return
}
func BorHeimdallPrune(s *PruneState, ctx context.Context, tx kv.RwTx, cfg BorHeimdallCfg) (err error) {
if cfg.chainConfig.Bor == nil {
return
}
return
}

View File

@ -168,7 +168,7 @@ func executeBlock(
var execRs *core.EphemeralExecResult
getHashFn := core.GetHashFn(block.Header(), getHeader)
execRs, err = core.ExecuteBlockEphemerally(cfg.chainConfig, &vmConfig, getHashFn, cfg.engine, block, stateReader, stateWriter, ChainReaderImpl{config: cfg.chainConfig, tx: tx, blockReader: cfg.blockReader}, getTracer, logger)
execRs, err = core.ExecuteBlockEphemerally(cfg.chainConfig, &vmConfig, getHashFn, cfg.engine, block, stateReader, stateWriter, NewChainReaderImpl(cfg.chainConfig, tx, cfg.blockReader, logger), getTracer, logger)
if err != nil {
return err
}

View File

@ -481,12 +481,13 @@ func logProgressHeaders(logPrefix string, prev, now uint64, logger log.Logger) u
type ChainReaderImpl struct {
config *chain.Config
tx kv.Getter
tx kv.Tx
blockReader services.FullBlockReader
logger log.Logger
}
func NewChainReaderImpl(config *chain.Config, tx kv.Getter, blockReader services.FullBlockReader) *ChainReaderImpl {
return &ChainReaderImpl{config, tx, blockReader}
func NewChainReaderImpl(config *chain.Config, tx kv.Tx, blockReader services.FullBlockReader, logger log.Logger) *ChainReaderImpl {
return &ChainReaderImpl{config, tx, blockReader, logger}
}
func (cr ChainReaderImpl) Config() *chain.Config { return cr.config }
@ -520,30 +521,25 @@ func (cr ChainReaderImpl) GetHeaderByHash(hash libcommon.Hash) *types.Header {
func (cr ChainReaderImpl) GetTd(hash libcommon.Hash, number uint64) *big.Int {
td, err := rawdb.ReadTd(cr.tx, hash, number)
if err != nil {
log.Error("ReadTd failed", "err", err)
cr.logger.Error("ReadTd failed", "err", err)
return nil
}
return td
}
func (cr ChainReaderImpl) FrozenBlocks() uint64 {
return cr.blockReader.FrozenBlocks()
}
func HeadersPrune(p *PruneState, tx kv.RwTx, cfg HeadersCfg, ctx context.Context) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
func (cr ChainReaderImpl) GetBlock(hash libcommon.Hash, number uint64) *types.Block {
panic("")
}
func (cr ChainReaderImpl) HasBlock(hash libcommon.Hash, number uint64) bool {
panic("")
}
func (cr ChainReaderImpl) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
events, err := cr.blockReader.EventsByBlock(context.Background(), cr.tx, hash, number)
if err != nil {
cr.logger.Error("BorEventsByBlock failed", "err", err)
return nil
}
return events
}

View File

@ -80,7 +80,7 @@ func TestAccountAndStorageTrie(t *testing.T) {
// ----------------------------------------------------------------
historyV3 := false
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
cfg := stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil)
_, err := stagedsync.RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx, log.New())
assert.Nil(t, err)
@ -202,7 +202,7 @@ func TestAccountTrieAroundExtensionNode(t *testing.T) {
hash6 := libcommon.HexToHash("0x3100000000000000000000000000000000000000000000000000000000000000")
assert.Nil(t, tx.Put(kv.HashedAccounts, hash6[:], encoded))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
_, err := stagedsync.RegenerateIntermediateHashes("IH", tx, stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil), libcommon.Hash{} /* expectedRootHash */, ctx, log.New())
assert.Nil(t, err)
@ -265,7 +265,7 @@ func TestStorageDeletion(t *testing.T) {
// Populate account & storage trie DB tables
// ----------------------------------------------------------------
historyV3 := false
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
cfg := stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil)
_, err = stagedsync.RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx, log.New())
assert.Nil(t, err)
@ -384,7 +384,7 @@ func TestHiveTrieRoot(t *testing.T) {
common.FromHex("02081bc16d674ec80000")))
historyV3 := false
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
cfg := stagedsync.StageTrieCfg(db, false, true, false, t.TempDir(), blockReader, nil, historyV3, nil)
logger := log.New()
_, err := stagedsync.RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx, logger)

View File

@ -287,7 +287,7 @@ func SnapshotsPrune(s *PruneState, initialCycle bool, cfg SnapshotsCfg, ctx cont
freezingCfg := cfg.blockReader.FreezingCfg()
if freezingCfg.Enabled {
if err := cfg.blockRetire.PruneAncientBlocks(tx, 100); err != nil {
if err := cfg.blockRetire.PruneAncientBlocks(tx, 100, cfg.chainConfig.Bor != nil); err != nil {
return err
}
}
@ -299,7 +299,7 @@ func SnapshotsPrune(s *PruneState, initialCycle bool, cfg SnapshotsCfg, ctx cont
}
}
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, log.LvlDebug, func(downloadRequest []services.DownloadRequest) error {
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, cfg.chainConfig.Bor != nil, log.LvlInfo, func(downloadRequest []services.DownloadRequest) error {
if cfg.snapshotDownloader != nil && !reflect.ValueOf(cfg.snapshotDownloader).IsNil() {
if err := snapshotsync.RequestSnapshotsDownload(ctx, downloadRequest, cfg.snapshotDownloader); err != nil {
return err

View File

@ -31,6 +31,7 @@ type SyncStage string
var (
Snapshots SyncStage = "Snapshots" // Snapshots
Headers SyncStage = "Headers" // Headers are downloaded, their Proof-Of-Work validity and chaining is verified
BorHeimdall SyncStage = "BorHeimdall" // Downloading data from heimdall corresponding to the downloaded headers (validator sets and sync events)
CumulativeIndex SyncStage = "CumulativeIndex" // Calculate how much gas has been used up to each block.
BlockHashes SyncStage = "BlockHashes" // Headers Number are written, fills blockHash => number bucket
Bodies SyncStage = "Bodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified
@ -61,6 +62,7 @@ var (
var AllStages = []SyncStage{
Snapshots,
Headers,
BorHeimdall,
BlockHashes,
Bodies,
Senders,

View File

@ -243,3 +243,19 @@ func (s *EthBackendServer) SubscribeLogs(server remote.ETHBACKEND_SubscribeLogsS
}
return fmt.Errorf("no logs filter available")
}
func (s *EthBackendServer) BorEvent(ctx context.Context, req *remote.BorEventRequest) (*remote.BorEventReply, error) {
tx, err := s.db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
_, ok, err := s.blockReader.EventLookup(ctx, tx, gointerfaces.ConvertH256ToHash(req.BorTxHash))
if err != nil {
return nil, err
}
if !ok {
return &remote.BorEventReply{}, nil
}
return &remote.BorEventReply{}, nil
}

6
go.mod
View File

@ -3,8 +3,8 @@ module github.com/ledgerwatch/erigon
go 1.19
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20230818133620-f84bd53b26e0
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230810173239-feb52fae58d9
github.com/ledgerwatch/erigon-lib v0.0.0-20230818153309-3aa5249d48c1
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230818153427-cc16b83a89be
github.com/ledgerwatch/log/v3 v3.8.0
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/ledgerwatch/trackerslist v1.1.0 // indirect
@ -169,7 +169,6 @@ require (
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/ledgerwatch/interfaces v0.0.0-20230811182153-2fcb75060567 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
@ -183,7 +182,6 @@ require (
github.com/lispad/go-generics-tools v1.1.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/matryer/moq v0.3.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect

12
go.sum
View File

@ -499,12 +499,10 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20230818133620-f84bd53b26e0 h1:gEjFNL301q023CQhbDtzQM9MSHZeq7yMAeVRcadn2Ns=
github.com/ledgerwatch/erigon-lib v0.0.0-20230818133620-f84bd53b26e0/go.mod h1:5pMHwQYRLkJXjyyfC/OhVLqUN3/0n4y1d6wYGnoWP3g=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230810173239-feb52fae58d9 h1:fG8PozTh9rKBRtWwZsoCA8kJ0M/B6SiG4Vo1sF29Inw=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230810173239-feb52fae58d9/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/interfaces v0.0.0-20230811182153-2fcb75060567 h1:ZZGeye8uJaIYvOmI2TbAdV5Oo9j8+SA4dXlK6y3GJsY=
github.com/ledgerwatch/interfaces v0.0.0-20230811182153-2fcb75060567/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/erigon-lib v0.0.0-20230818153309-3aa5249d48c1 h1:P6+hfBUKVvLuUyaAQtYn9s0w9XJC+KMrk+9Pbjnk8R8=
github.com/ledgerwatch/erigon-lib v0.0.0-20230818153309-3aa5249d48c1/go.mod h1:6GbsxaQafoXa3G2Q69PtXkQI6LRALylcnmKMDMtvV24=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230818153427-cc16b83a89be h1:6/4MXkk5AoKUHivIpCokHOX/WV9L7tXgURp1k8KfmSM=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230818153427-cc16b83a89be/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU=
github.com/ledgerwatch/log/v3 v3.8.0/go.mod h1:J2Jl6zV/58LeA6LTaVVnCGyf1/cYYSEOOLHY4ZN8S2A=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
@ -548,8 +546,6 @@ github.com/maticnetwork/crand v1.0.2 h1:Af0tAivC8zrxXDpGWNWVT/0s1fOz8w0eRbahZgUR
github.com/maticnetwork/crand v1.0.2/go.mod h1:/NRNL3bj2eYdqpWmoIP5puxndTpi0XRxpj5ZKxfHjyg=
github.com/maticnetwork/polyproto v0.0.2 h1:cPxuxbIDItdwGnucc3lZB58U8Zfe1mH73PWTGd15554=
github.com/maticnetwork/polyproto v0.0.2/go.mod h1:e1mU2EXSwEpn5jM7GfNwu3AupsV6WAGoPFFfswXOF0o=
github.com/matryer/moq v0.3.2 h1:z7oltmpTxiQ9nKNg0Jc7z45TM+eO7OhCVohxRxwaudM=
github.com/matryer/moq v0.3.2/go.mod h1:RJ75ZZZD71hejp39j4crZLsEDszGk6iH4v4YsWFKH4s=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=

View File

@ -413,7 +413,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
if err := snapshots.ReopenFolder(); err != nil {
return err
}
blockReader := freezeblocks.NewBlockReader(snapshots)
blockReader := freezeblocks.NewBlockReader(snapshots, nil /* borSnapshots */)
blockWriter := blockio.NewBlockWriter(fromdb.HistV3(db))
br := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, nil, logger)
@ -450,7 +450,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
return err
}
for j := 0; j < 10_000; j++ { // prune happens by small steps, so need many runs
if err := br.PruneAncientBlocks(tx, 100); err != nil {
if err := br.PruneAncientBlocks(tx, 100, false /* includeBor */); err != nil {
return err
}
}

View File

@ -3,6 +3,7 @@ package jsonrpc
import (
"bytes"
"context"
"fmt"
"math/big"
"github.com/ledgerwatch/erigon-lib/common"
@ -37,15 +38,11 @@ func (api *APIImpl) GetTransactionByHash(ctx context.Context, txnHash common.Has
}
// Private API returns 0 if transaction is not found.
if blockNum == 0 && chainConfig.Bor != nil {
blockNumPtr, err := rawdb.ReadBorTxLookupEntry(tx, txnHash)
blockNum, ok, err = api._blockReader.EventLookup(ctx, tx, txnHash)
if err != nil {
return nil, err
}
ok = blockNumPtr != nil
if ok {
blockNum = *blockNumPtr
}
fmt.Printf("Found block num %d, ok %t\n", blockNum, ok)
}
if ok {
block, err := api.blockByNumberWithSenders(ctx, tx, blockNum)
@ -77,10 +74,7 @@ func (api *APIImpl) GetTransactionByHash(ctx context.Context, txnHash common.Has
if chainConfig.Bor == nil {
return nil, nil
}
borTx := rawdb.ReadBorTransactionForBlock(tx, blockNum)
if borTx == nil {
return nil, nil
}
borTx := types2.NewBorTransaction()
return newRPCBorTransaction(borTx, txnHash, blockHash, blockNum, uint64(len(block.Transactions())), baseFee, chainConfig.ChainID), nil
}

View File

@ -896,7 +896,7 @@ func (api *TraceAPIImpl) callManyTransactions(
return nil, nil, err
}
engine := api.engine()
consensusHeaderReader := stagedsync.NewChainReaderImpl(cfg, dbtx, nil)
consensusHeaderReader := stagedsync.NewChainReaderImpl(cfg, dbtx, nil, nil)
err = core.InitializeBlockExecution(engine.(consensus.Engine), consensusHeaderReader, block.HeaderNoCopy(), cfg, initialState)
if err != nil {
return nil, nil, err

View File

@ -33,6 +33,11 @@ type HeaderReader interface {
HeadersRange(ctx context.Context, walker func(header *types.Header) error) error
}
type BorEventReader interface {
EventLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error)
EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) ([]rlp.RawValue, error)
}
type CanonicalReader interface {
CanonicalHash(ctx context.Context, tx kv.Getter, blockNum uint64) (common.Hash, error)
BadHeaderNumber(ctx context.Context, tx kv.Getter, hash common.Hash) (blockHeight *uint64, err error)
@ -65,15 +70,18 @@ type FullBlockReader interface {
BlockReader
BodyReader
HeaderReader
BorEventReader
TxnReader
CanonicalReader
FrozenBlocks() uint64
FrozenBorBlocks() uint64
FrozenFiles() (list []string)
FreezingCfg() ethconfig.BlocksFreezing
CanPruneTo(currentBlockInDB uint64) (canPruneBlocksTo uint64)
Snapshots() BlockSnapshots
BorSnapshots() BlockSnapshots
}
type BlockSnapshots interface {
@ -84,8 +92,8 @@ type BlockSnapshots interface {
// BlockRetire - freezing blocks: moving old data from DB to snapshot files
type BlockRetire interface {
PruneAncientBlocks(tx kv.RwTx, limit int) error
RetireBlocksInBackground(ctx context.Context, maxBlockNumInDB uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error)
PruneAncientBlocks(tx kv.RwTx, limit int, includeBor bool) error
RetireBlocksInBackground(ctx context.Context, maxBlockNumInDB uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error)
HasNewFrozenFiles() bool
BuildMissedIndicesIfNeed(ctx context.Context, logPrefix string, notifier DBEventNotifier, cc *chain.Config) error
}
@ -114,10 +122,11 @@ type DownloadRequest struct {
Ranges *Range
Path string
TorrentHash string
Bor bool
}
func NewDownloadRequest(ranges *Range, path string, torrentHash string) DownloadRequest {
return DownloadRequest{Ranges: ranges, Path: path, TorrentHash: torrentHash}
func NewDownloadRequest(ranges *Range, path string, torrentHash string, bor bool) DownloadRequest {
return DownloadRequest{Ranges: ranges, Path: path, TorrentHash: torrentHash, Bor: bor}
}
type Range struct {

View File

@ -3,10 +3,14 @@ package freezeblocks
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
"sort"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
@ -81,7 +85,9 @@ func (r *RemoteBlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, bl
}
func (r *RemoteBlockReader) Snapshots() services.BlockSnapshots { panic("not implemented") }
func (r *RemoteBlockReader) BorSnapshots() services.BlockSnapshots { panic("not implemented") }
func (r *RemoteBlockReader) FrozenBlocks() uint64 { panic("not supported") }
func (r *RemoteBlockReader) FrozenBorBlocks() uint64 { panic("not supported") }
func (r *RemoteBlockReader) FrozenFiles() (list []string) { panic("not supported") }
func (r *RemoteBlockReader) FreezingCfg() ethconfig.BlocksFreezing { panic("not supported") }
@ -205,22 +211,55 @@ func (r *RemoteBlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash comm
return bodyRlp, nil
}
// BlockReader can read blocks from db and snapshots
type BlockReader struct {
sn *RoSnapshots
TransactionsV3 bool
func (r *RemoteBlockReader) EventLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
reply, err := r.client.BorEvent(ctx, &remote.BorEventRequest{BorTxHash: gointerfaces.ConvertHashToH256(txnHash)})
if err != nil {
return 0, false, err
}
if reply == nil || len(reply.EventRlps) == 0 {
return 0, false, nil
}
return reply.BlockNumber, true, nil
}
func NewBlockReader(snapshots services.BlockSnapshots) *BlockReader {
return &BlockReader{sn: snapshots.(*RoSnapshots), TransactionsV3: true}
func (r *RemoteBlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) ([]rlp.RawValue, error) {
borTxnHash := types.ComputeBorTxHash(blockHeight, hash)
reply, err := r.client.BorEvent(ctx, &remote.BorEventRequest{BorTxHash: gointerfaces.ConvertHashToH256(borTxnHash)})
if err != nil {
return nil, err
}
result := make([]rlp.RawValue, len(reply.EventRlps))
for i, r := range reply.EventRlps {
result[i] = rlp.RawValue(r)
}
return result, nil
}
// BlockReader can read blocks from db and snapshots
type BlockReader struct {
sn *RoSnapshots
borSn *BorRoSnapshots
}
func NewBlockReader(snapshots services.BlockSnapshots, borSnapshots services.BlockSnapshots) *BlockReader {
return &BlockReader{sn: snapshots.(*RoSnapshots), borSn: borSnapshots.(*BorRoSnapshots)}
}
func (r *BlockReader) CanPruneTo(currentBlockInDB uint64) uint64 {
return CanDeleteTo(currentBlockInDB, r.sn.BlocksAvailable())
}
func (r *BlockReader) Snapshots() services.BlockSnapshots { return r.sn }
func (r *BlockReader) BorSnapshots() services.BlockSnapshots { return r.borSn }
func (r *BlockReader) FrozenBlocks() uint64 { return r.sn.BlocksAvailable() }
func (r *BlockReader) FrozenFiles() []string { return r.sn.Files() }
func (r *BlockReader) FrozenBorBlocks() uint64 { return r.borSn.BlocksAvailable() }
func (r *BlockReader) FrozenFiles() []string {
files := r.sn.Files()
if r.borSn != nil {
files = append(files, r.borSn.Files()...)
}
sort.Strings(files)
return files
}
func (r *BlockReader) FreezingCfg() ethconfig.BlocksFreezing { return r.sn.Cfg() }
func (r *BlockReader) HeadersRange(ctx context.Context, walker func(header *types.Header) error) error {
@ -888,3 +927,140 @@ func (r *BlockReader) ReadAncestor(db kv.Getter, hash common.Hash, number, ances
}
return hash, number
}
func (r *BlockReader) EventLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
n, err := rawdb.ReadBorTxLookupEntry(tx, txnHash)
if err != nil {
return 0, false, err
}
if n != nil {
return *n, true, nil
}
view := r.borSn.View()
defer view.Close()
blockNum, ok, err := r.borBlockByEventHash(txnHash, view.Events(), nil)
if err != nil {
return 0, false, err
}
if !ok {
return 0, false, nil
}
return blockNum, true, nil
}
func (r *BlockReader) borBlockByEventHash(txnHash common.Hash, segments []*BorEventSegment, buf []byte) (blockNum uint64, ok bool, err error) {
for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.IdxBorTxnHash == nil {
continue
}
reader := recsplit.NewIndexReader(sn.IdxBorTxnHash)
blockEventId := reader.Lookup(txnHash[:])
offset := sn.IdxBorTxnHash.OrdinalLookup(blockEventId)
gg := sn.seg.MakeGetter()
gg.Reset(offset)
if !gg.MatchPrefix(txnHash[:]) {
continue
}
buf, _ = gg.Next(buf[:0])
blockNum = binary.BigEndian.Uint64(buf[length.Hash:])
ok = true
return
}
return
}
func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) ([]rlp.RawValue, error) {
if blockHeight >= r.FrozenBorBlocks() {
c, err := tx.Cursor(kv.BorEventNums)
if err != nil {
return nil, err
}
defer c.Close()
var k, v []byte
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], blockHeight)
result := []rlp.RawValue{}
if k, v, err = c.Seek(buf[:]); err != nil {
return nil, err
}
if !bytes.Equal(k, buf[:]) {
return result, nil
}
startEventId := binary.BigEndian.Uint64(v)
var endEventId uint64
if k, v, err = c.Next(); err != nil {
return nil, err
}
if k == nil {
endEventId = math.MaxUint64
} else {
endEventId = binary.BigEndian.Uint64(v)
}
c1, err := tx.Cursor(kv.BorEvents)
if err != nil {
return nil, err
}
defer c1.Close()
binary.BigEndian.PutUint64(buf[:], startEventId)
for k, v, err = c1.Seek(buf[:]); err == nil && k != nil; k, v, err = c1.Next() {
eventId := binary.BigEndian.Uint64(k)
if eventId >= endEventId {
break
}
result = append(result, rlp.RawValue(common.Copy(v)))
}
if err != nil {
return nil, err
}
return result, nil
}
borTxHash := types.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()
segments := view.Events()
var buf []byte
result := []rlp.RawValue{}
for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.ranges.from > blockHeight {
continue
}
if sn.ranges.to <= blockHeight {
continue
}
if sn.IdxBorTxnHash == nil {
continue
}
reader := recsplit.NewIndexReader(sn.IdxBorTxnHash)
blockEventId := reader.Lookup(borTxHash[:])
offset := sn.IdxBorTxnHash.OrdinalLookup(blockEventId)
gg := sn.seg.MakeGetter()
gg.Reset(offset)
for gg.HasNext() && gg.MatchPrefix(borTxHash[:]) {
buf, _ = gg.Next(buf[:0])
result = append(result, rlp.RawValue(common.Copy(buf[length.Hash+length.BlockNum+8:])))
}
}
return result, nil
}
func (r *BlockReader) LastFrozenEventID() uint64 {
view := r.borSn.View()
defer view.Close()
segments := view.Events()
if len(segments) == 0 {
return 0
}
lastSegment := segments[len(segments)-1]
var lastEventID uint64
gg := lastSegment.seg.MakeGetter()
var buf []byte
for gg.HasNext() {
buf, _ = gg.Next(buf[:0])
lastEventID = binary.BigEndian.Uint64(buf[length.Hash+length.BlockNum : length.Hash+length.BlockNum+8])
}
return lastEventID
}

View File

@ -553,6 +553,7 @@ Loop:
s.logger.Warn("invalid segment name", "err", err, "name", fName)
continue
}
var processed bool = true
switch f.T {
case snaptype.Headers:
@ -669,14 +670,18 @@ Loop:
if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil {
return err
}
default:
processed = false
}
if f.To > 0 {
segmentsMax = f.To - 1
} else {
segmentsMax = 0
if processed {
if f.To > 0 {
segmentsMax = f.To - 1
} else {
segmentsMax = 0
}
segmentsMaxSet = true
}
segmentsMaxSet = true
}
if segmentsMaxSet {
s.segmentsMax.Store(segmentsMax)
@ -836,7 +841,7 @@ func (s *RoSnapshots) PrintDebug() {
func buildIdx(ctx context.Context, sn snaptype.FileInfo, chainConfig *chain.Config, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) error {
//_, fName := filepath.Split(sn.Path)
//log.Debug("[snapshots] build idx", "file", fName)
//log.Info("[snapshots] build idx", "file", fName)
switch sn.T {
case snaptype.Headers:
if err := HeadersIdx(ctx, chainConfig, sn.Path, sn.From, tmpDir, p, lvl, logger); err != nil {
@ -851,7 +856,18 @@ func buildIdx(ctx context.Context, sn snaptype.FileInfo, chainConfig *chain.Conf
if err := TransactionsIdx(ctx, chainConfig, sn.From, sn.To, dir, tmpDir, p, lvl, logger); err != nil {
return err
}
case snaptype.BorEvents:
dir, _ := filepath.Split(sn.Path)
if err := BorEventsIdx(ctx, sn.Path, sn.From, sn.To, dir, tmpDir, p, lvl, logger); err != nil {
return err
}
case snaptype.BorSpans:
dir, _ := filepath.Split(sn.Path)
if err := BorSpansIdx(ctx, sn.Path, sn.From, sn.To, dir, tmpDir, p, lvl, logger); err != nil {
return err
}
}
//log.Info("[snapshots] finish build idx", "file", fName)
return nil
}
@ -874,7 +890,58 @@ func BuildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs
if segment.T != t {
continue
}
if hasIdxFile(&segment, logger) {
if hasIdxFile(segment, logger) {
continue
}
sn := segment
g.Go(func() error {
p := &background.Progress{}
ps.Add(p)
defer ps.Delete(p)
return buildIdx(gCtx, sn, chainConfig, tmpDir, p, log.LvlInfo, logger)
})
}
}
finish := make(chan struct{})
go func() {
defer close(finish)
g.Wait()
}()
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
for {
select {
case <-finish:
return g.Wait()
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
var m runtime.MemStats
dbg.ReadMemStats(&m)
logger.Info(fmt.Sprintf("[%s] Indexing", logPrefix), "progress", ps.String(), "total-indexing-time", time.Since(startIndexingTime).Round(time.Second).String(), "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
}
}
func BuildBorMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error {
dir, tmpDir := dirs.Snap, dirs.Tmp
segments, _, err := BorSegments(dir)
if err != nil {
return err
}
ps := background.NewProgressSet()
startIndexingTime := time.Now()
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(workers)
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
for _, segment := range segments {
if segment.T != t {
continue
}
if hasIdxFile(segment, logger) {
continue
}
sn := segment
@ -941,6 +1008,23 @@ MainLoop:
return res
}
func borSegmentsMustExist(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) {
MainLoop:
for _, f := range in {
if f.From == f.To {
continue
}
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
p := filepath.Join(dir, snaptype.SegmentFileName(f.From, f.To, t))
if !dir2.FileExist(p) {
continue MainLoop
}
}
res = append(res, f)
}
return res
}
// noOverlaps - keep largest ranges and avoid overlap
func noOverlaps(in []snaptype.FileInfo) (res []snaptype.FileInfo) {
for i := range in {
@ -1040,6 +1124,9 @@ func NewBlockRetire(workers int, dirs datadir.Dirs, blockReader services.FullBlo
return &BlockRetire{workers: workers, tmpDir: dirs.Tmp, dirs: dirs, blockReader: blockReader, blockWriter: blockWriter, db: db, notifier: notifier, logger: logger}
}
func (br *BlockRetire) snapshots() *RoSnapshots { return br.blockReader.Snapshots().(*RoSnapshots) }
func (br *BlockRetire) borSnapshots() *BorRoSnapshots {
return br.blockReader.BorSnapshots().(*BorRoSnapshots)
}
func (br *BlockRetire) HasNewFrozenFiles() bool {
return br.needSaveFilesListInDB.CompareAndSwap(true, false)
}
@ -1127,7 +1214,7 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint
downloadRequest := make([]services.DownloadRequest, 0, len(rangesToMerge))
for i := range rangesToMerge {
r := &services.Range{From: rangesToMerge[i].from, To: rangesToMerge[i].to}
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", ""))
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", false /* Bor */))
}
if seedNewSnapshots != nil {
@ -1137,7 +1224,7 @@ func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint
}
return nil
}
func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error {
func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int, includeBor bool) error {
if br.blockReader.FreezingCfg().KeepBlocks {
return nil
}
@ -1149,10 +1236,16 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error {
if err := br.blockWriter.PruneBlocks(context.Background(), tx, canDeleteTo, limit); err != nil {
return nil
}
if includeBor {
canDeleteTo := CanDeleteTo(currentProgress, br.blockReader.FrozenBorBlocks())
if err := br.blockWriter.PruneBorBlocks(context.Background(), tx, canDeleteTo, limit); err != nil {
return nil
}
}
return nil
}
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) {
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, includeBor bool, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error) {
ok := br.working.CompareAndSwap(false, true)
if !ok {
// go-routine is still working
@ -1162,13 +1255,19 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg
defer br.working.Store(false)
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks())
if !ok {
return
if ok {
if err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots); err != nil {
br.logger.Warn("[snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo)
}
}
err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots)
if err != nil {
br.logger.Warn("[snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo)
if includeBor {
blockFrom, blockTo, ok = CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
if ok {
if err := br.RetireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots); err != nil {
br.logger.Warn("[bor snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo)
}
}
}
}()
}
@ -1177,31 +1276,64 @@ func (br *BlockRetire) BuildMissedIndicesIfNeed(ctx context.Context, logPrefix s
snapshots.LogStat()
// Create .idx files
if snapshots.IndicesMax() >= snapshots.SegmentsMax() {
return nil
}
if snapshots.IndicesMax() < snapshots.SegmentsMax() {
if !snapshots.Cfg().Produce && snapshots.IndicesMax() == 0 {
return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices")
}
if snapshots.Cfg().Produce {
if !snapshots.SegmentsReady() {
return fmt.Errorf("not all snapshot segments are available")
if !snapshots.Cfg().Produce && snapshots.IndicesMax() == 0 {
return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices")
}
if snapshots.Cfg().Produce {
if !snapshots.SegmentsReady() {
return fmt.Errorf("not all snapshot segments are available")
}
// wait for Downloader service to download all expected snapshots
if snapshots.IndicesMax() < snapshots.SegmentsMax() {
indexWorkers := estimate.IndexSnapshot.Workers()
if err := BuildMissedIndices(logPrefix, ctx, br.dirs, cc, indexWorkers, br.logger); err != nil {
return fmt.Errorf("BuildMissedIndices: %w", err)
// wait for Downloader service to download all expected snapshots
if snapshots.IndicesMax() < snapshots.SegmentsMax() {
indexWorkers := estimate.IndexSnapshot.Workers()
if err := BuildMissedIndices(logPrefix, ctx, br.dirs, cc, indexWorkers, br.logger); err != nil {
return fmt.Errorf("BuildMissedIndices: %w", err)
}
}
if err := snapshots.ReopenFolder(); err != nil {
return err
}
snapshots.LogStat()
if notifier != nil {
notifier.OnNewSnapshot()
}
}
}
if cc.Bor != nil {
borSnapshots := br.borSnapshots()
borSnapshots.LogStat()
if err := snapshots.ReopenFolder(); err != nil {
return err
}
if notifier != nil {
notifier.OnNewSnapshot()
// Create .idx files
if borSnapshots.IndicesMax() < borSnapshots.SegmentsMax() {
if !borSnapshots.Cfg().Produce && borSnapshots.IndicesMax() == 0 {
return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices")
}
if borSnapshots.Cfg().Produce {
if !borSnapshots.SegmentsReady() {
return fmt.Errorf("not all bor snapshot segments are available")
}
// wait for Downloader service to download all expected snapshots
if borSnapshots.IndicesMax() < borSnapshots.SegmentsMax() {
indexWorkers := estimate.IndexSnapshot.Workers()
if err := BuildBorMissedIndices(logPrefix, ctx, br.dirs, cc, indexWorkers, br.logger); err != nil {
return fmt.Errorf("BuildBorMissedIndices: %w", err)
}
}
if err := borSnapshots.ReopenFolder(); err != nil {
return err
}
borSnapshots.LogStat()
if notifier != nil {
notifier.OnNewSnapshot()
}
}
}
}
return nil
@ -1316,7 +1448,7 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna
return nil
}
func hasIdxFile(sn *snaptype.FileInfo, logger log.Logger) bool {
func hasIdxFile(sn snaptype.FileInfo, logger log.Logger) bool {
stat, err := os.Stat(sn.Path)
if err != nil {
return false
@ -1325,18 +1457,7 @@ func hasIdxFile(sn *snaptype.FileInfo, logger log.Logger) bool {
fName := snaptype.IdxFileName(sn.From, sn.To, sn.T.String())
var result = true
switch sn.T {
case snaptype.Headers:
idx, err := recsplit.OpenIndex(path.Join(dir, fName))
if err != nil {
return false
}
// If index was created before the segment file, it needs to be ignored (and rebuilt)
if idx.ModTime().Before(stat.ModTime()) {
logger.Warn("Index file has timestamp before segment file, will be recreated", "segfile", sn.Path, "segtime", stat.ModTime(), "idxfile", fName, "idxtime", idx.ModTime())
result = false
}
idx.Close()
case snaptype.Bodies:
case snaptype.Headers, snaptype.Bodies, snaptype.BorEvents, snaptype.BorSpans:
idx, err := recsplit.OpenIndex(path.Join(dir, fName))
if err != nil {
return false

File diff suppressed because it is too large Load Diff

View File

@ -41,10 +41,18 @@ func BuildProtoRequest(downloadRequest []services.DownloadRequest) *proto_downlo
if r.Ranges.To-r.Ranges.From != snaptype.Erigon2SegmentSize {
continue
}
for _, t := range snaptype.AllSnapshotTypes {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t),
})
if r.Bor {
for _, t := range []snaptype.Type{snaptype.BorEvents, snaptype.BorSpans} {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t),
})
}
} else {
for _, t := range snaptype.AllSnapshotTypes {
req.Items = append(req.Items, &proto_downloader.DownloadItem{
Path: snaptype.SegmentFileName(r.Ranges.From, r.Ranges.To, t),
})
}
}
}
}
@ -65,10 +73,16 @@ func RequestSnapshotsDownload(ctx context.Context, downloadRequest []services.Do
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(logPrefix string, ctx context.Context, histV3 bool, agg *state.AggregatorV3, tx kv.RwTx, blockReader services.FullBlockReader, notifier services.DBEventNotifier, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient) error {
snapshots := blockReader.Snapshots()
borSnapshots := blockReader.BorSnapshots()
if blockReader.FreezingCfg().NoDownloader {
if err := snapshots.ReopenFolder(); err != nil {
return err
}
if cc.Bor != nil {
if err := borSnapshots.ReopenFolder(); err != nil {
return err
}
}
if notifier != nil { // can notify right here, even that write txn is not commit
notifier.OnNewSnapshot()
}
@ -85,38 +99,53 @@ func WaitForDownloader(logPrefix string, ctx context.Context, histV3 bool, agg *
return err
}
dbEmpty := len(snInDB) == 0
var existingFilesMap map[string]struct{}
var missingSnapshots []*services.Range
var existingFilesMap, borExistingFilesMap map[string]struct{}
var missingSnapshots, borMissingSnapshots []*services.Range
if !dbEmpty {
existingFilesMap, missingSnapshots, err = snapshots.ScanDir()
if err != nil {
return err
}
if cc.Bor == nil {
borExistingFilesMap = map[string]struct{}{}
} else {
borExistingFilesMap, borMissingSnapshots, err = borSnapshots.ScanDir()
if err != nil {
return err
}
}
}
if len(missingSnapshots) > 0 {
log.Warn(fmt.Sprintf("[%s] downloading missing snapshots", logPrefix))
}
// send all hashes to the Downloader service
preverifiedBlockSnapshots := snapcfg.KnownCfg(cc.ChainName, snInDB, snHistInDB).Preverified
preverifiedBlockSnapshots := snapcfg.KnownCfg(cc.ChainName, []string{} /* whitelist */, snHistInDB).Preverified
downloadRequest := make([]services.DownloadRequest, 0, len(preverifiedBlockSnapshots)+len(missingSnapshots))
// build all download requests
// builds preverified snapshots request
for _, p := range preverifiedBlockSnapshots {
if _, exists := existingFilesMap[p.Name]; !exists { // Not to download existing files "behind the scenes"
downloadRequest = append(downloadRequest, services.NewDownloadRequest(nil, p.Name, p.Hash))
_, exists := existingFilesMap[p.Name]
_, borExists := borExistingFilesMap[p.Name]
if !exists && !borExists { // Not to download existing files "behind the scenes"
downloadRequest = append(downloadRequest, services.NewDownloadRequest(nil, p.Name, p.Hash, false /* Bor */))
}
}
if histV3 {
preverifiedHistorySnapshots := snapcfg.KnownCfg(cc.ChainName, snInDB, snHistInDB).PreverifiedHistory
for _, p := range preverifiedHistorySnapshots {
downloadRequest = append(downloadRequest, services.NewDownloadRequest(nil, p.Name, p.Hash))
downloadRequest = append(downloadRequest, services.NewDownloadRequest(nil, p.Name, p.Hash, false /* Bor */))
}
}
// builds missing snapshots request
for _, r := range missingSnapshots {
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", ""))
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", false /* Bor */))
}
if cc.Bor != nil {
for _, r := range borMissingSnapshots {
downloadRequest = append(downloadRequest, services.NewDownloadRequest(r, "", "", true /* Bor */))
}
}
log.Info(fmt.Sprintf("[%s] Fetching torrent files metadata", logPrefix))
@ -155,17 +184,19 @@ Loop:
if stats, err := snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil {
log.Warn("Error while waiting for snapshots progress", "err", err)
} else if stats.Completed {
if !blockReader.FreezingCfg().Verify { // will verify after loop
if _, err := snapshotDownloader.Verify(ctx, &proto_downloader.VerifyRequest{}); err != nil {
return err
/*
if !blockReader.FreezingCfg().Verify { // will verify after loop
if _, err := snapshotDownloader.Verify(ctx, &proto_downloader.VerifyRequest{}); err != nil {
return err
}
}
}
*/
log.Info(fmt.Sprintf("[%s] download finished", logPrefix), "time", time.Since(downloadStartTime).String())
break Loop
} else {
if stats.MetadataReady < stats.FilesTotal {
log.Info(fmt.Sprintf("[%s] Waiting for torrents metadata: %d/%d", logPrefix, stats.MetadataReady, stats.FilesTotal))
continue
//continue
}
dbg.ReadMemStats(&m)
downloadTimeLeft := calculateTime(stats.BytesTotal-stats.BytesCompleted, stats.DownloadRate)
@ -192,10 +223,22 @@ Finish:
return err
}
}
stats, err = snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{})
if err != nil {
return err
}
if !stats.Completed {
goto Loop
}
if err := snapshots.ReopenFolder(); err != nil {
return err
}
if cc.Bor != nil {
if err := borSnapshots.ReopenFolder(); err != nil {
return err
}
}
if err := agg.OpenFolder(); err != nil {
return err
}

View File

@ -144,7 +144,7 @@ func TestSetupGenesis(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
_, db, _ := temporal.NewTestDB(t, datadir.New(tmpdir), nil)
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
blockReader := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()), freezeblocks.NewBorRoSnapshots(ethconfig.BlocksFreezing{Enabled: false}, "", log.New()))
config, genesis, err := test.fn(db)
// Check the return values.
if !reflect.DeepEqual(err, test.wantErr) {

View File

@ -574,7 +574,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
if terminalTotalDifficulty != nil {
if td.Cmp(terminalTotalDifficulty) >= 0 {
hd.highestInDb = link.blockHeight
hd.logger.Info(POSPandaBanner)
//hd.logger.Info(POSPandaBanner)
dataflow.HeaderDownloadStates.AddChange(link.blockHeight, dataflow.HeaderInserted)
return true, true, 0, lastTime, nil
}

View File

@ -259,6 +259,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, logger)
allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, logger)
allBorSnapshots := freezeblocks.NewBorRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, logger)
mock := &MockSentry{
Ctx: ctx, cancel: ctxCancel, DB: db, agg: agg,
tb: tb,
@ -277,7 +278,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
},
PeerId: gointerfaces.ConvertHashToH512([64]byte{0x12, 0x34, 0x50}), // "12345"
BlockSnapshots: allSnapshots,
BlockReader: freezeblocks.NewBlockReader(allSnapshots),
BlockReader: freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots),
HistoryV3: cfg.HistoryV3,
}
if tb != nil {
@ -395,6 +396,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
stagedsync.DefaultStages(mock.Ctx,
stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, dirs, blockRetire, snapshotsDownloader, mock.BlockReader, mock.Notifications.Events, mock.HistoryV3, mock.agg),
stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications, engine_helpers.NewForkValidatorMock(1)),
stagedsync.StageBorHeimdallCfg(mock.DB, *mock.ChainConfig, nil /* heimdallClient */, mock.BlockReader),
stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, cfg.HistoryV3, blockWriter),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd),

View File

@ -23,6 +23,7 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/consensus/bor"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
@ -101,7 +102,7 @@ func StageLoopIteration(ctx context.Context, db kv.RwDB, tx kv.RwTx, sync *stage
}() // avoid crash because Erigon's core does many things
externalTx := tx != nil
finishProgressBefore, headersProgressBefore, err := stagesHeadersAndFinish(db, tx)
finishProgressBefore, borProgressBefore, headersProgressBefore, err := stagesHeadersAndFinish(db, tx)
if err != nil {
return err
}
@ -109,6 +110,9 @@ func StageLoopIteration(ctx context.Context, db kv.RwDB, tx kv.RwTx, sync *stage
// In all other cases - process blocks batch in 1 RwTx
// 2 corner-cases: when sync with --snapshots=false and when executed only blocks from snapshots (in this case all stages progress is equal and > 0, but node is not synced)
isSynced := finishProgressBefore > 0 && finishProgressBefore > blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore
if blockReader.BorSnapshots() != nil {
isSynced = isSynced && borProgressBefore > blockReader.FrozenBorBlocks()
}
canRunCycleInOneTransaction := isSynced
if externalTx {
canRunCycleInOneTransaction = true
@ -191,15 +195,18 @@ func stageLoopStepPrune(ctx context.Context, db kv.RwDB, tx kv.RwTx, sync *stage
return db.Update(ctx, func(tx kv.RwTx) error { return sync.RunPrune(db, tx, initialCycle) })
}
func stagesHeadersAndFinish(db kv.RoDB, tx kv.Tx) (head, fin uint64, err error) {
func stagesHeadersAndFinish(db kv.RoDB, tx kv.Tx) (head, bor, fin uint64, err error) {
if tx != nil {
if fin, err = stages.GetStageProgress(tx, stages.Finish); err != nil {
return head, fin, err
return head, bor, fin, err
}
if head, err = stages.GetStageProgress(tx, stages.Headers); err != nil {
return head, fin, err
return head, bor, fin, err
}
return head, fin, nil
if bor, err = stages.GetStageProgress(tx, stages.BorHeimdall); err != nil {
return head, bor, fin, err
}
return head, bor, fin, nil
}
if err := db.View(context.Background(), func(tx kv.Tx) error {
if fin, err = stages.GetStageProgress(tx, stages.Finish); err != nil {
@ -208,11 +215,14 @@ func stagesHeadersAndFinish(db kv.RoDB, tx kv.Tx) (head, fin uint64, err error)
if head, err = stages.GetStageProgress(tx, stages.Headers); err != nil {
return err
}
if bor, err = stages.GetStageProgress(tx, stages.BorHeimdall); err != nil {
return err
}
return nil
}); err != nil {
return head, fin, err
return head, bor, fin, err
}
return head, fin, nil
return head, bor, fin, nil
}
type Hook struct {
@ -418,6 +428,7 @@ func NewDefaultStages(ctx context.Context,
blockRetire services.BlockRetire,
agg *state.AggregatorV3,
forkValidator *engine_helpers.ForkValidator,
heimdallClient bor.IHeimdallClient,
logger log.Logger,
) []*stagedsync.Stage {
dirs := cfg.Dirs
@ -430,6 +441,7 @@ func NewDefaultStages(ctx context.Context,
return stagedsync.DefaultStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, dirs, blockRetire, snapDownloader, blockReader, notifications.Events, cfg.HistoryV3, agg),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications, forkValidator),
stagedsync.StageBorHeimdallCfg(db, *controlServer.ChainConfig, heimdallClient, blockReader),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),

View File

@ -78,7 +78,7 @@ func ComputeTxEnv(ctx context.Context, engine consensus.EngineReader, block *typ
vmenv := vm.NewEVM(blockContext, evmtypes.TxContext{}, statedb, cfg, vm.Config{})
rules := vmenv.ChainRules()
consensusHeaderReader := stagedsync.NewChainReaderImpl(cfg, dbtx, nil)
consensusHeaderReader := stagedsync.NewChainReaderImpl(cfg, dbtx, nil, nil)
core.InitializeBlockExecution(engine.(consensus.Engine), consensusHeaderReader, header, cfg, statedb)