From f6ca07e215c5a3daf1e8c5c7d57788f374b697b0 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 10 Sep 2022 10:37:56 +0700 Subject: [PATCH] integration --workers flag (#5326) --- cmd/integration/commands/flags.go | 5 +++++ cmd/integration/commands/stages.go | 3 ++- cmd/integration/commands/state_stages.go | 7 ++++--- cmd/state/commands/erigon22.go | 3 +-- turbo/stages/stageloop.go | 2 +- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index 183ace2a2..b42d445dd 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -31,6 +31,7 @@ var ( chain string // Which chain to use (mainnet, ropsten, rinkeby, goerli, etc.) _forceSetHistoryV2 bool + workers uint64 ) func must(err error) { @@ -133,3 +134,7 @@ func withChain(cmd *cobra.Command) { func withHeimdall(cmd *cobra.Command) { cmd.Flags().StringVar(&HeimdallURL, "bor.heimdall", "http://localhost:1317", "URL of Heimdall service") } + +func withWorkers(cmd *cobra.Command) { + cmd.Flags().Uint64Var(&workers, "workers", 1, "") +} diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 171d99379..48d0a8a58 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -342,6 +342,7 @@ func init() { withTxTrace(cmdStageExec) withChain(cmdStageExec) withHeimdall(cmdStageExec) + withWorkers(cmdStageExec) rootCmd.AddCommand(cmdStageExec) @@ -681,7 +682,7 @@ func stageExec(db kv.RwDB, ctx context.Context) error { genesis := core.DefaultGenesisBlockByChainName(chain) cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, /*stateStream=*/ false, - /*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, 1, txNums, agg()) + /*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, int(workers), txNums, agg()) if unwind > 0 { u := sync.NewUnwindState(stages.Execution, s.BlockNumber-unwind, s.BlockNumber) err := stagedsync.UnwindExecutionStage(u, s, nil, ctx, cfg, true) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 1cebc1cfd..ffe4a7de9 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -123,6 +123,7 @@ func init() { withMining(stateStags) withChain(stateStags) withHeimdall(stateStags) + withWorkers(stateStags) rootCmd.AddCommand(stateStags) @@ -139,6 +140,7 @@ func init() { withUnwind(loopExecCmd) withChain(loopExecCmd) withHeimdall(loopExecCmd) + withWorkers(loopExecCmd) rootCmd.AddCommand(loopExecCmd) } @@ -184,8 +186,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context. stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders) genesis := core.DefaultGenesisBlockByChainName(chain) - workers := 2 - execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, false, historyV2, dirs, getBlockReader(db), nil, genesis, workers, txNums, agg()) + execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, false, historyV2, dirs, getBlockReader(db), nil, genesis, int(workers), txNums, agg()) execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { @@ -503,7 +504,7 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error { genesis := core.DefaultGenesisBlockByChainName(chain) cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, /*stateStream=*/ false, - /*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, 1, txNums, agg()) + /*badBlockHalt=*/ false, historyV2, dirs, getBlockReader(db), nil, genesis, int(workers), txNums, agg()) // set block limit of execute stage sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { diff --git a/cmd/state/commands/erigon22.go b/cmd/state/commands/erigon22.go index 44703e35d..16b03a24e 100644 --- a/cmd/state/commands/erigon22.go +++ b/cmd/state/commands/erigon22.go @@ -132,10 +132,9 @@ func Erigon22(execCtx context.Context, genesis *core.Genesis, logger log.Logger) block = execStage.BlockNumber + 1 } - workerCount := workers execCfg := stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, nil, chainConfig, engine, &vm.Config{}, nil, /*stateStream=*/ false, - /*badBlockHalt=*/ false, cfg.HistoryV2, dirs, blockReader, nil, genesis, workerCount, txNums, agg) + /*badBlockHalt=*/ false, cfg.HistoryV2, dirs, blockReader, nil, genesis, int(workers), txNums, agg) maxBlockNum := allSnapshots.BlocksAvailable() + 1 if err := stagedsync.SpawnExecuteBlocksStage(execStage, stagedSync, nil, maxBlockNum, ctx, execCfg, true); err != nil { return err diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 874b728e8..5b3537b42 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -507,7 +507,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config blockReader, controlServer.Hd, cfg.Genesis, - 1, + cfg.Sync.ExecWorkerCount, txNums, agg, ),