move_integration_to_replacement_stages (#1980)

This commit is contained in:
Alex Sharov 2021-05-21 12:21:06 +07:00 committed by GitHub
parent 4dcfd24af5
commit 1df1b85d64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 31 deletions

View File

@ -119,6 +119,8 @@ var cmdSnapshotCheck = &cobra.Command{
func snapshotCheck(ctx context.Context, db ethdb.Database, isNew bool, tmpDir string) (err error) {
kv := db.RwKV()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
var snapshotBlock uint64 = 11_000_000
blockNum, err := stages.GetStageProgress(db, stages.Execution)
if err != nil {
@ -209,7 +211,6 @@ func snapshotCheck(ctx context.Context, db ethdb.Database, isNew bool, tmpDir st
defer tx.Rollback()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil

View File

@ -4,13 +4,14 @@ import (
"context"
"fmt"
"path"
"runtime"
"sort"
"strings"
"unsafe"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon/cmd/headers/download"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/common/etl"
"github.com/ledgerwatch/erigon/consensus"
@ -18,7 +19,9 @@ import (
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/fetcher"
"github.com/ledgerwatch/erigon/eth/integrity"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
@ -353,6 +356,7 @@ func stageBodies(db ethdb.Database, ctx context.Context) error {
func stageSenders(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
@ -360,7 +364,6 @@ func stageSenders(db ethdb.Database, ctx context.Context) error {
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -412,13 +415,13 @@ func silkwormExecutionFunc() unsafe.Pointer {
func stageExec(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -462,6 +465,7 @@ func stageExec(db ethdb.Database, ctx context.Context) error {
func stageTrie(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
@ -469,7 +473,6 @@ func stageTrie(db ethdb.Database, ctx context.Context) error {
defer tx.Rollback()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -507,13 +510,13 @@ func stageHashState(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -552,13 +555,13 @@ func stageLogIndex(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -597,13 +600,13 @@ func stageCallTraces(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -647,13 +650,13 @@ func stageCallTraces(db ethdb.Database, ctx context.Context) error {
func stageHistory(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -700,13 +703,14 @@ func stageTxLookup(db ethdb.Database, ctx context.Context) error {
kv := db.RwKV()
tmpdir := path.Join(datadir, etl.TmpDirName)
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -767,38 +771,63 @@ func removeMigration(db ethdb.Deleter, _ context.Context) error {
return nil
}
func newSync2(db ethdb.Database, tx ethdb.RwTx) (ethdb.StorageMode, consensus.Engine, *params.ChainConfig, *vm.Config, *core.TxPool, *stagedsync.StagedSync, *stagedsync.StagedSync) {
func newSync3(db ethdb.Database) (ethdb.StorageMode, consensus.Engine, *params.ChainConfig, *vm.Config, *core.TxPool, *stagedsync.StagedSync, *stagedsync.StagedSync) {
var sm ethdb.StorageMode
var err error
if tx != nil {
sm, err = ethdb.GetStorageModeFromDB(tx)
if err != nil {
panic(err)
}
} else {
sm, err = ethdb.GetStorageModeFromDB(db)
if err != nil {
panic(err)
}
sm, err = ethdb.GetStorageModeFromDB(db)
if err != nil {
panic(err)
}
vmConfig := &vm.Config{NoReceipts: !sm.Receipts}
chainConfig := params.MainnetChainConfig
events := remotedbserver.NewEvents()
txCacher := core.NewTxSenderCacher(runtime.NumCPU())
txCacher := core.NewTxSenderCacher(1)
txPool := core.NewTxPool(ethconfig.Defaults.TxPool, chainConfig, db, txCacher)
st := stagedsync.New(
stagedsync.DefaultStages(),
stagedsync.DefaultUnwindOrder(),
stagedsync.OptionalParameters{SilkwormExecutionFunc: silkwormExecutionFunc(), Notifier: events},
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(db, core.DefaultGenesisBlock(), sm.History, false /* overwrite */)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
panic(genesisErr)
}
log.Info("Initialised chain configuration", "config", chainConfig)
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
bodyDownloadTimeoutSeconds := 30 // TODO: convert to duration, make configurable
engine := ethash.NewFaker()
blockDownloaderWindow := 65536
downloadServer, err := download.NewControlServer(db.RwKV(), "", chainConfig, genesisHash, engine, 1, nil, blockDownloaderWindow)
if err != nil {
panic(err)
}
txPoolP2PServer, err := eth.NewTxPoolServer(context.Background(), nil, txPool)
if err != nil {
panic(err)
}
fetchTx := func(peerID string, hashes []common.Hash) error {
txPoolP2PServer.SendTxsRequest(context.TODO(), peerID, hashes)
return nil
}
txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(txPool.Has, txPool.AddRemotes, fetchTx)
st, err := download.NewStagedSync(context.Background(), db.RwKV(), sm, batchSize,
bodyDownloadTimeoutSeconds,
downloadServer,
path.Join(datadir, etl.TmpDirName),
txPool,
txPoolP2PServer,
)
if err != nil {
panic(err)
}
stMining := stagedsync.New(
stagedsync.MiningStages(),
stagedsync.MiningUnwindOrder(),
stagedsync.OptionalParameters{SilkwormExecutionFunc: silkwormExecutionFunc(), Notifier: events},
)
return sm, ethash.NewFaker(), chainConfig, vmConfig, txPool, st, stMining
return sm, engine, chainConfig, vmConfig, txPool, st, stMining
}
func progress(tx ethdb.KVGetter, stage stages.SyncStage) uint64 {

View File

@ -139,6 +139,8 @@ func init() {
func syncBySmallSteps(db ethdb.Database, miningConfig params.MiningConfig, ctx context.Context) error {
kv := db.RwKV()
sm, engine, chainConfig, vmConfig, txPool, st, mining := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
@ -170,12 +172,16 @@ func syncBySmallSteps(db ethdb.Database, miningConfig params.MiningConfig, ctx c
}
}
sm, engine, chainConfig, vmConfig, txPool, st, mining := newSync2(db, tx)
stateStages, err2 := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpDir, batchSize, quit, nil, txPool, false, nil)
if err2 != nil {
panic(err2)
}
stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders, stages.Finish)
stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders,
stages.CreateHeadersSnapshot,
stages.CreateBodiesSnapshot,
stages.CreateStateSnapshot,
stages.TxPool, // TODO: enable TxPool stage
stages.Finish)
execCfg := stagedsync.StageExecuteBlocksCfg(kv, sm.Receipts, sm.CallTraces, batchSize, nil, nil, nil, changeSetHook, chainConfig, engine, vmConfig, tmpDir)
@ -415,6 +421,7 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *params.ChainConfig) {
func loopIh(db ethdb.Database, ctx context.Context, unwind uint64) error {
ch := ctx.Done()
kv := db.RwKV()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tmpdir := path.Join(datadir, etl.TmpDirName)
tx, err := kv.BeginRw(ctx)
if err != nil {
@ -422,7 +429,6 @@ func loopIh(db ethdb.Database, ctx context.Context, unwind uint64) error {
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil
@ -491,13 +497,13 @@ func loopExec(db ethdb.Database, ctx context.Context, unwind uint64) error {
tmpdir := path.Join(datadir, etl.TmpDirName)
ch := ctx.Done()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync3(db)
tx, err := kv.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
sm, engine, chainConfig, vmConfig, _, st, _ := newSync2(db, tx)
sync, err := st.Prepare(nil, chainConfig, engine, vmConfig, db, tx, "integration_test", sm, tmpdir, 0, ctx.Done(), nil, nil, false, nil)
if err != nil {
return nil