integration --workers flag (#5326)

This commit is contained in:
Alex Sharov 2022-09-10 10:37:56 +07:00 committed by GitHub
parent 6d1d3a9f47
commit f6ca07e215
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 13 additions and 7 deletions

View File

@ -31,6 +31,7 @@ var (
chain string // Which chain to use (mainnet, ropsten, rinkeby, goerli, etc.) chain string // Which chain to use (mainnet, ropsten, rinkeby, goerli, etc.)
_forceSetHistoryV2 bool _forceSetHistoryV2 bool
workers uint64
) )
func must(err error) { func must(err error) {
@ -133,3 +134,7 @@ func withChain(cmd *cobra.Command) {
func withHeimdall(cmd *cobra.Command) { func withHeimdall(cmd *cobra.Command) {
cmd.Flags().StringVar(&HeimdallURL, "bor.heimdall", "http://localhost:1317", "URL of Heimdall service") cmd.Flags().StringVar(&HeimdallURL, "bor.heimdall", "http://localhost:1317", "URL of Heimdall service")
} }
func withWorkers(cmd *cobra.Command) {
cmd.Flags().Uint64Var(&workers, "workers", 1, "")
}

View File

@ -342,6 +342,7 @@ func init() {
withTxTrace(cmdStageExec) withTxTrace(cmdStageExec)
withChain(cmdStageExec) withChain(cmdStageExec)
withHeimdall(cmdStageExec) withHeimdall(cmdStageExec)
withWorkers(cmdStageExec)
rootCmd.AddCommand(cmdStageExec) rootCmd.AddCommand(cmdStageExec)
@ -681,7 +682,7 @@ func stageExec(db kv.RwDB, ctx context.Context) error {
genesis := core.DefaultGenesisBlockByChainName(chain) genesis := core.DefaultGenesisBlockByChainName(chain)
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil,
/*stateStream=*/ false, /*stateStream=*/ false,
/*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, 1, txNums, agg()) /*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, int(workers), txNums, agg())
if unwind > 0 { if unwind > 0 {
u := sync.NewUnwindState(stages.Execution, s.BlockNumber-unwind, s.BlockNumber) u := sync.NewUnwindState(stages.Execution, s.BlockNumber-unwind, s.BlockNumber)
err := stagedsync.UnwindExecutionStage(u, s, nil, ctx, cfg, true) err := stagedsync.UnwindExecutionStage(u, s, nil, ctx, cfg, true)

View File

@ -123,6 +123,7 @@ func init() {
withMining(stateStags) withMining(stateStags)
withChain(stateStags) withChain(stateStags)
withHeimdall(stateStags) withHeimdall(stateStags)
withWorkers(stateStags)
rootCmd.AddCommand(stateStags) rootCmd.AddCommand(stateStags)
@ -139,6 +140,7 @@ func init() {
withUnwind(loopExecCmd) withUnwind(loopExecCmd)
withChain(loopExecCmd) withChain(loopExecCmd)
withHeimdall(loopExecCmd) withHeimdall(loopExecCmd)
withWorkers(loopExecCmd)
rootCmd.AddCommand(loopExecCmd) rootCmd.AddCommand(loopExecCmd)
} }
@ -184,8 +186,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders) stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders)
genesis := core.DefaultGenesisBlockByChainName(chain) genesis := core.DefaultGenesisBlockByChainName(chain)
workers := 2 execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, false, historyV2, dirs, getBlockReader(db), nil, genesis, int(workers), txNums, agg())
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, false, historyV2, dirs, getBlockReader(db), nil, genesis, workers, txNums, agg())
execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
@ -503,7 +504,7 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error {
genesis := core.DefaultGenesisBlockByChainName(chain) genesis := core.DefaultGenesisBlockByChainName(chain)
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil,
/*stateStream=*/ false, /*stateStream=*/ false,
/*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, 1, txNums, agg()) /*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, int(workers), txNums, agg())
// set block limit of execute stage // set block limit of execute stage
sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {

View File

@ -132,10 +132,9 @@ func Erigon22(execCtx context.Context, genesis *core.Genesis, logger log.Logger)
block = execStage.BlockNumber + 1 block = execStage.BlockNumber + 1
} }
workerCount := workers
execCfg := stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, nil, chainConfig, engine, &vm.Config{}, nil, execCfg := stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, nil, chainConfig, engine, &vm.Config{}, nil,
/*stateStream=*/ false, /*stateStream=*/ false,
/*badBlockHalt=*/ false, cfg.HistoryV2, dirs, blockReader, nil, genesis, workerCount, txNums, agg) /*badBlockHalt=*/ false, cfg.HistoryV2, dirs, blockReader, nil, genesis, int(workers), txNums, agg)
maxBlockNum := allSnapshots.BlocksAvailable() + 1 maxBlockNum := allSnapshots.BlocksAvailable() + 1
if err := stagedsync.SpawnExecuteBlocksStage(execStage, stagedSync, nil, maxBlockNum, ctx, execCfg, true); err != nil { if err := stagedsync.SpawnExecuteBlocksStage(execStage, stagedSync, nil, maxBlockNum, ctx, execCfg, true); err != nil {
return err return err

View File

@ -507,7 +507,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
blockReader, blockReader,
controlServer.Hd, controlServer.Hd,
cfg.Genesis, cfg.Genesis,
1, cfg.Sync.ExecWorkerCount,
txNums, txNums,
agg, agg,
), ),