From 0533eed812967aff7a218b6c01df818402967d3b Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Wed, 3 Jun 2020 17:25:44 +0300 Subject: [PATCH] Refactor Stagedsync part 1 (#610) --- cmd/hack/hack.go | 3 +- eth/stagedsync/stage.go | 13 +++ eth/stagedsync/stage_bodies.go | 13 ++- eth/stagedsync/stage_execute.go | 35 +++--- eth/stagedsync/stage_hashcheck.go | 11 +- eth/stagedsync/stage_headers.go | 57 +++++++++ eth/stagedsync/stage_indexes.go | 38 ++---- eth/stagedsync/stage_senders.go | 15 +-- eth/stagedsync/stagedsync.go | 185 +++++++++++++----------------- eth/stagedsync/state.go | 70 +++++++++++ 10 files changed, 267 insertions(+), 173 deletions(-) create mode 100644 eth/stagedsync/stage.go create mode 100644 eth/stagedsync/stage_headers.go create mode 100644 eth/stagedsync/state.go diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index ea50bd5b6..da9901dc5 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -2503,7 +2503,8 @@ func testStage5(chaindata string) error { log.Info("Stage4", "progress", stage4progress) core.UsePlainStateExecution = true ch := make(chan struct{}) - if err = stagedsync.SpawnCheckFinalHashStage(db, stage4progress, "", ch); err != nil { + stageState := &stagedsync.StageState{Stage: stages.HashCheck} + if err = stagedsync.SpawnCheckFinalHashStage(stageState, db, "", ch); err != nil { return err } close(ch) diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go new file mode 100644 index 000000000..3bfe14fb2 --- /dev/null +++ b/eth/stagedsync/stage.go @@ -0,0 +1,13 @@ +package stagedsync + +import "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + +type ExecFunc func(*StageState) error + +type Stage struct { + ID stages.SyncStage + Description string + ExecFunc ExecFunc + Disabled bool + DisabledDescription string +} diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 8e3cbcc08..5a5c2e36a 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -7,13 +7,16 @@ import ( "github.com/ledgerwatch/turbo-geth/ethdb" ) -func spawnBodyDownloadStage(db ethdb.Getter, d DownloaderGlue, pid string) (bool, error) { - // Figure out how many blocks have already been downloaded - origin, err := stages.GetStageProgress(db, stages.Bodies) +func spawnBodyDownloadStage(s *StageState, d DownloaderGlue, pid string) error { + cont, err := d.SpawnBodyDownloadStage(pid, s.BlockNumber) if err != nil { - return false, fmt.Errorf("getting Bodies stage progress: %w", err) + return err } - return d.SpawnBodyDownloadStage(pid, origin) + if !cont { + s.Done() + } + return nil + } func unwindBodyDownloadStage(db ethdb.Database, unwindPoint uint64) error { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 1a9f0a72f..adf3ba4dc 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -73,11 +73,8 @@ func (l *progressLogger) Stop() { const StateBatchSize = 50 * 1024 * 1024 // 50 Mb const ChangeBatchSize = 1024 * 2014 // 1 Mb -func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) (uint64, error) { - lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Execution) - if err != nil { - return 0, err - } +func spawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) error { + lastProcessedBlockNumber := s.BlockNumber nextBlockNumber := uint64(0) @@ -110,8 +107,8 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit engine := blockchain.Engine() vmConfig := blockchain.GetVMConfig() for { - if err = common.Stopped(quit); err != nil { - return 0, err + if err := common.Stopped(quit); err != nil { + return err } blockNum := atomic.LoadUint64(&nextBlockNumber) @@ -154,13 +151,13 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit stateWriter.SetCodeSizeCache(codeSizeCache) // where the magic happens - err = core.ExecuteBlockEuphemerally(chainConfig, vmConfig, blockchain, engine, block, stateReader, stateWriter) + err := core.ExecuteBlockEuphemerally(chainConfig, vmConfig, blockchain, engine, block, stateReader, stateWriter) if err != nil { - return 0, err + return err } - if err = stages.SaveStageProgress(stateBatch, stages.Execution, blockNum); err != nil { - return 0, err + if err = s.Update(stateBatch, blockNum); err != nil { + return err } atomic.AddUint64(&nextBlockNumber, 1) @@ -168,13 +165,13 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit if stateBatch.BatchSize() >= StateBatchSize { start := time.Now() if _, err = stateBatch.Commit(); err != nil { - return 0, err + return err } log.Info("State batch committed", "in", time.Since(start)) } if changeBatch.BatchSize() >= ChangeBatchSize { if _, err = changeBatch.Commit(); err != nil { - return 0, err + return err } } /* @@ -185,18 +182,16 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit */ } - // the last processed block - syncHeadNumber := atomic.LoadUint64(&nextBlockNumber) - 1 - - _, err = stateBatch.Commit() + _, err := stateBatch.Commit() if err != nil { - return syncHeadNumber, fmt.Errorf("sync Execute: failed to write state batch commit: %v", err) + return fmt.Errorf("sync Execute: failed to write state batch commit: %v", err) } _, err = changeBatch.Commit() if err != nil { - return syncHeadNumber, fmt.Errorf("sync Execute: failed to write change batch commit: %v", err) + return fmt.Errorf("sync Execute: failed to write change batch commit: %v", err) } - return syncHeadNumber, nil + s.Done() + return nil } func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error { diff --git a/eth/stagedsync/stage_hashcheck.go b/eth/stagedsync/stage_hashcheck.go index 7c53be8ba..3e534408a 100644 --- a/eth/stagedsync/stage_hashcheck.go +++ b/eth/stagedsync/stage_hashcheck.go @@ -19,8 +19,10 @@ import ( var cbor codec.CborHandle -func SpawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, datadir string, quit chan struct{}) error { - hashProgress, err := stages.GetStageProgress(stateDB, stages.HashCheck) +func SpawnCheckFinalHashStage(s *StageState, stateDB ethdb.Database, datadir string, quit chan struct{}) error { + hashProgress := s.BlockNumber + + syncHeadNumber, err := s.ExecutionAt(stateDB) if err != nil { return err } @@ -28,12 +30,13 @@ func SpawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat if hashProgress == syncHeadNumber { // we already did hash check for this block // we don't do the obvious `if hashProgress > syncHeadNumber` to support reorgs more naturally + s.Done() return nil } if core.UsePlainStateExecution { log.Info("Promoting plain state", "from", hashProgress, "to", syncHeadNumber) - err = promoteHashedState(stateDB, hashProgress, datadir, quit) + err := promoteHashedState(stateDB, hashProgress, datadir, quit) if err != nil { return err } @@ -58,7 +61,7 @@ func SpawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat return fmt.Errorf("wrong trie root: %x, expected (from header): %x", subTries.Hashes[0], syncHeadBlock.Root()) } - return stages.SaveStageProgress(stateDB, stages.HashCheck, blockNr) + return s.DoneAndUpdate(stateDB, blockNr) } func unwindHashCheckStage(unwindPoint uint64, stateDB ethdb.Database) error { diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go new file mode 100644 index 000000000..deee2dfd3 --- /dev/null +++ b/eth/stagedsync/stage_headers.go @@ -0,0 +1,57 @@ +package stagedsync + +import ( + "fmt" + + "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" +) + +func DownloadHeaders(s *StageState, d DownloaderGlue, stateDB ethdb.Database, headersFetchers []func() error, quitCh chan struct{}) error { + if false { + err := d.SpawnSync(headersFetchers) + if err != nil { + return err + } + + log.Info("Checking for unwinding...") + // Check unwinds backwards and if they are outstanding, invoke corresponding functions + for stage := stages.Finish - 1; stage > stages.Headers; stage-- { + unwindPoint, err := stages.GetStageUnwind(stateDB, stage) + if err != nil { + return err + } + + if unwindPoint == 0 { + continue + } + + switch stage { + case stages.Bodies: + err = unwindBodyDownloadStage(stateDB, unwindPoint) + case stages.Senders: + err = unwindSendersStage(stateDB, unwindPoint) + case stages.Execution: + err = unwindExecutionStage(unwindPoint, stateDB) + case stages.HashCheck: + err = unwindHashCheckStage(unwindPoint, stateDB) + case stages.AccountHistoryIndex: + err = unwindAccountHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh) + case stages.StorageHistoryIndex: + err = unwindStorageHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh) + default: + return fmt.Errorf("unrecognized stage for unwinding: %d", stage) + } + + if err != nil { + return fmt.Errorf("error unwinding stage: %d: %w", stage, err) + } + } + log.Info("Checking for unwinding... Complete!") + } + + s.Done() + return nil +} diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go index bfb532851..50846cb80 100644 --- a/eth/stagedsync/stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -1,24 +1,19 @@ package stagedsync import ( - "fmt" - "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core" - "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" ) -func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error { +func spawnAccountHistoryIndex(s *StageState, db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error { var blockNum uint64 - if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.AccountHistoryIndex); err == nil { - if lastProcessedBlockNumber > 0 { - blockNum = lastProcessedBlockNumber + 1 - } - } else { - return fmt.Errorf("reading account history process: %v", err) + lastProcessedBlockNumber := s.BlockNumber + if lastProcessedBlockNumber > 0 { + blockNum = lastProcessedBlockNumber + 1 } + log.Info("Account history index generation started", "from", blockNum) ig := core.NewIndexGenerator(db, quitCh) @@ -33,21 +28,14 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool return err } - if err := stages.SaveStageProgress(db, stages.AccountHistoryIndex, blockNum); err != nil { - return err - } - return nil - + return s.DoneAndUpdate(db, blockNum) } -func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error { +func spawnStorageHistoryIndex(s *StageState, db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error { var blockNum uint64 - if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.StorageHistoryIndex); err == nil { - if lastProcessedBlockNumber > 0 { - blockNum = lastProcessedBlockNumber + 1 - } - } else { - return fmt.Errorf("reading storage history process: %v", err) + lastProcessedBlockNumber := s.BlockNumber + if lastProcessedBlockNumber > 0 { + blockNum = lastProcessedBlockNumber + 1 } ig := core.NewIndexGenerator(db, quitCh) ig.TempDir = datadir @@ -61,11 +49,7 @@ func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool return err } - if err = stages.SaveStageProgress(db, stages.StorageHistoryIndex, blockNum); err != nil { - return err - } - - return nil + return s.DoneAndUpdate(db, blockNum) } func unwindAccountHistoryIndex(unwindPoint uint64, db ethdb.Database, plainState bool, quitCh chan struct{}) error { diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 3907eb7c1..f6f9b19dc 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -34,12 +34,8 @@ func init() { } } -func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig, quitCh chan struct{}) error { - lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Senders) - if err != nil { - return err - } - +func spawnRecoverSendersStage(s *StageState, stateDB ethdb.Database, config *params.ChainConfig, quitCh chan struct{}) error { + lastProcessedBlockNumber := s.BlockNumber nextBlockNumber := lastProcessedBlockNumber + 1 mutation := stateDB.NewBatch() @@ -71,7 +67,7 @@ func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig needExit := false for !needExit { - if err = common.Stopped(quitCh); err != nil { + if err := common.Stopped(quitCh); err != nil { return err } @@ -104,19 +100,20 @@ func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig rawdb.WriteBody(context.Background(), mutation, j.hash, j.nextBlockNumber, j.blockBody) } - if err = stages.SaveStageProgress(mutation, stages.Senders, nextBlockNumber); err != nil { + if err := s.Update(mutation, nextBlockNumber); err != nil { return err } log.Info("Recovered for blocks:", "blockNumber", nextBlockNumber) if mutation.BatchSize() >= mutation.IdealBatchSize() { - if _, err = mutation.Commit(); err != nil { + if _, err := mutation.Commit(); err != nil { return err } mutation = stateDB.NewBatch() } } + s.Done() return nil } diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go index ae636a18e..77495e496 100644 --- a/eth/stagedsync/stagedsync.go +++ b/eth/stagedsync/stagedsync.go @@ -19,128 +19,99 @@ func DoStagedSyncWithFetchers( quitCh chan struct{}, headersFetchers []func() error, ) error { - var err error defer log.Info("Staged sync finished") - /* - * Stage 1. Download Headers - */ - log.Info("Sync stage 1/7. Downloading headers...") - err = DownloadHeaders(d, stateDB, headersFetchers, quitCh) - if err != nil { - return err + stages := []*Stage{ + { + ID: stages.Headers, + Description: "Downloading headers", + ExecFunc: func(s *StageState) error { + return DownloadHeaders(s, d, stateDB, headersFetchers, quitCh) + }, + }, + { + ID: stages.Bodies, + Description: "Downloading block bodiess", + ExecFunc: func(s *StageState) error { + return spawnBodyDownloadStage(s, d, pid) + }, + }, + { + ID: stages.Senders, + Description: "Recovering senders from tx signatures", + ExecFunc: func(s *StageState) error { + return spawnRecoverSendersStage(s, stateDB, blockchain.Config(), quitCh) + }, + }, + { + ID: stages.Execution, + Description: "Executing blocks w/o hash checks", + ExecFunc: func(s *StageState) error { + return spawnExecuteBlocksStage(s, stateDB, blockchain, quitCh) + }, + }, + { + ID: stages.HashCheck, + Description: "Validating final hash", + ExecFunc: func(s *StageState) error { + return SpawnCheckFinalHashStage(s, stateDB, datadir, quitCh) + }, + }, + { + ID: stages.AccountHistoryIndex, + Description: "Generating account history index", + Disabled: !history, + DisabledDescription: "Enable by adding `h` to --storage-mode", + ExecFunc: func(s *StageState) error { + return spawnAccountHistoryIndex(s, stateDB, datadir, core.UsePlainStateExecution, quitCh) + }, + }, + { + ID: stages.StorageHistoryIndex, + Description: "Generating storage history index", + Disabled: !history, + DisabledDescription: "Enable by adding `h` to --storage-mode", + ExecFunc: func(s *StageState) error { + return spawnStorageHistoryIndex(s, stateDB, datadir, core.UsePlainStateExecution, quitCh) + }, + }, } - /* - * Stage 2. Download Block bodies - */ - log.Info("Sync stage 2/7. Downloading block bodies...") - cont := true + state := NewState(stages) - for cont && err == nil { - cont, err = spawnBodyDownloadStage(stateDB, d, pid) - if err != nil { - return err - } - } + for !state.IsDone() { + index, stage := state.CurrentStage() - log.Info("Sync stage 2/7. Downloading block bodies... Complete!") + if stage.Disabled { + message := fmt.Sprintf( + "Sync stage %d/%d. %v disabled. %s", + index+1, + state.Len(), + stage.Description, + stage.DisabledDescription, + ) - /* - * Stage 3. Recover senders from tx signatures - */ - log.Info("Sync stage 3/7. Recovering senders from tx signatures...") + log.Info(message) - if err = spawnRecoverSendersStage(stateDB, blockchain.Config(), quitCh); err != nil { - return err - } - log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!") - - /* - * Stage 4. Execute block bodies w/o calculating trie roots - */ - log.Info("Sync stage 4/7. Executing blocks w/o hash checks...") - syncHeadNumber, err := spawnExecuteBlocksStage(stateDB, blockchain, quitCh) - if err != nil { - return err - } - - log.Info("Sync stage 4/7. Executing blocks w/o hash checks... Complete!") - - // Further stages go there - log.Info("Sync stage 5/7. Validating final hash") - err = SpawnCheckFinalHashStage(stateDB, syncHeadNumber, datadir, quitCh) - if err != nil { - return err - } - - log.Info("Sync stage 5/7. Validating final hash... Complete!") - - if history { - log.Info("Sync stage 6/7. Generating account history index") - err = spawnAccountHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh) - if err != nil { - return err - } - log.Info("Sync stage 6/7. Generating account history index... Complete!") - } else { - log.Info("Sync stage 6/7, generating account history index is disabled. Enable by adding `h` to --storage-mode") - } - - if history { - log.Info("Sync stage 7/7. Generating storage history index") - err = spawnStorageHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh) - if err != nil { - return err - } - log.Info("Sync stage 7/7. Generating storage history index... Complete!") - } else { - log.Info("Sync stage 7/7, generating storage history index is disabled. Enable by adding `h` to --storage-mode") - } - - return err -} - -func DownloadHeaders(d DownloaderGlue, stateDB ethdb.Database, headersFetchers []func() error, quitCh chan struct{}) error { - err := d.SpawnSync(headersFetchers) - if err != nil { - return err - } - - log.Info("Sync stage 1/7. Downloading headers... Complete!") - log.Info("Checking for unwinding...") - // Check unwinds backwards and if they are outstanding, invoke corresponding functions - for stage := stages.Finish - 1; stage > stages.Headers; stage-- { - unwindPoint, err := stages.GetStageUnwind(stateDB, stage) - if err != nil { - return err - } - - if unwindPoint == 0 { + state.NextStage() continue } - switch stage { - case stages.Bodies: - err = unwindBodyDownloadStage(stateDB, unwindPoint) - case stages.Senders: - err = unwindSendersStage(stateDB, unwindPoint) - case stages.Execution: - err = unwindExecutionStage(unwindPoint, stateDB) - case stages.HashCheck: - err = unwindHashCheckStage(unwindPoint, stateDB) - case stages.AccountHistoryIndex: - err = unwindAccountHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh) - case stages.StorageHistoryIndex: - err = unwindStorageHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh) - default: - return fmt.Errorf("unrecognized stage for unwinding: %d", stage) + stageState, err := state.StageState(stage.ID, stateDB) + if err != nil { + return err } + message := fmt.Sprintf("Sync stage %d/%d. %v...", index+1, state.Len(), stage.Description) + log.Info(message) + + err = stage.ExecFunc(stageState) if err != nil { - return fmt.Errorf("error unwinding stage: %d: %w", stage, err) + return err } + + log.Info(fmt.Sprintf("%s DONE!", message)) } - log.Info("Checking for unwinding... Complete!") + return nil } diff --git a/eth/stagedsync/state.go b/eth/stagedsync/state.go new file mode 100644 index 000000000..bb8820347 --- /dev/null +++ b/eth/stagedsync/state.go @@ -0,0 +1,70 @@ +package stagedsync + +import ( + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" +) + +type State struct { + stages []*Stage + currentStage uint +} + +func (s *State) Len() int { + return len(s.stages) +} + +func (s *State) NextStage() { + s.currentStage++ +} + +func (s *State) IsDone() bool { + return s.currentStage >= uint(len(s.stages)) +} + +func (s *State) CurrentStage() (uint, *Stage) { + return s.currentStage, s.stages[s.currentStage] +} + +func NewState(stages []*Stage) *State { + return &State{ + stages: stages, + currentStage: 0, + } +} + +func (s *State) StageState(stage stages.SyncStage, db ethdb.Getter) (*StageState, error) { + blockNum, err := stages.GetStageProgress(db, stage) + if err != nil { + return nil, err + } + return &StageState{s, stage, blockNum}, nil +} + +type StageState struct { + state *State + Stage stages.SyncStage + BlockNumber uint64 +} + +func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error { + return stages.SaveStageProgress(db, s.Stage, newBlockNum) +} + +func (s *StageState) Done() { + if s.state != nil { + s.state.NextStage() + } +} + +func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) { + return stages.GetStageProgress(db, stages.Execution) +} + +func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error { + err := stages.SaveStageProgress(db, s.Stage, newBlockNum) + if s.state != nil { + s.state.NextStage() + } + return err +}