Simplify stagedsync.Prepare (#2317)

Alex Sharov 2021-07-08 20:52:22 +07:00 committed by GitHub
parent 5ceaac53f6
commit e98340d806
20 changed files with 254 additions and 411 deletions
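
The whole change pivots on one signature: stagedsync.Prepare no longer takes a vm.Config, an ethdb.Database wrapper, or an ethdb.StorageMode; it receives the ethdb.RwKV handle directly and derives the rest from the database. Every call site in this diff shrinks the same way — before and after, copied from the first hunk below:

// before: callers threaded vmConfig and the storage mode through Prepare
sync, err = st.Prepare(vmConfig, nil, tx, sm, ctx.Done(), false, nil, nil)

// after: db (nil here because an external tx is supplied), tx, quit channel,
// initial-cycle flag, mining config, state-change accumulator
sync, err = st.Prepare(nil, tx, ctx.Done(), false, nil, nil)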

View File

@ -818,9 +818,10 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E
bodyDownloadTimeoutSeconds,
downloadServer,
tmpdir,
snapshotDir,
ethconfig.Snapshot{Enabled: false},
txPool,
txPoolP2PServer,
nil, nil,
)
if err != nil {
panic(err)
@ -842,7 +843,7 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E
var sync *stagedsync.State
if err := db.View(context.Background(), func(tx ethdb.Tx) (err error) {
sync, err = st.Prepare(vmConfig, nil, tx, sm, ctx.Done(), false, nil, nil)
sync, err = st.Prepare(nil, tx, ctx.Done(), false, nil, nil)
if err != nil {
return nil
}

View File

@ -11,7 +11,6 @@ import (
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/ethdb/kv"
"github.com/spf13/cobra"
"github.com/ledgerwatch/erigon/cmd/utils"
@ -311,7 +310,7 @@ func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx conte
miningConfig.Etherbase = nextBlock.Header().Coinbase
miningConfig.ExtraData = nextBlock.Header().Extra
miningStages, err := mining.Prepare(vmConfig, kv.NewObjectDatabase(db), tx, sm, quit, false, miningWorld, nil)
miningStages, err := mining.Prepare(db, tx, quit, false, miningWorld, nil)
if err != nil {
panic(err)
}

View File

@ -59,7 +59,7 @@ func TestSendRawTransaction(t *testing.T) {
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
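
StageLoopStep gets the matching simplification: its params.ChainConfig parameter is gone, and (as the StageLoopStep hunk near the end of this diff shows) it also stops fetching the storage mode itself. A commented sketch of the new call shape, with parameter roles read off that definition:

// In order: context, ethdb.RwKV, *stagedsync.StagedSync, target header height,
// event notifier, initial-cycle flag, *shards.Accumulator (nil when state
// streaming is off), head-update callback, and the snapshot-migrator finalizer
// (sync.GetSnapshotMigratorFinal() in the production loop, nil in tests).
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader,
	notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
	t.Fatal(err)
}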

View File

@ -55,7 +55,7 @@ func TestTxPoolContent(t *testing.T) {
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}

View File

@ -73,7 +73,6 @@ func TestMatreshkaStream(t *testing.T) {
Open()
_ = chaindata
defer snkv.Close()
db := kv2.NewObjectDatabase(snkv)
tx, err := snkv.BeginRw(context.Background())
if err != nil {
@ -109,11 +108,11 @@ func TestMatreshkaStream(t *testing.T) {
limit := currentBlock.Number.Uint64()
getHeader := func(hash common.Hash, number uint64) *types.Header { return rawdb.ReadHeader(tx, hash, number) }
stateReaderWriter := NewDebugReaderWriter(state.NewPlainStateReader(tx), state.NewPlainStateWriter(db, tx, blockNum))
stateReaderWriter := NewDebugReaderWriter(state.NewPlainStateReader(tx), state.NewPlainStateWriter(tx, tx, blockNum))
tt := time.Now()
ttt := time.Now()
for currentBlock := blockNum; currentBlock < blockNum+limit; currentBlock++ {
stateReaderWriter.UpdateWriter(state.NewPlainStateWriter(db, tx, currentBlock))
stateReaderWriter.UpdateWriter(state.NewPlainStateWriter(tx, tx, currentBlock))
block, err := rawdb.ReadBlockByNumber(tx, currentBlock)
if err != nil {
t.Fatal(err, currentBlock)
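
With the ObjectDatabase wrapper removed from this test, the transaction does double duty. Judging by the call sites, state.NewPlainStateWriter takes one destination for current state and one for change sets, and the same RwTx can now serve as both:

// tx acts as both the plain-state putter and the change-set destination
writer := state.NewPlainStateWriter(tx, tx, blockNum)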

View File

@ -102,14 +102,14 @@ type Ethereum struct {
pendingBlocks chan *types.Block
minedBlocks chan *types.Block
// downloader v2 fields
downloadV2Ctx context.Context
downloadV2Cancel context.CancelFunc
// downloader fields
downloadCtx context.Context
downloadCancel context.CancelFunc
downloadServer *download.ControlServerImpl
sentryServers []*download.SentryServerImpl
txPoolP2PServer *txpool.P2PServer
sentries []remote.SentryClient
stagedSync2 *stagedsync.StagedSync
stagedSync *stagedsync.StagedSync
waitForStageLoopStop chan struct{}
waitForMiningStop chan struct{}
}
@ -134,8 +134,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
var torrentClient *snapshotsync.Client
snapshotsDir := stack.Config().ResolvePath("snapshots")
if config.SnapshotLayout {
config.Snapshot.Dir = stack.Config().ResolvePath("snapshots")
if config.Snapshot.Enabled {
var peerID string
if err = chainKv.View(context.Background(), func(tx ethdb.Tx) error {
v, err := tx.GetOne(dbutils.BittorrentInfoBucket, []byte(dbutils.BittorrentPeerID))
@ -147,7 +147,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}); err != nil {
log.Error("Get bittorrent peer", "err", err)
}
torrentClient, err = snapshotsync.New(snapshotsDir, config.SnapshotSeeding, peerID)
torrentClient, err = snapshotsync.New(config.Snapshot.Dir, config.Snapshot.Seeding, peerID)
if err != nil {
return nil, err
}
@ -160,11 +160,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
}
chainKv, err = snapshotsync.WrapSnapshots(chainKv, snapshotsDir)
chainKv, err = snapshotsync.WrapSnapshots(chainKv, config.Snapshot.Dir)
if err != nil {
return nil, err
}
err = snapshotsync.SnapshotSeeding(chainKv, torrentClient, "headers", snapshotsDir)
err = snapshotsync.SnapshotSeeding(chainKv, torrentClient, "headers", config.Snapshot.Dir)
if err != nil {
return nil, err
}
@ -242,12 +242,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// setting notifier to support streaming events to rpc daemon
backend.events = remotedbserver.NewEvents()
var mg *snapshotsync.SnapshotMigrator
if config.SnapshotLayout {
if config.Snapshot.Enabled {
currentSnapshotBlock, currentInfohash, err := snapshotsync.GetSnapshotInfo(chainKv)
if err != nil {
return nil, err
}
mg = snapshotsync.NewMigrator(snapshotsDir, currentSnapshotBlock, currentInfohash)
mg = snapshotsync.NewMigrator(config.Snapshot.Dir, currentSnapshotBlock, currentInfohash)
err = mg.RemoveNonCurrentSnapshots()
if err != nil {
log.Error("Remove non current snapshot", "err", err)
@ -338,10 +338,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
}
backend.downloadV2Ctx, backend.downloadV2Cancel = context.WithCancel(context.Background())
backend.downloadCtx, backend.downloadCancel = context.WithCancel(context.Background())
if len(stack.Config().P2P.SentryAddr) > 0 {
for _, addr := range stack.Config().P2P.SentryAddr {
sentry, err := download.GrpcSentryClient(backend.downloadV2Ctx, addr)
sentry, err := download.GrpcSentryClient(backend.downloadCtx, addr)
if err != nil {
return nil, err
}
@ -365,7 +365,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
cfg66 := stack.Config().P2P
cfg66.NodeDatabase = path.Join(stack.Config().DataDir, "nodes", "eth66")
server66 := download.NewSentryServer(backend.downloadV2Ctx, d66, readNodeInfo, &cfg66, eth.ETH66)
server66 := download.NewSentryServer(backend.downloadCtx, d66, readNodeInfo, &cfg66, eth.ETH66)
backend.sentryServers = append(backend.sentryServers, server66)
backend.sentries = []remote.SentryClient{remote.NewSentryClientDirect(eth.ETH66, server66)}
cfg65 := stack.Config().P2P
@ -375,7 +375,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
cfg65.ListenAddr = cfg65.ListenAddr65
server65 := download.NewSentryServer(backend.downloadV2Ctx, d65, readNodeInfo, &cfg65, eth.ETH65)
server65 := download.NewSentryServer(backend.downloadCtx, d65, readNodeInfo, &cfg65, eth.ETH65)
backend.sentryServers = append(backend.sentryServers, server65)
backend.sentries = append(backend.sentries, remote.NewSentryClientDirect(eth.ETH65, server65))
go func() {
@ -386,7 +386,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
for {
select {
case <-backend.downloadV2Ctx.Done():
case <-backend.downloadCtx.Done():
return
case <-logEvery.C:
logItems = logItems[:0]
@ -402,7 +402,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
backend.txPoolP2PServer, err = txpool.NewP2PServer(backend.downloadV2Ctx, backend.sentries, backend.txPool)
backend.txPoolP2PServer, err = txpool.NewP2PServer(backend.downloadCtx, backend.sentries, backend.txPool)
if err != nil {
return nil, err
}
@ -415,28 +415,25 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
backend.txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(backend.txPool.Has, backend.txPool.AddRemotes, fetchTx)
bodyDownloadTimeoutSeconds := 30 // TODO: convert to duration, make configurable
backend.stagedSync2, err = stages2.NewStagedSync2(
backend.downloadV2Ctx,
backend.stagedSync, err = stages2.NewStagedSync2(
backend.downloadCtx,
backend.chainKV,
config.StorageMode,
config.BatchSize,
bodyDownloadTimeoutSeconds,
backend.downloadServer,
tmpdir,
snapshotsDir,
config.Snapshot,
backend.txPool,
backend.txPoolP2PServer,
torrentClient, mg,
)
if err != nil {
return nil, err
}
if config.SnapshotLayout {
backend.stagedSync2.SetTorrentParams(torrentClient, snapshotsDir, mg)
log.Info("Set torrent params", "snapshotsDir", snapshotsDir)
}
go txpropagate.BroadcastNewTxsToNetworks(backend.downloadV2Ctx, backend.txPool, backend.downloadServer)
go txpropagate.BroadcastNewTxsToNetworks(backend.downloadCtx, backend.txPool, backend.downloadServer)
go func() {
defer debug.LogPanic()
@ -651,14 +648,14 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
func (s *Ethereum) Start() error {
for i := range s.sentries {
go func(i int) {
download.RecvMessageLoop(s.downloadV2Ctx, s.sentries[i], s.downloadServer, nil)
download.RecvMessageLoop(s.downloadCtx, s.sentries[i], s.downloadServer, nil)
}(i)
go func(i int) {
download.RecvUploadMessageLoop(s.downloadV2Ctx, s.sentries[i], s.downloadServer, nil)
download.RecvUploadMessageLoop(s.downloadCtx, s.sentries[i], s.downloadServer, nil)
}(i)
}
go Loop(s.downloadV2Ctx, s.chainKV, s.stagedSync2, s.downloadServer, s.events, s.config.StateStream, s.waitForStageLoopStop)
go Loop(s.downloadCtx, s.chainKV, s.stagedSync, s.downloadServer, s.events, s.config.StateStream, s.waitForStageLoopStop)
return nil
}
@ -666,7 +663,7 @@ func (s *Ethereum) Start() error {
// Ethereum protocol.
func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.downloadV2Cancel()
s.downloadCancel()
s.txPoolP2PServer.TxFetcher.Stop()
s.txPool.Stop()
if s.quitMining != nil {

View File

@ -109,6 +109,13 @@ func init() {
//go:generate gencodec -type Config -formats toml -out gen_config.go
type Snapshot struct {
Enabled bool
Mode snapshotsync.SnapshotMode
Dir string
Seeding bool
}
// Config contains configuration options for ETH protocol.
type Config struct {
// The genesis block, which is inserted if the database is empty.
@ -124,11 +131,10 @@ type Config struct {
P2PEnabled bool
StorageMode ethdb.StorageMode
BatchSize datasize.ByteSize // Batch size for execution stage
SnapshotMode snapshotsync.SnapshotMode
SnapshotSeeding bool
SnapshotLayout bool
StorageMode ethdb.StorageMode
BatchSize datasize.ByteSize // Batch size for execution stage
Snapshot Snapshot
BlockDownloaderWindow int
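
The three loose fields (SnapshotMode, SnapshotSeeding, SnapshotLayout) fold into the single Snapshot struct declared above. A minimal sketch of populating it — field names from that struct, directory value illustrative, since the backend hunk above resolves the real one from the node's data dir:

cfg.Snapshot = ethconfig.Snapshot{
	Enabled: true,                     // was cfg.SnapshotLayout
	Seeding: true,                     // was cfg.SnapshotSeeding
	Dir:     "/data/erigon/snapshots", // set via stack.Config().ResolvePath("snapshots")
}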

View File

@ -16,150 +16,6 @@
package filters
import (
"context"
"math/rand"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/bloombits"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/event"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rpc"
)
type testBackend struct {
db ethdb.Database
sections uint64
txFeed event.Feed
logsFeed event.Feed
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
}
func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
var (
hash common.Hash
num uint64
)
if blockNr == rpc.LatestBlockNumber {
hash = rawdb.ReadHeadBlockHash(b.db)
number := rawdb.ReadHeaderNumber(b.db, hash)
if number == nil {
return nil, nil
}
num = *number
} else {
num = uint64(blockNr)
var err error
hash, err = rawdb.ReadCanonicalHash(b.db, num)
if err != nil {
return nil, err
}
}
return rawdb.ReadHeader(b.db, hash, num), nil
}
func (b *testBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
number := rawdb.ReadHeaderNumber(b.db, hash)
if number == nil {
return nil, nil
}
return rawdb.ReadHeader(b.db, hash, *number), nil
}
func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (receipts types.Receipts, err error) {
if err := b.db.RwKV().View(ctx, func(tx ethdb.Tx) error {
b, senders, err := rawdb.ReadBlockByHashWithSenders(tx, hash)
if err != nil {
return err
}
receipts = rawdb.ReadReceipts(tx, b, senders)
return nil
}); err != nil {
return nil, err
}
return nil, nil
}
func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
var logs [][]*types.Log
if err := b.db.RwKV().View(ctx, func(tx ethdb.Tx) error {
b, senders, err := rawdb.ReadBlockByHashWithSenders(tx, hash)
if err != nil {
return err
}
receipts := rawdb.ReadReceipts(tx, b, senders)
logs = make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
}
return nil
}); err != nil {
return nil, err
}
return logs, nil
}
func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return b.rmLogsFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.logsFeed.Subscribe(ch)
}
func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.pendingLogsFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.chainFeed.Subscribe(ch)
}
func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}
func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
requests := make(chan chan *bloombits.Retrieval)
go session.Multiplex(16, 0, requests)
go func() {
for {
// Wait for a service request or a shutdown
select {
case <-ctx.Done():
return
case request := <-requests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
if rand.Int()%4 != 0 { // Handle occasional missing deliveries
head, err := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
if err != nil {
task.Error = err
continue
}
task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head)
}
}
request <- task
}
}
}()
}
/*
var (
deadline = 5 * time.Minute

View File

@ -15,90 +15,3 @@
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
"context"
"math/big"
"testing"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus/ethash"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/kv"
"github.com/ledgerwatch/erigon/params"
)
func makeReceipt(addr common.Address) *types.Receipt {
receipt := types.NewReceipt(false, 0)
receipt.Logs = []*types.Log{
{Address: addr},
}
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
return receipt
}
func BenchmarkFilters(b *testing.B) {
db := kv.NewTestKV(b)
defer db.Close()
var (
backend = &testBackend{db: kv.NewObjectDatabase(db)}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
)
genesis := core.GenesisBlockForTesting(db, addr1, big.NewInt(1000000))
chain, err := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 100010, func(i int, gen *core.BlockGen) {
switch i {
case 2403:
receipt := makeReceipt(addr1)
gen.AddUncheckedReceipt(receipt)
case 1034:
receipt := makeReceipt(addr2)
gen.AddUncheckedReceipt(receipt)
case 34:
receipt := makeReceipt(addr3)
gen.AddUncheckedReceipt(receipt)
case 99999:
receipt := makeReceipt(addr4)
gen.AddUncheckedReceipt(receipt)
}
}, false /* intermediateHashes */)
if err != nil {
b.Fatalf("generate chain: %v", err)
}
if err := db.Update(context.Background(), func(tx ethdb.RwTx) error {
for i, block := range chain.Blocks {
if err := rawdb.WriteBlock(tx, block); err != nil {
panic(err)
}
if err := rawdb.WriteCanonicalHash(tx, block.Hash(), block.NumberU64()); err != nil {
panic(err)
}
rawdb.WriteHeadBlockHash(tx, block.Hash())
if err := rawdb.WriteReceipts(tx, block.NumberU64(), chain.Receipts[i]); err != nil {
panic(err)
}
}
return nil
}); err != nil {
b.Fatal(err)
}
b.ResetTimer()
filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
b.Fatal("expected 4 logs, got", len(logs))
}
}
}

View File

@ -7,15 +7,17 @@ import (
"github.com/ledgerwatch/erigon/ethdb"
)
func ReplacementStages(ctx context.Context,
func DefaultStages(ctx context.Context,
sm ethdb.StorageMode,
headers HeadersCfg,
blockHashCfg BlockHashesCfg,
headersSnapshotGenCfg HeadersSnapshotGenCfg,
snapshotHeaders SnapshotHeadersCfg,
bodies BodiesCfg,
snapshotBodies SnapshotBodiesCfg,
senders SendersCfg,
exec ExecuteBlockCfg,
trans TranspileCfg,
snapshotState SnapshotStateCfg,
hashState HashStateCfg,
trieCfg TrieCfg,
history HistoryCfg,
@ -66,29 +68,10 @@ func ReplacementStages(ctx context.Context,
Disabled: world.snapshotsDir == "",
DisabledDescription: "Enable by --snapshot.layout",
ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error {
return SpawnHeadersSnapshotGenerationStage(s, tx, headersSnapshotGenCfg, world.InitialCycle, world.SnapshotBuilder, world.btClient, world.QuitCh)
return SpawnHeadersSnapshotGenerationStage(s, tx, snapshotHeaders, world.InitialCycle, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = headersSnapshotGenCfg.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
err := u.Done(tx)
if err != nil {
return err
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
return UnwindHeadersSnapshotGenerationStage(u, s, tx, snapshotHeaders, world.QuitCh)
},
}
},
@ -117,29 +100,10 @@ func ReplacementStages(ctx context.Context,
Disabled: world.snapshotsDir == "",
DisabledDescription: "Enable by --snapshot.layout",
ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error {
return SpawnBodiesSnapshotGenerationStage(s, world.DB.RwKV(), tx, world.snapshotsDir, world.btClient, world.QuitCh)
return SpawnBodiesSnapshotGenerationStage(s, tx, snapshotBodies, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = world.DB.RwKV().BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
err := u.Done(tx)
if err != nil {
return err
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
return UnwindBodiesSnapshotGenerationStage(u, s, tx, snapshotBodies, world.QuitCh)
},
}
},
@ -200,10 +164,10 @@ func ReplacementStages(ctx context.Context,
Disabled: world.snapshotsDir == "",
DisabledDescription: "Enable by --snapshot.layout",
ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error {
return SpawnStateSnapshotGenerationStage(s, world.DB.RwKV(), tx, world.snapshotsDir, world.btClient, world.QuitCh)
return SpawnStateSnapshotGenerationStage(s, tx, snapshotState, world.QuitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error {
return u.Done(world.DB)
return UnwindStateSnapshotGenerationStage(u, s, tx, snapshotState, world.QuitCh)
},
}
},
@ -346,7 +310,7 @@ func ReplacementStages(ctx context.Context,
ID: stages.Finish,
Description: "Final: update current block for the RPC API",
ExecFunc: func(s *StageState, _ Unwinder, tx ethdb.RwTx) error {
return FinishForward(s, tx, finish, world.btClient, world.SnapshotBuilder)
return FinishForward(s, tx, finish)
},
UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error {
return UnwindFinish(u, s, tx, finish)

View File

@ -1,11 +1,54 @@
package stagedsync
import (
"context"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)
func SpawnBodiesSnapshotGenerationStage(s *StageState, db ethdb.RwKV, tx ethdb.RwTx, snapshotDir string, torrentClient *snapshotsync.Client, quit <-chan struct{}) error {
type SnapshotBodiesCfg struct {
db ethdb.RwKV
snapshotDir string
tmpDir string
client *snapshotsync.Client
snapshotMigrator *snapshotsync.SnapshotMigrator
}
func StageSnapshotBodiesCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, client *snapshotsync.Client, snapshotMigrator *snapshotsync.SnapshotMigrator, tmpDir string) SnapshotBodiesCfg {
return SnapshotBodiesCfg{
db: db,
snapshotDir: snapshot.Dir,
client: client,
snapshotMigrator: snapshotMigrator,
tmpDir: tmpDir,
}
}
func SpawnBodiesSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, quit <-chan struct{}) error {
s.Done()
return nil
}
func UnwindBodiesSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, quit <-chan struct{}) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
if err := u.Done(tx); err != nil {
return err
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
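
This file shows the pattern the commit applies to every snapshot stage: dependencies (KV database, snapshot settings, torrent client, migrator, temp dir) move out of the Spawn/Unwind parameter lists into a small Cfg struct built once at wiring time. A sketch of a disabled, test-style setup, mirroring the MockWithEverything hunk later in the diff:

bodiesCfg := stagedsync.StageSnapshotBodiesCfg(
	db,                                 // ethdb.RwKV
	ethconfig.Snapshot{Enabled: false}, // snapshots switched off
	nil, nil,                           // *snapshotsync.Client, *snapshotsync.SnapshotMigrator
	tmpdir,
)

The stage functions then reach everything through cfg, so their signatures stay stable as dependencies change.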

View File

@ -13,18 +13,22 @@ import (
)
type FinishCfg struct {
db ethdb.RwKV
tmpDir string
db ethdb.RwKV
tmpDir string
btClient *snapshotsync.Client
snBuilder *snapshotsync.SnapshotMigrator
}
func StageFinishCfg(db ethdb.RwKV, tmpDir string) FinishCfg {
func StageFinishCfg(db ethdb.RwKV, tmpDir string, btClient *snapshotsync.Client, snBuilder *snapshotsync.SnapshotMigrator) FinishCfg {
return FinishCfg{
db: db,
tmpDir: tmpDir,
db: db,
tmpDir: tmpDir,
btClient: btClient,
snBuilder: snBuilder,
}
}
func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg, btClient *snapshotsync.Client, snBuilder *snapshotsync.SnapshotMigrator) error {
func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
@ -45,14 +49,14 @@ func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg, btClient *snapsh
return nil
}
if snBuilder != nil && useExternalTx {
if cfg.snBuilder != nil && useExternalTx {
snBlock := snapshotsync.CalculateEpoch(executionAt, snapshotsync.EpochSize)
err = snBuilder.AsyncStages(snBlock, cfg.db, tx, btClient, true)
err = cfg.snBuilder.AsyncStages(snBlock, cfg.db, tx, cfg.btClient, true)
if err != nil {
return err
}
if snBuilder.Replaced() {
err = snBuilder.SyncStages(snBlock, cfg.db, tx)
if cfg.snBuilder.Replaced() {
err = cfg.snBuilder.SyncStages(snBlock, cfg.db, tx)
if err != nil {
return err
}
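
The Finish stage gets the same treatment: the torrent client and snapshot builder now ride in FinishCfg instead of being extra FinishForward arguments. Wiring it up, with nil for both when the snapshot layout is off (as the tests below do):

finishCfg := stagedsync.StageFinishCfg(db, tmpdir, nil, nil) // btClient, snBuilder unused
err := stagedsync.FinishForward(s, tx, finishCfg)            // was FinishForward(s, tx, cfg, btClient, snBuilder)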

View File

@ -6,25 +6,30 @@ import (
"sync/atomic"
"time"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/log"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)
type HeadersSnapshotGenCfg struct {
db ethdb.RwKV
snapshotDir string
type SnapshotHeadersCfg struct {
db ethdb.RwKV
snapshotDir string
client *snapshotsync.Client
snapshotMigrator *snapshotsync.SnapshotMigrator
}
func StageHeadersSnapshotGenCfg(db ethdb.RwKV, snapshotDir string) HeadersSnapshotGenCfg {
return HeadersSnapshotGenCfg{
db: db,
snapshotDir: snapshotDir,
func StageSnapshotHeadersCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, client *snapshotsync.Client, snapshotMigrator *snapshotsync.SnapshotMigrator) SnapshotHeadersCfg {
return SnapshotHeadersCfg{
db: db,
snapshotDir: snapshot.Dir,
client: client,
snapshotMigrator: snapshotMigrator,
}
}
func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg HeadersSnapshotGenCfg, initial bool, sm *snapshotsync.SnapshotMigrator, torrentClient *snapshotsync.Client, quit <-chan struct{}) error {
func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, initial bool, quit <-chan struct{}) error {
// generate the snapshot only during the initial cycle
if !initial {
s.Done()
@ -62,13 +67,13 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Heade
return nil
}
err = sm.AsyncStages(snapshotBlock, cfg.db, readTX, torrentClient, false)
err = cfg.snapshotMigrator.AsyncStages(snapshotBlock, cfg.db, readTX, cfg.client, false)
if err != nil {
return err
}
readTX.Rollback()
for !sm.Replaced() {
for !cfg.snapshotMigrator.Replaced() {
time.Sleep(time.Minute)
log.Info("Wait old snapshot to close")
}
@ -79,7 +84,7 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Heade
}
defer writeTX.Rollback()
err = sm.SyncStages(snapshotBlock, cfg.db, writeTX)
err = cfg.snapshotMigrator.SyncStages(snapshotBlock, cfg.db, writeTX)
if err != nil {
return err
}
@ -99,9 +104,9 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Heade
return false, err
}
defer readTX.Rollback()
err = sm.Final(readTX)
err = cfg.snapshotMigrator.Final(readTX)
return atomic.LoadUint64(&sm.HeadersCurrentSnapshot) == snapshotBlock, err
return atomic.LoadUint64(&cfg.snapshotMigrator.HeadersCurrentSnapshot) == snapshotBlock, err
}
for {
@ -116,3 +121,25 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Heade
}
return nil
}
func UnwindHeadersSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, quit <-chan struct{}) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
if err := u.Done(tx); err != nil {
return err
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}
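
All three Unwind*SnapshotGenerationStage helpers introduced here share Erigon's external-transaction idiom: reuse the caller's tx when one is supplied, otherwise open, commit, and roll back a local one. Distilled into a commented skeleton (a hypothetical helper, not code from this commit):

func unwindWithOwnTxIfNeeded(u *UnwindState, db ethdb.RwKV, tx ethdb.RwTx) error {
	useExternalTx := tx != nil
	if !useExternalTx {
		var err error
		tx, err = db.BeginRw(context.Background()) // the stage owns this tx
		if err != nil {
			return err
		}
		defer tx.Rollback() // safety net for early error returns
	}
	if err := u.Done(tx); err != nil { // record that the unwind point was reached
		return err
	}
	if !useExternalTx {
		return tx.Commit() // only commit what we opened ourselves
	}
	return nil
}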

View File

@ -3,15 +3,34 @@ package stagedsync
import (
"context"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)
func SpawnStateSnapshotGenerationStage(s *StageState, db ethdb.RwKV, tx ethdb.RwTx, snapshotDir string, torrentClient *snapshotsync.Client, quit <-chan struct{}) error {
type SnapshotStateCfg struct {
db ethdb.RwKV
snapshotDir string
tmpDir string
client *snapshotsync.Client
snapshotMigrator *snapshotsync.SnapshotMigrator
}
func StageSnapshotStateCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, tmpDir string, client *snapshotsync.Client, snapshotMigrator *snapshotsync.SnapshotMigrator) SnapshotStateCfg {
return SnapshotStateCfg{
db: db,
snapshotDir: snapshot.Dir,
client: client,
snapshotMigrator: snapshotMigrator,
tmpDir: tmpDir,
}
}
func SpawnStateSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotStateCfg, quit <-chan struct{}) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = db.BeginRw(context.Background())
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return err
}
@ -29,5 +48,26 @@ func SpawnStateSnapshotGenerationStage(s *StageState, db ethdb.RwKV, tx ethdb.Rw
}
}
return nil
}
func UnwindStateSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SnapshotStateCfg, quit <-chan struct{}) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
if err := u.Done(tx); err != nil {
return err
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
return nil
}

View File

@ -8,7 +8,6 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)
type ChainEventNotifier interface {
@ -19,17 +18,14 @@ type ChainEventNotifier interface {
// StageParameters contains the parameters that stages receive at runtime when they are initialized.
// A stage can then use them to obtain various useful functions.
type StageParameters struct {
DB ethdb.Database
// QuitCh is a channel that is closed on shutdown. Stages that can take significant
// time should listen on it so they can shut down gracefully at Ctrl+C.
QuitCh <-chan struct{}
InitialCycle bool
mining *MiningCfg
snapshotsDir string
btClient *snapshotsync.Client
SnapshotBuilder *snapshotsync.SnapshotMigrator
Accumulator *shards.Accumulator // State change accumulator
snapshotsDir string
Accumulator *shards.Accumulator // State change accumulator
}
type MiningCfg struct {

View File

@ -1,7 +1,8 @@
package stagedsync
import (
"github.com/ledgerwatch/erigon/core/vm"
"context"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
@ -42,31 +43,20 @@ func New(stages StageBuilders, unwindOrder UnwindOrder, params OptionalParameter
}
func (stagedSync *StagedSync) Prepare(
vmConfig *vm.Config,
db ethdb.Database,
db ethdb.RwKV,
tx ethdb.Tx,
storageMode ethdb.StorageMode,
quitCh <-chan struct{},
initialCycle bool,
miningConfig *MiningCfg,
accumulator *shards.Accumulator,
) (*State, error) {
if vmConfig == nil {
vmConfig = &vm.Config{}
}
vmConfig.EnableTEMV = storageMode.TEVM
stages := stagedSync.stageBuilders.Build(
StageParameters{
DB: db,
QuitCh: quitCh,
InitialCycle: initialCycle,
mining: miningConfig,
snapshotsDir: stagedSync.params.SnapshotDir,
btClient: stagedSync.params.TorrentClient,
SnapshotBuilder: stagedSync.params.SnapshotMigrator,
Accumulator: accumulator,
QuitCh: quitCh,
InitialCycle: initialCycle,
mining: miningConfig,
snapshotsDir: stagedSync.params.SnapshotDir,
Accumulator: accumulator,
},
)
state := NewState(stages)
@ -82,18 +72,15 @@ func (stagedSync *StagedSync) Prepare(
return nil, err
}
} else {
if err := state.LoadUnwindInfo(db); err != nil {
if err := db.View(context.Background(), func(tx ethdb.Tx) error {
return state.LoadUnwindInfo(tx)
}); err != nil {
return nil, err
}
}
return state, nil
}
func (stagedSync *StagedSync) SetTorrentParams(client *snapshotsync.Client, snapshotsDir string, snapshotMigrator *snapshotsync.SnapshotMigrator) {
stagedSync.params.TorrentClient = client
stagedSync.params.SnapshotDir = snapshotsDir
stagedSync.params.SnapshotMigrator = snapshotMigrator
}
func (stagedSync *StagedSync) GetSnapshotMigratorFinal() func(tx ethdb.Tx) error {
if stagedSync.params.SnapshotMigrator != nil {
return stagedSync.params.SnapshotMigrator.Final

View File

@ -126,9 +126,9 @@ func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config) {
if err != nil {
utils.Fatalf(fmt.Sprintf("error while parsing mode: %v", err))
}
cfg.SnapshotMode = snMode
cfg.SnapshotSeeding = ctx.GlobalBool(SeedSnapshotsFlag.Name)
cfg.SnapshotLayout = ctx.GlobalBool(SnapshotDatabaseLayoutFlag.Name)
cfg.Snapshot.Mode = snMode
cfg.Snapshot.Seeding = ctx.GlobalBool(SeedSnapshotsFlag.Name)
cfg.Snapshot.Enabled = ctx.GlobalBool(SnapshotDatabaseLayoutFlag.Name)
if ctx.GlobalString(BatchSizeFlag.Name) != "" {
err := cfg.BatchSize.UnmarshalText([]byte(ctx.GlobalString(BatchSizeFlag.Name)))
@ -164,10 +164,10 @@ func ApplyFlagsForEthConfigCobra(f *pflag.FlagSet, cfg *ethconfig.Config) {
if err != nil {
utils.Fatalf(fmt.Sprintf("error while parsing mode: %v", err))
}
cfg.SnapshotMode = snMode
cfg.Snapshot.Mode = snMode
}
if v := f.Bool(SeedSnapshotsFlag.Name, false, SeedSnapshotsFlag.Usage); v != nil {
cfg.SnapshotSeeding = *v
cfg.Snapshot.Seeding = *v
}
if v := f.String(BatchSizeFlag.Name, BatchSizeFlag.Value, BatchSizeFlag.Usage); v != nil {
err := cfg.BatchSize.UnmarshalText([]byte(*v))

View File

@ -224,7 +224,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
batchSize,
),
stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir),
stagedsync.StageHeadersSnapshotGenCfg(mock.DB, mock.tmpdir),
stagedsync.StageSnapshotHeadersCfg(mock.DB, ethconfig.Snapshot{Enabled: false}, nil, nil),
stagedsync.StageBodiesCfg(
mock.DB,
mock.downloader.Bd,
@ -235,6 +235,12 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
*mock.ChainConfig,
batchSize,
),
stagedsync.StageSnapshotBodiesCfg(
mock.DB,
ethconfig.Snapshot{Enabled: false},
nil, nil,
"",
),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir),
stagedsync.StageExecuteBlocksCfg(
mock.DB,
@ -256,6 +262,12 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
nil,
mock.ChainConfig,
),
stagedsync.StageSnapshotStateCfg(
mock.DB,
ethconfig.Snapshot{Enabled: false},
"",
nil, nil,
),
stagedsync.StageHashStateCfg(mock.DB, mock.tmpdir),
stagedsync.StageTrieCfg(mock.DB, true, true, mock.tmpdir),
stagedsync.StageHistoryCfg(mock.DB, mock.tmpdir),
@ -269,7 +281,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
mock.StreamWg.Wait()
mock.TxPoolP2PServer.TxFetcher.Start()
}),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, nil, nil),
true, /* test */
)
@ -384,7 +396,7 @@ func (ms *MockSentry) InsertChain(chain *core.ChainPack) error {
notifier := &remotedbserver.Events{}
initialCycle := false
highestSeenHeader := uint64(chain.TopBlock.NumberU64())
if err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.ChainConfig, notifier, initialCycle, nil, ms.UpdateHead, nil); err != nil {
if err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, notifier, initialCycle, nil, ms.UpdateHead, nil); err != nil {
return err
}
// Check if the latest header was imported or rolled back

View File

@ -60,7 +60,7 @@ func TestHeaderStep(t *testing.T) {
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := uint64(chain.TopBlock.NumberU64())
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -101,7 +101,7 @@ func TestMineBlockWith1Tx(t *testing.T) {
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := uint64(chain.TopBlock.NumberU64())
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -173,7 +173,7 @@ func TestReorg(t *testing.T) {
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := uint64(chain.TopBlock.NumberU64())
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
@ -227,7 +227,7 @@ func TestReorg(t *testing.T) {
highestSeenHeader = uint64(short.TopBlock.NumberU64())
initialCycle = false
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
@ -271,7 +271,7 @@ func TestReorg(t *testing.T) {
// This is unwind step
highestSeenHeader = uint64(long1.TopBlock.NumberU64())
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
@ -309,7 +309,7 @@ func TestReorg(t *testing.T) {
highestSeenHeader = uint64(short2.TopBlock.NumberU64())
initialCycle = false
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -409,7 +409,7 @@ func TestAnchorReplace(t *testing.T) {
highestSeenHeader := uint64(long.TopBlock.NumberU64())
notifier := &remotedbserver.Events{}
initialCycle := true
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
@ -517,7 +517,7 @@ func TestAnchorReplace2(t *testing.T) {
highestSeenHeader := uint64(long.TopBlock.NumberU64())
notifier := &remotedbserver.Events{}
initialCycle := true
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}

View File

@ -17,13 +17,14 @@ import (
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/kv"
"github.com/ledgerwatch/erigon/log"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/erigon/turbo/txpool"
)
@ -33,11 +34,13 @@ func NewStagedSync(
sm ethdb.StorageMode,
headers stagedsync.HeadersCfg,
blockHashes stagedsync.BlockHashesCfg,
snapshotHeaderGen stagedsync.HeadersSnapshotGenCfg,
snapshotHeader stagedsync.SnapshotHeadersCfg,
bodies stagedsync.BodiesCfg,
snapshotBodies stagedsync.SnapshotBodiesCfg,
senders stagedsync.SendersCfg,
exec stagedsync.ExecuteBlockCfg,
trans stagedsync.TranspileCfg,
snapshotState stagedsync.SnapshotStateCfg,
hashState stagedsync.HashStateCfg,
trieCfg stagedsync.TrieCfg,
history stagedsync.HistoryCfg,
@ -49,7 +52,7 @@ func NewStagedSync(
test bool,
) *stagedsync.StagedSync {
return stagedsync.New(
stagedsync.ReplacementStages(ctx, sm, headers, blockHashes, snapshotHeaderGen, bodies, senders, exec, trans, hashState, trieCfg, history, logIndex, callTraces, txLookup, txPool, finish, test),
stagedsync.DefaultStages(ctx, sm, headers, blockHashes, snapshotHeader, bodies, snapshotBodies, senders, exec, trans, snapshotState, hashState, trieCfg, history, logIndex, callTraces, txLookup, txPool, finish, test),
stagedsync.ReplacementUnwindOrder(),
stagedsync.OptionalParameters{},
)
@ -83,7 +86,7 @@ func StageLoop(
if !initialCycle && stateStream {
accumulator = &shards.Accumulator{}
}
if err := StageLoopStep(ctx, db, sync, height, chainConfig, notifier, initialCycle, accumulator, updateHead, sync.GetSnapshotMigratorFinal()); err != nil {
if err := StageLoopStep(ctx, db, sync, height, notifier, initialCycle, accumulator, updateHead, sync.GetSnapshotMigratorFinal()); err != nil {
if errors.Is(err, common.ErrStopped) {
return
}
@ -105,7 +108,6 @@ func StageLoopStep(
db ethdb.RwKV,
sync *stagedsync.StagedSync,
highestSeenHeader uint64,
chainConfig *params.ChainConfig,
notifier stagedsync.ChainEventNotifier,
initialCycle bool,
accumulator *shards.Accumulator,
@ -113,13 +115,8 @@ func StageLoopStep(
snapshotMigratorFinal func(tx ethdb.Tx) error,
) (err error) {
defer func() { err = debug.ReportPanicAndRecover() }() // avoid crash because Erigon's core does many things -
var sm ethdb.StorageMode
var origin, hashStateStageProgress, finishProgressBefore, unwindTo uint64
if err := db.View(ctx, func(tx ethdb.Tx) error {
sm, err = ethdb.GetStorageModeFromDB(tx)
if err != nil {
return err
}
origin, err = stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return err
@ -146,7 +143,7 @@ func StageLoopStep(
return err
}
st, err1 := sync.Prepare(&vm.Config{}, kv.NewObjectDatabase(db), nil, sm, ctx.Done(), initialCycle, nil, accumulator)
st, err1 := sync.Prepare(db, nil, ctx.Done(), initialCycle, nil, accumulator)
if err1 != nil {
return fmt.Errorf("prepare staged sync: %w", err1)
}
@ -225,10 +222,8 @@ func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSyn
}
defer tx.Rollback()
miningState, err := mining.Prepare(
nil,
nil,
tx,
ethdb.DefaultStorageMode,
ctx.Done(),
false,
stagedsync.StageMiningCfg(true),
@ -252,9 +247,11 @@ func NewStagedSync2(
bodyDownloadTimeout int,
controlServer *download.ControlServerImpl,
tmpdir string,
snapshotsDir string,
snapshotCfg ethconfig.Snapshot,
txPool *core.TxPool,
txPoolServer *txpool.P2PServer,
client *snapshotsync.Client, snapshotMigrator *snapshotsync.SnapshotMigrator,
) (*stagedsync.StagedSync, error) {
var pruningDistance uint64
if !sm.History {
@ -272,7 +269,7 @@ func NewStagedSync2(
batchSize,
),
stagedsync.StageBlockHashesCfg(db, tmpdir),
stagedsync.StageHeadersSnapshotGenCfg(db, snapshotsDir),
stagedsync.StageSnapshotHeadersCfg(db, snapshotCfg, client, snapshotMigrator),
stagedsync.StageBodiesCfg(
db,
controlServer.Bd,
@ -283,6 +280,7 @@ func NewStagedSync2(
*controlServer.ChainConfig,
batchSize,
),
stagedsync.StageSnapshotBodiesCfg(db, snapshotCfg, client, snapshotMigrator, tmpdir),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir),
stagedsync.StageExecuteBlocksCfg(
db,
@ -304,6 +302,7 @@ func NewStagedSync2(
nil,
controlServer.ChainConfig,
),
stagedsync.StageSnapshotStateCfg(db, snapshotCfg, tmpdir, client, snapshotMigrator),
stagedsync.StageHashStateCfg(db, tmpdir),
stagedsync.StageTrieCfg(db, true, true, tmpdir),
stagedsync.StageHistoryCfg(db, tmpdir),
@ -318,7 +317,7 @@ func NewStagedSync2(
}
txPoolServer.TxFetcher.Start()
}),
stagedsync.StageFinishCfg(db, tmpdir),
stagedsync.StageFinishCfg(db, tmpdir, client, snapshotMigrator),
false, /* test */
), nil
}
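
End to end, the wiring is now purely constructor-driven: NewStagedSync2 takes the grouped snapshot config plus the torrent client and migrator directly, and the backend's separate SetTorrentParams call (removed in this diff) disappears. A sketch of a snapshot-less caller, matching the first hunk of this diff (variable names illustrative):

sync, err := stages2.NewStagedSync2(
	ctx, db,
	sm,        // ethdb.StorageMode
	batchSize, // datasize.ByteSize, execution-stage batch size
	30,        // body download timeout, seconds
	controlServer,
	tmpdir,
	ethconfig.Snapshot{Enabled: false}, // empty Dir disables the snapshot stages
	txPool,
	txPoolServer,
	nil, nil, // *snapshotsync.Client, *snapshotsync.SnapshotMigrator
)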