From 2ec64aee1d0ff650c86d3f4ec787e885c50cea21 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sun, 11 Jul 2021 04:05:33 +0000 Subject: [PATCH] Simplify mining (#2339) * save * save --- cmd/integration/commands/snapshot_check.go | 2 +- cmd/integration/commands/stages.go | 40 +++++++++++--------- cmd/integration/commands/state_stages.go | 42 ++++++++++----------- eth/backend.go | 10 +++-- eth/stagedsync/stage_mining_create_block.go | 40 +++++++++++++++----- eth/stagedsync/stage_mining_exec.go | 20 ++++++---- eth/stagedsync/stage_mining_finish.go | 36 +++++++++--------- eth/stagedsync/stagebuilder.go | 34 ++--------------- eth/stagedsync/stagedsync.go | 2 - turbo/stages/mock_sentry.go | 11 +++--- turbo/stages/stageloop.go | 3 +- 11 files changed, 120 insertions(+), 120 deletions(-) diff --git a/cmd/integration/commands/snapshot_check.go b/cmd/integration/commands/snapshot_check.go index ada2d8af9..6d7f251c5 100644 --- a/cmd/integration/commands/snapshot_check.go +++ b/cmd/integration/commands/snapshot_check.go @@ -104,7 +104,7 @@ var cmdSnapshotCheck = &cobra.Command{ } func snapshotCheck(ctx context.Context, db ethdb.RwKV, isNew bool, tmpDir string) (err error) { - _, engine, chainConfig, vmConfig, _, sync, _, _, _ := newSync(ctx, db) + _, engine, chainConfig, vmConfig, _, sync, _, _ := newSync(ctx, db, nil) var snapshotBlock uint64 = 11_000_000 var lastBlockHeaderNumber, blockNum uint64 diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index cb4d9514b..f162ba1b0 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -16,7 +16,6 @@ import ( "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/ethash" "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/fetcher" @@ -367,7 +366,7 @@ func stageBodies(db ethdb.RwKV, ctx context.Context) error { func stageSenders(db ethdb.RwKV, ctx context.Context) error { tmpdir := path.Join(datadir, etl.TmpDirName) - _, _, chainConfig, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, chainConfig, _, _, sync, _, _ := newSync(ctx, db, nil) tx, err := db.BeginRw(ctx) if err != nil { @@ -408,7 +407,7 @@ func stageSenders(db ethdb.RwKV, ctx context.Context) error { } func stageExec(db ethdb.RwKV, ctx context.Context) error { - sm, engine, chainConfig, vmConfig, _, sync, _, _, _ := newSync(ctx, db) + sm, engine, chainConfig, vmConfig, _, sync, _, _ := newSync(ctx, db, nil) if reset { genesis, _ := byChain() @@ -454,7 +453,7 @@ func stageExec(db ethdb.RwKV, ctx context.Context) error { } func stageTrie(db ethdb.RwKV, ctx context.Context) error { - _, _, _, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, _, _, _, sync, _, _ := newSync(ctx, db, nil) tmpdir := path.Join(datadir, etl.TmpDirName) if reset { @@ -497,7 +496,7 @@ func stageTrie(db ethdb.RwKV, ctx context.Context) error { func stageHashState(db ethdb.RwKV, ctx context.Context) error { tmpdir := path.Join(datadir, etl.TmpDirName) - _, _, _, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, _, _, _, sync, _, _ := newSync(ctx, db, nil) tx, err := db.BeginRw(ctx) if err != nil { @@ -537,7 +536,7 @@ func stageHashState(db ethdb.RwKV, ctx context.Context) error { func stageLogIndex(db ethdb.RwKV, ctx context.Context) error { tmpdir := path.Join(datadir, etl.TmpDirName) - _, _, _, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, _, _, _, sync, _, _ := newSync(ctx, db, nil) tx, err := db.BeginRw(ctx) if err != nil { return err @@ -576,7 +575,7 @@ func stageLogIndex(db ethdb.RwKV, ctx context.Context) error { func stageCallTraces(kv ethdb.RwKV, ctx context.Context) error { tmpdir := path.Join(datadir, etl.TmpDirName) - _, engine, chainConfig, _, _, sync, _, _, _ := newSync(ctx, kv) + _, engine, chainConfig, _, _, sync, _, _ := newSync(ctx, kv, nil) tx, err := kv.BeginRw(ctx) if err != nil { return err @@ -620,7 +619,7 @@ func stageCallTraces(kv ethdb.RwKV, ctx context.Context) error { func stageHistory(db ethdb.RwKV, ctx context.Context) error { tmpdir := path.Join(datadir, etl.TmpDirName) - _, _, _, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, _, _, _, sync, _, _ := newSync(ctx, db, nil) tx, err := db.BeginRw(ctx) if err != nil { return err @@ -667,7 +666,7 @@ func stageHistory(db ethdb.RwKV, ctx context.Context) error { func stageTxLookup(db ethdb.RwKV, ctx context.Context) error { tmpdir := path.Join(datadir, etl.TmpDirName) - _, _, _, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, _, _, _, sync, _, _ := newSync(ctx, db, nil) tx, err := db.BeginRw(ctx) if err != nil { @@ -758,7 +757,7 @@ func byChain() (*core.Genesis, *params.ChainConfig) { return genesis, chainConfig } -func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.Engine, *params.ChainConfig, *vm.Config, *core.TxPool, *stagedsync.State, *stagedsync.StagedSync, chan *types.Block, chan *types.Block) { +func newSync(ctx context.Context, db ethdb.RwKV, miningConfig *params.MiningConfig) (ethdb.StorageMode, consensus.Engine, *params.ChainConfig, *vm.Config, *core.TxPool, *stagedsync.State, *stagedsync.State, stagedsync.MiningState) { tmpdir := path.Join(datadir, etl.TmpDirName) snapshotDir = path.Join(datadir, "erigon", "snapshot") @@ -817,6 +816,9 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E cfg := ethconfig.Defaults cfg.StorageMode = sm cfg.BatchSize = batchSize + if miningConfig != nil { + cfg.Miner = *miningConfig + } st, err := stages2.NewStagedSync2(context.Background(), db, cfg, downloadServer, @@ -828,24 +830,28 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E if err != nil { panic(err) } - pendingResultCh := make(chan *types.Block, 1) - miningResultCh := make(chan *types.Block, 1) + miner := stagedsync.NewMiningState(&cfg.Miner) stMining := stagedsync.New( stagedsync.MiningStages( - stagedsync.StageMiningCreateBlockCfg(db, ethconfig.Defaults.Miner, *chainConfig, engine, txPool, tmpdir), - stagedsync.StageMiningExecCfg(db, ethconfig.Defaults.Miner, events, *chainConfig, engine, &vm.Config{}, tmpdir), + stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, txPool, tmpdir), + stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(db, tmpdir), stagedsync.StageTrieCfg(db, false, true, tmpdir), - stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, pendingResultCh, miningResultCh, nil), + stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, ctx.Done()), ), stagedsync.MiningUnwindOrder(), stagedsync.OptionalParameters{}, ) var sync *stagedsync.State + var miningSync *stagedsync.State if err := db.View(context.Background(), func(tx ethdb.Tx) (err error) { - sync, err = st.Prepare(nil, tx, ctx.Done(), false, nil) + sync, err = st.Prepare(nil, tx, ctx.Done(), false) + if err != nil { + return nil + } + miningSync, err = stMining.Prepare(nil, tx, ctx.Done(), false) if err != nil { return nil } @@ -854,7 +860,7 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E panic(err) } - return sm, engine, chainConfig, vmConfig, txPool, sync, stMining, pendingResultCh, miningResultCh + return sm, engine, chainConfig, vmConfig, txPool, sync, miningSync, miner } func progress(tx ethdb.KVGetter, stage stages.SyncStage) uint64 { diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 00df2fa05..0e39a23bd 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -139,7 +139,7 @@ func init() { } func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx context.Context) error { - sm, engine, chainConfig, vmConfig, txPool, stateStages, mining, _, miningResultCh := newSync(ctx, db) + sm, engine, chainConfig, vmConfig, txPool, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig) tx, err := db.BeginRw(ctx) if err != nil { @@ -305,33 +305,28 @@ func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx conte panic(err) } - if miningConfig.Enabled && nextBlock != nil && nextBlock.Header().Coinbase != (common.Address{}) { - miningWorld := stagedsync.StageMiningCfg(true) - - miningConfig.Etherbase = nextBlock.Header().Coinbase - miningConfig.ExtraData = nextBlock.Header().Extra - miningStages, err := mining.Prepare(db, tx, quit, false, miningWorld) - if err != nil { - panic(err) - } - // Use all non-mining fields from nextBlock + if miner.MiningConfig.Enabled && nextBlock != nil && nextBlock.Header().Coinbase != (common.Address{}) { + miner.MiningConfig.Etherbase = nextBlock.Header().Coinbase + miner.MiningConfig.ExtraData = nextBlock.Header().Extra miningStages.MockExecFunc(stages.MiningCreateBlock, func(s *stagedsync.StageState, u stagedsync.Unwinder, tx ethdb.RwTx) error { err = stagedsync.SpawnMiningCreateBlockStage(s, tx, stagedsync.StageMiningCreateBlockCfg(db, - miningConfig, + miner, *chainConfig, engine, txPool, tmpDir), - miningWorld.Block, quit) - miningWorld.Block.Uncles = nextBlock.Uncles() - miningWorld.Block.Header.Time = nextBlock.Header().Time - miningWorld.Block.Header.GasLimit = nextBlock.Header().GasLimit - miningWorld.Block.Header.Difficulty = nextBlock.Header().Difficulty - miningWorld.Block.Header.Nonce = nextBlock.Header().Nonce - miningWorld.Block.LocalTxs = types.NewTransactionsFixedOrder(nextBlock.Transactions()) - miningWorld.Block.RemoteTxs = types.NewTransactionsFixedOrder(nil) + if err != nil { + return err + } + miner.MiningBlock.Uncles = nextBlock.Uncles() + miner.MiningBlock.Header.Time = nextBlock.Header().Time + miner.MiningBlock.Header.GasLimit = nextBlock.Header().GasLimit + miner.MiningBlock.Header.Difficulty = nextBlock.Header().Difficulty + miner.MiningBlock.Header.Nonce = nextBlock.Header().Nonce + miner.MiningBlock.LocalTxs = types.NewTransactionsFixedOrder(nextBlock.Transactions()) + miner.MiningBlock.RemoteTxs = types.NewTransactionsFixedOrder(nil) //debugprint.Headers(miningWorld.Block.Header, nextBlock.Header()) return err }) @@ -341,6 +336,7 @@ func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx conte //return stagedsync.SpawnMiningFinishStage(s, tx, miningWorld.Block, cc.Engine(), chainConfig, quit) //}) + _ = miningStages.SetCurrentStage(stages.MiningCreateBlock) if err := miningStages.Run(db, tx); err != nil { return err } @@ -350,7 +346,7 @@ func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx conte return err } defer tx.Rollback() - minedBlock := <-miningResultCh + minedBlock := <-miner.MiningResultCh checkMinedBlock(nextBlock, minedBlock, chainConfig) } @@ -414,7 +410,7 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *params.ChainConfig) { func loopIh(db ethdb.RwKV, ctx context.Context, unwind uint64) error { ch := ctx.Done() - _, _, _, _, _, sync, _, _, _ := newSync(ctx, db) + _, _, _, _, _, sync, _, _ := newSync(ctx, db, nil) tmpdir := path.Join(datadir, etl.TmpDirName) tx, err := db.BeginRw(ctx) if err != nil { @@ -481,7 +477,7 @@ func loopIh(db ethdb.RwKV, ctx context.Context, unwind uint64) error { func loopExec(db ethdb.RwKV, ctx context.Context, unwind uint64) error { ch := ctx.Done() - _, engine, chainConfig, vmConfig, _, sync, _, _, _ := newSync(ctx, db) + _, engine, chainConfig, vmConfig, _, sync, _, _ := newSync(ctx, db, nil) tx, err := db.BeginRw(ctx) if err != nil { diff --git a/eth/backend.go b/eth/backend.go index 5db692228..400a58afb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -265,13 +265,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { backend.pendingBlocks = make(chan *types.Block, 1) backend.minedBlocks = make(chan *types.Block, 1) + miner := stagedsync.NewMiningState(&config.Miner) + backend.pendingBlocks = miner.PendingResultCh + backend.minedBlocks = miner.MiningResultCh + mining := stagedsync.New( stagedsync.MiningStages( - stagedsync.StageMiningCreateBlockCfg(backend.chainKV, backend.config.Miner, *backend.chainConfig, backend.engine, backend.txPool, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainKV, backend.config.Miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), + stagedsync.StageMiningCreateBlockCfg(backend.chainKV, miner, *backend.chainConfig, backend.engine, backend.txPool, tmpdir), + stagedsync.StageMiningExecCfg(backend.chainKV, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(backend.chainKV, tmpdir), stagedsync.StageTrieCfg(backend.chainKV, false, true, tmpdir), - stagedsync.StageMiningFinishCfg(backend.chainKV, *backend.chainConfig, backend.engine, backend.pendingBlocks, backend.minedBlocks, backend.miningSealingQuit), + stagedsync.StageMiningFinishCfg(backend.chainKV, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit), ), stagedsync.MiningUnwindOrder(), stagedsync.OptionalParameters{}) var ethashApi *ethash.API diff --git a/eth/stagedsync/stage_mining_create_block.go b/eth/stagedsync/stage_mining_create_block.go index da5c37d6f..a85ebb6a0 100644 --- a/eth/stagedsync/stage_mining_create_block.go +++ b/eth/stagedsync/stage_mining_create_block.go @@ -21,7 +21,7 @@ import ( "github.com/ledgerwatch/erigon/params" ) -type miningBlock struct { +type MiningBlock struct { Header *types.Header Uncles []*types.Header Txs []types.Transaction @@ -31,9 +31,25 @@ type miningBlock struct { RemoteTxs types.TransactionsStream } +type MiningState struct { + MiningConfig *params.MiningConfig + PendingResultCh chan *types.Block + MiningResultCh chan *types.Block + MiningBlock *MiningBlock +} + +func NewMiningState(cfg *params.MiningConfig) MiningState { + return MiningState{ + MiningConfig: cfg, + PendingResultCh: make(chan *types.Block, 1), + MiningResultCh: make(chan *types.Block, 1), + MiningBlock: &MiningBlock{}, + } +} + type MiningCreateBlockCfg struct { db ethdb.RwKV - mining params.MiningConfig + miner MiningState chainConfig params.ChainConfig engine consensus.Engine txPool *core.TxPool @@ -42,7 +58,7 @@ type MiningCreateBlockCfg struct { func StageMiningCreateBlockCfg( db ethdb.RwKV, - mining params.MiningConfig, + miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool *core.TxPool, @@ -50,7 +66,7 @@ func StageMiningCreateBlockCfg( ) MiningCreateBlockCfg { return MiningCreateBlockCfg{ db: db, - mining: mining, + miner: miner, chainConfig: chainConfig, engine: engine, txPool: txPool, @@ -61,19 +77,22 @@ func StageMiningCreateBlockCfg( // SpawnMiningCreateBlockStage //TODO: // - resubmitAdjustCh - variable is not implemented -func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateBlockCfg, current *miningBlock, quit <-chan struct{}) error { +func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateBlockCfg, quit <-chan struct{}) error { txPoolLocals := cfg.txPool.Locals() pendingTxs, err := cfg.txPool.Pending() if err != nil { return err } + current := cfg.miner.MiningBlock + coinbase := cfg.miner.MiningConfig.Etherbase + const ( // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 7 ) - if cfg.mining.Etherbase == (common.Address{}) { + if cfg.miner.MiningConfig.Etherbase == (common.Address{}) { return fmt.Errorf("refusing to mine without etherbase") } @@ -90,7 +109,7 @@ func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateB blockNum := executionAt + 1 signer := types.MakeSigner(&cfg.chainConfig, blockNum) - localUncles, remoteUncles, err := readNonCanonicalHeaders(tx, blockNum, cfg.engine, cfg.mining.Etherbase, txPoolLocals) + localUncles, remoteUncles, err := readNonCanonicalHeaders(tx, blockNum, cfg.engine, coinbase, txPoolLocals) if err != nil { return err } @@ -134,14 +153,14 @@ func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateB header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), - GasLimit: core.CalcGasLimit(parent.GasUsed, parent.GasLimit, cfg.mining.GasFloor, cfg.mining.GasCeil), - Extra: cfg.mining.ExtraData, + GasLimit: core.CalcGasLimit(parent.GasUsed, parent.GasLimit, cfg.miner.MiningConfig.GasFloor, cfg.miner.MiningConfig.GasCeil), + Extra: cfg.miner.MiningConfig.ExtraData, Time: uint64(timestamp), } // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) //if w.isRunning() { - header.Coinbase = cfg.mining.Etherbase + header.Coinbase = coinbase //} if err = cfg.engine.Prepare(chain, header); err != nil { @@ -268,6 +287,7 @@ func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateB current.LocalTxs = types.NewTransactionsByPriceAndNonce(*signer, localTxs) current.RemoteTxs = types.NewTransactionsByPriceAndNonce(*signer, remoteTxs) s.Done() + fmt.Printf("aa: %t, %t,%t\n", current == nil, cfg.miner.MiningBlock == nil, current.Header == nil) return nil } diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 7ee33456b..9be41df0d 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -19,7 +19,7 @@ import ( type MiningExecCfg struct { db ethdb.RwKV - mining params.MiningConfig + miningState MiningState notifier ChainEventNotifier chainConfig params.ChainConfig engine consensus.Engine @@ -29,7 +29,7 @@ type MiningExecCfg struct { func StageMiningExecCfg( db ethdb.RwKV, - mining params.MiningConfig, + miningState MiningState, notifier ChainEventNotifier, chainConfig params.ChainConfig, engine consensus.Engine, @@ -38,7 +38,7 @@ func StageMiningExecCfg( ) MiningExecCfg { return MiningExecCfg{ db: db, - mining: mining, + miningState: miningState, notifier: notifier, chainConfig: chainConfig, engine: engine, @@ -50,9 +50,13 @@ func StageMiningExecCfg( // SpawnMiningExecStage //TODO: // - resubmitAdjustCh - variable is not implemented -func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, current *miningBlock, localTxs, remoteTxs types.TransactionsStream, noempty bool, quit <-chan struct{}) error { +func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, quit <-chan struct{}) error { cfg.vmConfig.NoReceipts = false logPrefix := s.state.LogPrefix() + current := cfg.miningState.MiningBlock + localTxs := current.LocalTxs + remoteTxs := current.RemoteTxs + noempty := true ibs := state.New(state.NewPlainStateReader(tx)) stateWriter := state.NewPlainStateWriter(tx, tx, current.Header.Number.Uint64()) @@ -76,7 +80,7 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, curre // empty block is necessary to keep the liveness of the network. if noempty { if !localTxs.Empty() { - logs, err := addTransactionsToMiningBlock(current, cfg.chainConfig, cfg.vmConfig, getHeader, checkTEVM, cfg.engine, localTxs, cfg.mining.Etherbase, ibs, quit) + logs, err := addTransactionsToMiningBlock(current, cfg.chainConfig, cfg.vmConfig, getHeader, checkTEVM, cfg.engine, localTxs, cfg.miningState.MiningConfig.Etherbase, ibs, quit) if err != nil { return err } @@ -88,7 +92,7 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, curre //} } if !remoteTxs.Empty() { - logs, err := addTransactionsToMiningBlock(current, cfg.chainConfig, cfg.vmConfig, getHeader, checkTEVM, cfg.engine, remoteTxs, cfg.mining.Etherbase, ibs, quit) + logs, err := addTransactionsToMiningBlock(current, cfg.chainConfig, cfg.vmConfig, getHeader, checkTEVM, cfg.engine, remoteTxs, cfg.miningState.MiningConfig.Etherbase, ibs, quit) if err != nil { return err } @@ -141,7 +145,7 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, curre return nil } -func addTransactionsToMiningBlock(current *miningBlock, chainConfig params.ChainConfig, vmConfig *vm.Config, getHeader func(hash common.Hash, number uint64) *types.Header, checkTEVM func(common.Hash) (bool, error), engine consensus.Engine, txs types.TransactionsStream, coinbase common.Address, ibs *state.IntraBlockState, quit <-chan struct{}) (types.Logs, error) { +func addTransactionsToMiningBlock(current *MiningBlock, chainConfig params.ChainConfig, vmConfig *vm.Config, getHeader func(hash common.Hash, number uint64) *types.Header, checkTEVM func(common.Hash) (bool, error), engine consensus.Engine, txs types.TransactionsStream, coinbase common.Address, ibs *state.IntraBlockState, quit <-chan struct{}) (types.Logs, error) { header := current.Header tcount := 0 gasPool := new(core.GasPool).AddGas(current.Header.GasLimit) @@ -150,7 +154,7 @@ func addTransactionsToMiningBlock(current *miningBlock, chainConfig params.Chain var coalescedLogs types.Logs noop := state.NewNoopWriter() - var miningCommitTx = func(txn types.Transaction, coinbase common.Address, vmConfig *vm.Config, chainConfig params.ChainConfig, ibs *state.IntraBlockState, current *miningBlock) ([]*types.Log, error) { + var miningCommitTx = func(txn types.Transaction, coinbase common.Address, vmConfig *vm.Config, chainConfig params.ChainConfig, ibs *state.IntraBlockState, current *MiningBlock) ([]*types.Log, error) { snap := ibs.Snapshot() receipt, _, err := core.ApplyTransaction(&chainConfig, getHeader, engine, &coinbase, gasPool, ibs, noop, header, txn, &header.GasUsed, *vmConfig, checkTEVM) if err != nil { diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index 6dbc4c913..62627a8f3 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -12,34 +12,32 @@ import ( ) type MiningFinishCfg struct { - db ethdb.RwKV - chainConfig params.ChainConfig - engine consensus.Engine - pendingBlocksCh chan<- *types.Block - minedBlocksCh chan<- *types.Block - sealCancel <-chan struct{} + db ethdb.RwKV + chainConfig params.ChainConfig + engine consensus.Engine + sealCancel <-chan struct{} + miningState MiningState } func StageMiningFinishCfg( db ethdb.RwKV, chainConfig params.ChainConfig, engine consensus.Engine, - pendingBlocksCh chan<- *types.Block, - minedBlocksCh chan<- *types.Block, + miningState MiningState, sealCancel <-chan struct{}, ) MiningFinishCfg { return MiningFinishCfg{ - db: db, - chainConfig: chainConfig, - engine: engine, - pendingBlocksCh: pendingBlocksCh, - minedBlocksCh: minedBlocksCh, - sealCancel: sealCancel, + db: db, + chainConfig: chainConfig, + engine: engine, + miningState: miningState, + sealCancel: sealCancel, } } -func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, current *miningBlock, cfg MiningFinishCfg, quit <-chan struct{}) error { +func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, cfg MiningFinishCfg, quit <-chan struct{}) error { logPrefix := s.state.LogPrefix() + current := cfg.miningState.MiningBlock // Short circuit when receiving duplicate result caused by resubmitting. //if w.chain.HasBlock(block.Hash(), block.NumberU64()) { @@ -47,7 +45,7 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, current *miningBlock, //} block := types.NewBlock(current.Header, current.Txs, current.Uncles, current.Receipts) - *current = miningBlock{} // hack to clean global data + *current = MiningBlock{} // hack to clean global data //sealHash := engine.SealHash(block.Header()) // Reject duplicate sealing work due to resubmitting. @@ -59,12 +57,12 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, current *miningBlock, // Tests may set pre-calculated nonce if block.Header().Nonce.Uint64() != 0 { - cfg.minedBlocksCh <- block + cfg.miningState.MiningResultCh <- block s.Done() return nil } - cfg.pendingBlocksCh <- block + cfg.miningState.PendingResultCh <- block log.Info(fmt.Sprintf("[%s] block ready for seal", logPrefix), "number", block.NumberU64(), @@ -75,7 +73,7 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, current *miningBlock, ) chain := ChainReader{Cfg: cfg.chainConfig, Db: kv.WrapIntoTxDB(tx)} - if err := cfg.engine.Seal(chain, block, cfg.minedBlocksCh, cfg.sealCancel); err != nil { + if err := cfg.engine.Seal(chain, block, cfg.miningState.MiningResultCh, cfg.sealCancel); err != nil { log.Warn("Block sealing failed", "err", err) } diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index 2df0a8cc3..eed3dab9c 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -28,27 +28,10 @@ type StageParameters struct { // the stage can take significant time and gracefully shutdown at Ctrl+C. QuitCh <-chan struct{} InitialCycle bool - mining *MiningCfg snapshotsDir string } -type MiningCfg struct { - // noempty is the flag used to control whether the feature of pre-seal empty - // block is enabled. The default value is false(pre-seal is enabled by default). - // But in some special scenario the consensus engine will seal blocks instantaneously, - // in this case this feature will add all empty blocks into canonical chain - // non-stop and no real transaction will be included. - noempty bool - - // runtime dat - Block *miningBlock -} - -func StageMiningCfg(noempty bool) *MiningCfg { - return &MiningCfg{noempty: noempty, Block: &miningBlock{}} -} - // StageBuilder represent an object to create a single stage for staged sync type StageBuilder struct { // ID is the stage identifier. Should be unique. It is recommended to prefix it with reverse domain `com.example.my-stage` to avoid conflicts. @@ -107,10 +90,7 @@ func MiningStages( ID: stages.MiningCreateBlock, Description: "Mining: construct new block from tx pool", ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error { - return SpawnMiningCreateBlockStage(s, tx, - createBlockCfg, - world.mining.Block, - world.QuitCh) + return SpawnMiningCreateBlockStage(s, tx, createBlockCfg, world.QuitCh) }, UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, } @@ -123,13 +103,7 @@ func MiningStages( ID: stages.MiningExecution, Description: "Mining: construct new block from tx pool", ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error { - return SpawnMiningExecStage(s, tx, - execCfg, - world.mining.Block, - world.mining.Block.LocalTxs, - world.mining.Block.RemoteTxs, - world.mining.noempty, - world.QuitCh) + return SpawnMiningExecStage(s, tx, execCfg, world.QuitCh) }, UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, } @@ -159,7 +133,7 @@ func MiningStages( if err != nil { return err } - world.mining.Block.Header.Root = stateRoot + createBlockCfg.miner.MiningBlock.Header.Root = stateRoot return nil }, UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, @@ -173,7 +147,7 @@ func MiningStages( ID: stages.MiningFinish, Description: "Mining: create and propagate valid block", ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error { - return SpawnMiningFinishStage(s, tx, world.mining.Block, finish, world.QuitCh) + return SpawnMiningFinishStage(s, tx, finish, world.QuitCh) }, UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, } diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go index 1e5ee11d1..4cf260af6 100644 --- a/eth/stagedsync/stagedsync.go +++ b/eth/stagedsync/stagedsync.go @@ -46,13 +46,11 @@ func (stagedSync *StagedSync) Prepare( tx ethdb.Tx, quitCh <-chan struct{}, initialCycle bool, - miningConfig *MiningCfg, ) (*State, error) { stages := stagedSync.stageBuilders.Build( StageParameters{ QuitCh: quitCh, InitialCycle: initialCycle, - mining: miningConfig, snapshotsDir: stagedSync.params.SnapshotDir, }, ) diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index ccc337ae7..4a440c1df 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -304,16 +304,17 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey miningConfig.Etherbase = mock.Address miningConfig.SigKey = mock.Key - mock.PendingBlocks = make(chan *types.Block, 1) - mock.MinedBlocks = make(chan *types.Block, 1) + miner := stagedsync.NewMiningState(&miningConfig) + mock.PendingBlocks = miner.PendingResultCh + mock.MinedBlocks = miner.MiningResultCh mock.MiningSync = stagedsync.New( stagedsync.MiningStages( - stagedsync.StageMiningCreateBlockCfg(mock.DB, miningConfig, *mock.ChainConfig, mock.Engine, txPool, mock.tmpdir), - stagedsync.StageMiningExecCfg(mock.DB, miningConfig, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, mock.tmpdir), + stagedsync.StageMiningCreateBlockCfg(mock.DB, miner, *mock.ChainConfig, mock.Engine, txPool, mock.tmpdir), + stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, mock.tmpdir), stagedsync.StageHashStateCfg(mock.DB, mock.tmpdir), stagedsync.StageTrieCfg(mock.DB, false, true, mock.tmpdir), - stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, mock.PendingBlocks, mock.MinedBlocks, mock.Ctx.Done()), + stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, mock.Ctx.Done()), ), stagedsync.MiningUnwindOrder(), stagedsync.OptionalParameters{}, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 44492ed38..c4f5363cc 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -152,7 +152,7 @@ func StageLoopStep( if notifications != nil && notifications.Accumulator != nil { notifications.Accumulator.Reset() } - st, err1 := sync.Prepare(db, nil, ctx.Done(), initialCycle, nil) + st, err1 := sync.Prepare(db, nil, ctx.Done(), initialCycle) if err1 != nil { return fmt.Errorf("prepare staged sync: %w", err1) } @@ -235,7 +235,6 @@ func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSyn tx, ctx.Done(), false, - stagedsync.StageMiningCfg(true), ) if err != nil { return err