From 1e20ed255d548e3bd87541ea6d68b8b920315984 Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Tue, 2 Jun 2020 17:52:50 +0300 Subject: [PATCH] move staged sync to its own package (#605) --- cmd/hack/hack.go | 12 ++-- eth/downloader/downloader.go | 21 ++++-- ...age_bodies.go => downloader_stagedsync.go} | 44 +++---------- ..._test.go => downloader_stagedsync_test.go} | 7 +- .../stagedsync_stage_indexes_test.go | 17 ----- eth/stagedsync/stage_bodies.go | 42 ++++++++++++ .../stage_execute.go} | 13 ++-- .../stage_execute_test.go} | 15 +++-- .../stage_hashcheck.go} | 13 ++-- .../stage_hashcheck_test.go} | 2 +- .../stage_indexes.go} | 12 ++-- .../stage_senders.go} | 34 +++++----- .../stagedsync.go} | 65 +++++++++++-------- .../stages/stages.go} | 2 +- .../testutil.go} | 2 +- eth/stagedsync/types.go | 20 ++++++ 16 files changed, 182 insertions(+), 139 deletions(-) rename eth/downloader/{stagedsync_stage_bodies.go => downloader_stagedsync.go} (71%) rename eth/downloader/{stagedsync_test.go => downloader_stagedsync_test.go} (99%) delete mode 100644 eth/downloader/stagedsync_stage_indexes_test.go create mode 100644 eth/stagedsync/stage_bodies.go rename eth/{downloader/stagedsync_stage_execute.go => stagedsync/stage_execute.go} (95%) rename eth/{downloader/stagedsync_stage_execute_test.go => stagedsync/stage_execute_test.go} (89%) rename eth/{downloader/stagedsync_stage_hashcheck.go => stagedsync/stage_hashcheck.go} (92%) rename eth/{downloader/stagedsync_stage_hashcheck_test.go => stagedsync/stage_hashcheck_test.go} (99%) rename eth/{downloader/stagedsync_stage_indexes.go => stagedsync/stage_indexes.go} (81%) rename eth/{downloader/stagedsync_stage_senders.go => stagedsync/stage_senders.go} (80%) rename eth/{downloader/stagedsync_downloader.go => stagedsync/stagedsync.go} (57%) rename eth/{downloader/stagedsync_stages.go => stagedsync/stages/stages.go} (99%) rename eth/{downloader/stagedsync_testutil.go => stagedsync/testutil.go} (99%) create mode 100644 eth/stagedsync/types.go diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index db03bd0c9..33c69821a 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -37,8 +37,8 @@ import ( "github.com/ledgerwatch/turbo-geth/core/types/accounts" "github.com/ledgerwatch/turbo-geth/core/vm" "github.com/ledgerwatch/turbo-geth/crypto" - "github.com/ledgerwatch/turbo-geth/eth/downloader" "github.com/ledgerwatch/turbo-geth/eth/mgr" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/node" @@ -2166,7 +2166,7 @@ func resetState(chaindata string) { core.UsePlainStateExecution = true _, _, err = core.DefaultGenesisBlock().CommitGenesisState(db, false) check(err) - err = downloader.SaveStageProgress(db, downloader.Execution, 0) + err = stages.SaveStageProgress(db, stages.Execution, 0) check(err) fmt.Printf("Reset state done\n") } @@ -2193,7 +2193,7 @@ func resetHashedState(chaindata string) { return nil }) check(err) - err = downloader.SaveStageProgress(db, downloader.HashCheck, 0) + err = stages.SaveStageProgress(db, stages.HashCheck, 0) check(err) fmt.Printf("Reset hashed state done\n") } @@ -2206,11 +2206,11 @@ func resetHistoryIndex(chaindata string) { db.DeleteBucket(dbutils.AccountsHistoryBucket) //nolint:errcheck db.DeleteBucket(dbutils.StorageHistoryBucket) - err = downloader.SaveStageProgress(db, downloader.AccountHistoryIndex, 0) + err = stages.SaveStageProgress(db, stages.AccountHistoryIndex, 0) check(err) - err = downloader.SaveStageProgress(db, downloader.StorageHistoryIndex, 0) + err = stages.SaveStageProgress(db, stages.StorageHistoryIndex, 0) check(err) - err = downloader.SaveStageProgress(db, downloader.HashCheck, 0) + err = stages.SaveStageProgress(db, stages.HashCheck, 0) check(err) fmt.Printf("Reset history index done\n") } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c4a9b01a1..cc668e827 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -32,6 +32,8 @@ import ( "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/core/vm" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/event" "github.com/ledgerwatch/turbo-geth/log" @@ -543,7 +545,16 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // Turbo-Geth's staged sync goes here if d.mode == StagedSync { - return d.doStagedSyncWithFetchers(p, fetchers) + return stagedsync.DoStagedSyncWithFetchers( + d, + d.blockchain, + d.stateDB, + p.id, + d.history, + d.datadir, + d.quitCh, + fetchers, + ) } fetchers = append(fetchers, func() error { return d.fetchBodies(origin + 1) }) // Bodies are retrieved during normal and fast sync @@ -735,7 +746,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) case FastSync: localHeight = d.blockchain.CurrentFastBlock().NumberU64() case StagedSync: - localHeight, err = GetStageProgress(d.stateDB, Headers) + localHeight, err = stages.GetStageProgress(d.stateDB, stages.Headers) if err != nil { return 0, err } @@ -1509,7 +1520,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er n, newCanonical, lowestCanonicalNumber, err = d.blockchain.InsertHeaderChainStaged(chunk, frequency) if newCanonical { // Need to unwind further stages - if err1 := UnwindAllStages(d.stateDB, lowestCanonicalNumber); err1 != nil { + if err1 := stages.UnwindAllStages(d.stateDB, lowestCanonicalNumber); err1 != nil { return fmt.Errorf("unwinding all stages to %d: %v", lowestCanonicalNumber, err1) } } @@ -1517,7 +1528,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er n, err = d.lightchain.InsertHeaderChain(chunk, frequency) } if d.mode == StagedSync && n > 0 { - if err1 := SaveStageProgress(d.stateDB, Headers, chunk[n-1].Number.Uint64()); err1 != nil { + if err1 := stages.SaveStageProgress(d.stateDB, stages.Headers, chunk[n-1].Number.Uint64()); err1 != nil { return fmt.Errorf("saving SyncStage Headers progress: %v", err1) } } @@ -1626,7 +1637,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult, execute bool) (u return 0, errInvalidChain } if d.mode == StagedSync && index > 0 { - if err1 := SaveStageProgress(d.stateDB, Bodies, blocks[index-1].NumberU64()); err1 != nil { + if err1 := stages.SaveStageProgress(d.stateDB, stages.Bodies, blocks[index-1].NumberU64()); err1 != nil { return 0, fmt.Errorf("saving SyncStage Bodies progress: %v", err1) } return blocks[index-1].NumberU64() + 1, nil diff --git a/eth/downloader/stagedsync_stage_bodies.go b/eth/downloader/downloader_stagedsync.go similarity index 71% rename from eth/downloader/stagedsync_stage_bodies.go rename to eth/downloader/downloader_stagedsync.go index 5e41d4d51..132d5b55b 100644 --- a/eth/downloader/stagedsync_stage_bodies.go +++ b/eth/downloader/downloader_stagedsync.go @@ -8,11 +8,13 @@ import ( "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core/types" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/rlp" ) -func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) { +// externsions for downloader needed for staged sync +func (d *Downloader) SpawnBodyDownloadStage(id string, origin uint64) (bool, error) { // Create cancel channel for aborting mid-flight and mark the master peer d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -20,11 +22,6 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) { d.cancelLock.Unlock() defer d.Cancel() // No matter what, we can't leave the cancel channel open - // Figure out how many blocks have already been downloaded - origin, err := GetStageProgress(d.stateDB, Bodies) - if err != nil { - return false, fmt.Errorf("getting Bodies stage progress: %w", err) - } // Figure out how many headers we have currentNumber := origin + 1 var missingHeader uint64 @@ -33,8 +30,8 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) { var hashes [N]common.Hash // Canonical hashes of the blocks var headers = make(map[common.Hash]*types.Header) // We use map because there might be more than one header by block number var hashCount = 0 - err = d.stateDB.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(currentNumber), 0, func(k, v []byte) (bool, error) { - if err = common.Stopped(d.quitCh); err != nil { + err := d.stateDB.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(currentNumber), 0, func(k, v []byte) (bool, error) { + if err := common.Stopped(d.quitCh); err != nil { return false, err } @@ -72,7 +69,7 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) { return false, fmt.Errorf("walking over canonical hashes: %w", err) } if missingHeader != 0 { - if err1 := SaveStageProgress(d.stateDB, Headers, missingHeader); err1 != nil { + if err1 := stages.SaveStageProgress(d.stateDB, stages.Headers, missingHeader); err1 != nil { return false, fmt.Errorf("resetting SyncStage Headers to missing header: %w", err1) } // This will cause the sync return to the header stage @@ -131,31 +128,6 @@ func (d *Downloader) processBodiesStage(to uint64) error { } } -func (d *Downloader) unwindBodyDownloadStage(unwindPoint uint64) error { - // Here we may want to remove all blocks if we wanted to - lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Bodies) - if err != nil { - return fmt.Errorf("unwind Bodies: get stage progress: %v", err) - } - unwindPoint, err1 := GetStageUnwind(d.stateDB, Bodies) - if err1 != nil { - return err1 - } - if unwindPoint >= lastProcessedBlockNumber { - err = SaveStageUnwind(d.stateDB, Bodies, 0) - if err != nil { - return fmt.Errorf("unwind Bodies: reset: %v", err) - } - return nil - } - mutation := d.stateDB.NewBatch() - err = SaveStageUnwind(mutation, Bodies, 0) - if err != nil { - return fmt.Errorf("unwind Bodies: reset: %v", err) - } - _, err = mutation.Commit() - if err != nil { - return fmt.Errorf("unwind Bodies: failed to write db commit: %v", err) - } - return nil +func (d *Downloader) SpawnSync(fetchers []func() error) error { + return d.spawnSync(fetchers) } diff --git a/eth/downloader/stagedsync_test.go b/eth/downloader/downloader_stagedsync_test.go similarity index 99% rename from eth/downloader/stagedsync_test.go rename to eth/downloader/downloader_stagedsync_test.go index 0e1db2e29..fe554dfff 100644 --- a/eth/downloader/stagedsync_test.go +++ b/eth/downloader/downloader_stagedsync_test.go @@ -4,6 +4,10 @@ import ( "context" "errors" "fmt" + "math/big" + "sync" + "testing" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/consensus" "github.com/ledgerwatch/turbo-geth/consensus/ethash" @@ -16,9 +20,6 @@ import ( "github.com/ledgerwatch/turbo-geth/event" "github.com/ledgerwatch/turbo-geth/params" "github.com/ledgerwatch/turbo-geth/trie" - "math/big" - "sync" - "testing" ) type stagedSyncTester struct { diff --git a/eth/downloader/stagedsync_stage_indexes_test.go b/eth/downloader/stagedsync_stage_indexes_test.go deleted file mode 100644 index 93ddc8304..000000000 --- a/eth/downloader/stagedsync_stage_indexes_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package downloader - -import ( - "github.com/ledgerwatch/turbo-geth/core" - "testing" -) - -func TestName(t *testing.T) { - core.UsePlainStateExecution = true - tester := newStagedSyncTester(true) - if err := tester.newPeer("peer", 65, testChainForkLightA); err != nil { - t.Fatal(err) - } - if err := tester.sync("peer", nil); err != nil { - t.Fatal(err) - } -} diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go new file mode 100644 index 000000000..8e3cbcc08 --- /dev/null +++ b/eth/stagedsync/stage_bodies.go @@ -0,0 +1,42 @@ +package stagedsync + +import ( + "fmt" + + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" +) + +func spawnBodyDownloadStage(db ethdb.Getter, d DownloaderGlue, pid string) (bool, error) { + // Figure out how many blocks have already been downloaded + origin, err := stages.GetStageProgress(db, stages.Bodies) + if err != nil { + return false, fmt.Errorf("getting Bodies stage progress: %w", err) + } + return d.SpawnBodyDownloadStage(pid, origin) +} + +func unwindBodyDownloadStage(db ethdb.Database, unwindPoint uint64) error { + // Here we may want to remove all blocks if we wanted to + lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.Bodies) + if err != nil { + return fmt.Errorf("unwind Bodies: get stage progress: %v", err) + } + if unwindPoint >= lastProcessedBlockNumber { + err = stages.SaveStageUnwind(db, stages.Bodies, 0) + if err != nil { + return fmt.Errorf("unwind Bodies: reset: %v", err) + } + return nil + } + mutation := db.NewBatch() + err = stages.SaveStageUnwind(mutation, stages.Bodies, 0) + if err != nil { + return fmt.Errorf("unwind Bodies: reset: %v", err) + } + _, err = mutation.Commit() + if err != nil { + return fmt.Errorf("unwind Bodies: failed to write db commit: %v", err) + } + return nil +} diff --git a/eth/downloader/stagedsync_stage_execute.go b/eth/stagedsync/stage_execute.go similarity index 95% rename from eth/downloader/stagedsync_stage_execute.go rename to eth/stagedsync/stage_execute.go index 61f29d79a..1a9f0a72f 100644 --- a/eth/downloader/stagedsync_stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -1,4 +1,4 @@ -package downloader +package stagedsync import ( "fmt" @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/state" "github.com/ledgerwatch/turbo-geth/core/types/accounts" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" ) @@ -73,7 +74,7 @@ const StateBatchSize = 50 * 1024 * 1024 // 50 Mb const ChangeBatchSize = 1024 * 2014 // 1 Mb func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) (uint64, error) { - lastProcessedBlockNumber, err := GetStageProgress(stateDB, Execution) + lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Execution) if err != nil { return 0, err } @@ -158,7 +159,7 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit return 0, err } - if err = SaveStageProgress(stateBatch, Execution, blockNum); err != nil { + if err = stages.SaveStageProgress(stateBatch, stages.Execution, blockNum); err != nil { return 0, err } @@ -199,13 +200,13 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit } func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error { - lastProcessedBlockNumber, err := GetStageProgress(stateDB, Execution) + lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Execution) if err != nil { return fmt.Errorf("unwind Execution: get stage progress: %v", err) } if unwindPoint >= lastProcessedBlockNumber { - err = SaveStageUnwind(stateDB, Execution, 0) + err = stages.SaveStageUnwind(stateDB, stages.Execution, 0) if err != nil { return fmt.Errorf("unwind Execution: reset: %v", err) } @@ -275,7 +276,7 @@ func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error { } } - err = SaveStageUnwind(mutation, Execution, 0) + err = stages.SaveStageUnwind(mutation, stages.Execution, 0) if err != nil { return fmt.Errorf("unwind Execution: reset: %v", err) } diff --git a/eth/downloader/stagedsync_stage_execute_test.go b/eth/stagedsync/stage_execute_test.go similarity index 89% rename from eth/downloader/stagedsync_stage_execute_test.go rename to eth/stagedsync/stage_execute_test.go index 7d43c25b4..e24b36313 100644 --- a/eth/downloader/stagedsync_stage_execute_test.go +++ b/eth/stagedsync/stage_execute_test.go @@ -1,10 +1,11 @@ -package downloader +package stagedsync import ( "testing" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" ) @@ -15,7 +16,7 @@ func TestUnwindExecutionStageHashedStatic(t *testing.T) { mutation := ethdb.NewMemDatabase() generateBlocks(t, 1, 100, hashedWriterGen(mutation), staticCodeStaticIncarnations) - err := SaveStageProgress(mutation, Execution, 100) + err := stages.SaveStageProgress(mutation, stages.Execution, 100) if err != nil { t.Errorf("error while saving progress: %v", err) } @@ -35,7 +36,7 @@ func TestUnwindExecutionStageHashedWithIncarnationChanges(t *testing.T) { mutation := ethdb.NewMemDatabase() generateBlocks(t, 1, 100, hashedWriterGen(mutation), changeCodeWithIncarnations) - err := SaveStageProgress(mutation, Execution, 100) + err := stages.SaveStageProgress(mutation, stages.Execution, 100) if err != nil { t.Errorf("error while saving progress: %v", err) } @@ -55,7 +56,7 @@ func TestUnwindExecutionStageHashedWithCodeChanges(t *testing.T) { mutation := ethdb.NewMemDatabase() generateBlocks(t, 1, 100, hashedWriterGen(mutation), changeCodeIndepenentlyOfIncarnations) - err := SaveStageProgress(mutation, Execution, 100) + err := stages.SaveStageProgress(mutation, stages.Execution, 100) if err != nil { t.Errorf("error while saving progress: %v", err) } @@ -74,7 +75,7 @@ func TestUnwindExecutionStagePlainStatic(t *testing.T) { mutation := ethdb.NewMemDatabase() generateBlocks(t, 1, 100, plainWriterGen(mutation), staticCodeStaticIncarnations) - err := SaveStageProgress(mutation, Execution, 100) + err := stages.SaveStageProgress(mutation, stages.Execution, 100) if err != nil { t.Errorf("error while saving progress: %v", err) } @@ -94,7 +95,7 @@ func TestUnwindExecutionStagePlainWithIncarnationChanges(t *testing.T) { mutation := ethdb.NewMemDatabase() generateBlocks(t, 1, 100, plainWriterGen(mutation), changeCodeWithIncarnations) - err := SaveStageProgress(mutation, Execution, 100) + err := stages.SaveStageProgress(mutation, stages.Execution, 100) if err != nil { t.Errorf("error while saving progress: %v", err) } @@ -115,7 +116,7 @@ func TestUnwindExecutionStagePlainWithCodeChanges(t *testing.T) { mutation := ethdb.NewMemDatabase() generateBlocks(t, 1, 100, plainWriterGen(mutation), changeCodeIndepenentlyOfIncarnations) - err := SaveStageProgress(mutation, Execution, 100) + err := stages.SaveStageProgress(mutation, stages.Execution, 100) if err != nil { t.Errorf("error while saving progress: %v", err) } diff --git a/eth/downloader/stagedsync_stage_hashcheck.go b/eth/stagedsync/stage_hashcheck.go similarity index 92% rename from eth/downloader/stagedsync_stage_hashcheck.go rename to eth/stagedsync/stage_hashcheck.go index f222087e3..ac99f1cb4 100644 --- a/eth/downloader/stagedsync_stage_hashcheck.go +++ b/eth/stagedsync/stage_hashcheck.go @@ -1,4 +1,4 @@ -package downloader +package stagedsync import ( "fmt" @@ -8,6 +8,7 @@ import ( "github.com/ledgerwatch/turbo-geth/common/etl" "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/core/rawdb" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/trie" @@ -19,7 +20,7 @@ import ( var cbor codec.CborHandle func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, datadir string, quit chan struct{}) error { - hashProgress, err := GetStageProgress(stateDB, HashCheck) + hashProgress, err := stages.GetStageProgress(stateDB, stages.HashCheck) if err != nil { return err } @@ -61,25 +62,25 @@ func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat return fmt.Errorf("wrong trie root: %x, expected (from header): %x", subTries.Hashes[0], syncHeadBlock.Root()) } - return SaveStageProgress(stateDB, HashCheck, blockNr) + return stages.SaveStageProgress(stateDB, stages.HashCheck, blockNr) } func unwindHashCheckStage(unwindPoint uint64, stateDB ethdb.Database) error { // Currently it does not require unwinding because it does not create any Intemediate Hash records // and recomputes the state root from scratch - lastProcessedBlockNumber, err := GetStageProgress(stateDB, HashCheck) + lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.HashCheck) if err != nil { return fmt.Errorf("unwind HashCheck: get stage progress: %v", err) } if unwindPoint >= lastProcessedBlockNumber { - err = SaveStageUnwind(stateDB, HashCheck, 0) + err = stages.SaveStageUnwind(stateDB, stages.HashCheck, 0) if err != nil { return fmt.Errorf("unwind HashCheck: reset: %v", err) } return nil } mutation := stateDB.NewBatch() - err = SaveStageUnwind(mutation, HashCheck, 0) + err = stages.SaveStageUnwind(mutation, stages.HashCheck, 0) if err != nil { return fmt.Errorf("unwind HashCheck: reset: %v", err) } diff --git a/eth/downloader/stagedsync_stage_hashcheck_test.go b/eth/stagedsync/stage_hashcheck_test.go similarity index 99% rename from eth/downloader/stagedsync_stage_hashcheck_test.go rename to eth/stagedsync/stage_hashcheck_test.go index 2541e5e55..5fe429a4f 100644 --- a/eth/downloader/stagedsync_stage_hashcheck_test.go +++ b/eth/stagedsync/stage_hashcheck_test.go @@ -1,4 +1,4 @@ -package downloader +package stagedsync import ( "io/ioutil" diff --git a/eth/downloader/stagedsync_stage_indexes.go b/eth/stagedsync/stage_indexes.go similarity index 81% rename from eth/downloader/stagedsync_stage_indexes.go rename to eth/stagedsync/stage_indexes.go index 2a1e0d91f..bfb532851 100644 --- a/eth/downloader/stagedsync_stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -1,16 +1,18 @@ -package downloader +package stagedsync import ( "fmt" + "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" ) func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error { var blockNum uint64 - if lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex); err == nil { + if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.AccountHistoryIndex); err == nil { if lastProcessedBlockNumber > 0 { blockNum = lastProcessedBlockNumber + 1 } @@ -31,7 +33,7 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool return err } - if err := SaveStageProgress(db, AccountHistoryIndex, blockNum); err != nil { + if err := stages.SaveStageProgress(db, stages.AccountHistoryIndex, blockNum); err != nil { return err } return nil @@ -40,7 +42,7 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error { var blockNum uint64 - if lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex); err == nil { + if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.StorageHistoryIndex); err == nil { if lastProcessedBlockNumber > 0 { blockNum = lastProcessedBlockNumber + 1 } @@ -59,7 +61,7 @@ func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool return err } - if err = SaveStageProgress(db, StorageHistoryIndex, blockNum); err != nil { + if err = stages.SaveStageProgress(db, stages.StorageHistoryIndex, blockNum); err != nil { return err } diff --git a/eth/downloader/stagedsync_stage_senders.go b/eth/stagedsync/stage_senders.go similarity index 80% rename from eth/downloader/stagedsync_stage_senders.go rename to eth/stagedsync/stage_senders.go index 1c7bef71d..3907eb7c1 100644 --- a/eth/downloader/stagedsync_stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -1,4 +1,4 @@ -package downloader +package stagedsync import ( "context" @@ -12,7 +12,10 @@ import ( "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/crypto/secp256k1" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/params" ) var numOfGoroutines int @@ -31,15 +34,15 @@ func init() { } } -func (d *Downloader) spawnRecoverSendersStage() error { - lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Senders) +func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig, quitCh chan struct{}) error { + lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Senders) if err != nil { return err } nextBlockNumber := lastProcessedBlockNumber + 1 - mutation := d.stateDB.NewBatch() + mutation := stateDB.NewBatch() defer func() { _, dbErr := mutation.Commit() if dbErr != nil { @@ -47,7 +50,6 @@ func (d *Downloader) spawnRecoverSendersStage() error { } }() - config := d.blockchain.Config() emptyHash := common.Hash{} var blockNumber big.Int @@ -63,13 +65,13 @@ func (d *Downloader) spawnRecoverSendersStage() error { for i := 0; i < numOfGoroutines; i++ { // each goroutine gets it's own crypto context to make sure they are really parallel - go recoverSenders(cryptoContexts[i], jobs, out, d.quitCh) + go recoverSenders(cryptoContexts[i], jobs, out, quitCh) } log.Info("Sync (Senders): Started recoverer goroutines", "numOfGoroutines", numOfGoroutines) needExit := false for !needExit { - if err = common.Stopped(d.quitCh); err != nil { + if err = common.Stopped(quitCh); err != nil { return err } @@ -102,7 +104,7 @@ func (d *Downloader) spawnRecoverSendersStage() error { rawdb.WriteBody(context.Background(), mutation, j.hash, j.nextBlockNumber, j.blockBody) } - if err = SaveStageProgress(mutation, Senders, nextBlockNumber); err != nil { + if err = stages.SaveStageProgress(mutation, stages.Senders, nextBlockNumber); err != nil { return err } log.Info("Recovered for blocks:", "blockNumber", nextBlockNumber) @@ -111,7 +113,7 @@ func (d *Downloader) spawnRecoverSendersStage() error { if _, err = mutation.Commit(); err != nil { return err } - mutation = d.stateDB.NewBatch() + mutation = stateDB.NewBatch() } } @@ -153,25 +155,21 @@ func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob } } -func (d *Downloader) unwindSendersStage(unwindPoint uint64) error { +func unwindSendersStage(stateDB ethdb.Database, unwindPoint uint64) error { // Does not require any special processing - lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Senders) + lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Senders) if err != nil { return fmt.Errorf("unwind Senders: get stage progress: %v", err) } - unwindPoint, err1 := GetStageUnwind(d.stateDB, Senders) - if err1 != nil { - return err1 - } if unwindPoint >= lastProcessedBlockNumber { - err = SaveStageUnwind(d.stateDB, Senders, 0) + err = stages.SaveStageUnwind(stateDB, stages.Senders, 0) if err != nil { return fmt.Errorf("unwind Senders: reset: %v", err) } return nil } - mutation := d.stateDB.NewBatch() - err = SaveStageUnwind(mutation, Senders, 0) + mutation := stateDB.NewBatch() + err = stages.SaveStageUnwind(mutation, stages.Senders, 0) if err != nil { return fmt.Errorf("unwind Senders: reset: %v", err) } diff --git a/eth/downloader/stagedsync_downloader.go b/eth/stagedsync/stagedsync.go similarity index 57% rename from eth/downloader/stagedsync_downloader.go rename to eth/stagedsync/stagedsync.go index 9f6794426..760ae18c2 100644 --- a/eth/downloader/stagedsync_downloader.go +++ b/eth/stagedsync/stagedsync.go @@ -1,13 +1,24 @@ -package downloader +package stagedsync import ( "fmt" "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" ) -func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers []func() error) error { +func DoStagedSyncWithFetchers( + d DownloaderGlue, + blockchain BlockChain, + stateDB ethdb.Database, + pid string, + history bool, + datadir string, + quitCh chan struct{}, + headersFetchers []func() error, +) error { var err error defer log.Info("Staged sync finished") @@ -15,7 +26,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers * Stage 1. Download Headers */ log.Info("Sync stage 1/7. Downloading headers...") - err = d.DownloadHeaders(headersFetchers) + err = DownloadHeaders(d, stateDB, headersFetchers, quitCh) if err != nil { return err } @@ -27,7 +38,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers cont := true for cont && err == nil { - cont, err = d.spawnBodyDownloadStage(p.id) + cont, err = spawnBodyDownloadStage(stateDB, d, pid) if err != nil { return err } @@ -40,7 +51,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers */ log.Info("Sync stage 3/7. Recovering senders from tx signatures...") - if err = d.spawnRecoverSendersStage(); err != nil { + if err = spawnRecoverSendersStage(stateDB, blockchain.Config(), quitCh); err != nil { return err } log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!") @@ -49,7 +60,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers * Stage 4. Execute block bodies w/o calculating trie roots */ log.Info("Sync stage 4/7. Executing blocks w/o hash checks...") - syncHeadNumber, err := spawnExecuteBlocksStage(d.stateDB, d.blockchain, d.quitCh) + syncHeadNumber, err := spawnExecuteBlocksStage(stateDB, blockchain, quitCh) if err != nil { return err } @@ -58,16 +69,16 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers // Further stages go there log.Info("Sync stage 5/7. Validating final hash") - err = spawnCheckFinalHashStage(d.stateDB, syncHeadNumber, d.datadir, d.quitCh) + err = spawnCheckFinalHashStage(stateDB, syncHeadNumber, datadir, quitCh) if err != nil { return err } log.Info("Sync stage 5/7. Validating final hash... Complete!") - if d.history { + if history { log.Info("Sync stage 6/7. Generating account history index") - err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh) + err = spawnAccountHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh) if err != nil { return err } @@ -76,9 +87,9 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers log.Info("Sync stage 6/7, generating account history index is disabled. Enable by adding `h` to --storage-mode") } - if d.history { + if history { log.Info("Sync stage 7/7. Generating storage history index") - err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh) + err = spawnStorageHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh) if err != nil { return err } @@ -90,8 +101,8 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers return err } -func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error { - err := d.spawnSync(headersFetchers) +func DownloadHeaders(d DownloaderGlue, stateDB ethdb.Database, headersFetchers []func() error, quitCh chan struct{}) error { + err := d.SpawnSync(headersFetchers) if err != nil { return err } @@ -99,8 +110,8 @@ func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error { log.Info("Sync stage 1/7. Downloading headers... Complete!") log.Info("Checking for unwinding...") // Check unwinds backwards and if they are outstanding, invoke corresponding functions - for stage := Finish - 1; stage > Headers; stage-- { - unwindPoint, err := GetStageUnwind(d.stateDB, stage) + for stage := stages.Finish - 1; stage > stages.Headers; stage-- { + unwindPoint, err := stages.GetStageUnwind(stateDB, stage) if err != nil { return err } @@ -110,18 +121,18 @@ func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error { } switch stage { - case Bodies: - err = d.unwindBodyDownloadStage(unwindPoint) - case Senders: - err = d.unwindSendersStage(unwindPoint) - case Execution: - err = unwindExecutionStage(unwindPoint, d.stateDB) - case HashCheck: - err = unwindHashCheckStage(unwindPoint, d.stateDB) - case AccountHistoryIndex: - err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution, d.quitCh) - case StorageHistoryIndex: - err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution, d.quitCh) + case stages.Bodies: + err = unwindBodyDownloadStage(stateDB, unwindPoint) + case stages.Senders: + err = unwindSendersStage(stateDB, unwindPoint) + case stages.Execution: + err = unwindExecutionStage(unwindPoint, stateDB) + case stages.HashCheck: + err = unwindHashCheckStage(unwindPoint, stateDB) + case stages.AccountHistoryIndex: + err = unwindAccountHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh) + case stages.StorageHistoryIndex: + err = unwindStorageHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh) default: return fmt.Errorf("unrecognized stage for unwinding: %d", stage) } diff --git a/eth/downloader/stagedsync_stages.go b/eth/stagedsync/stages/stages.go similarity index 99% rename from eth/downloader/stagedsync_stages.go rename to eth/stagedsync/stages/stages.go index 948d4247b..3660bac65 100644 --- a/eth/downloader/stagedsync_stages.go +++ b/eth/stagedsync/stages/stages.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package downloader +package stages import ( "encoding/binary" diff --git a/eth/downloader/stagedsync_testutil.go b/eth/stagedsync/testutil.go similarity index 99% rename from eth/downloader/stagedsync_testutil.go rename to eth/stagedsync/testutil.go index 4fbf45a30..9a24a68bb 100644 --- a/eth/downloader/stagedsync_testutil.go +++ b/eth/stagedsync/testutil.go @@ -1,4 +1,4 @@ -package downloader +package stagedsync import ( "context" diff --git a/eth/stagedsync/types.go b/eth/stagedsync/types.go new file mode 100644 index 000000000..5b7699bfa --- /dev/null +++ b/eth/stagedsync/types.go @@ -0,0 +1,20 @@ +package stagedsync + +import ( + "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/core/types" + "github.com/ledgerwatch/turbo-geth/core/vm" + "github.com/ledgerwatch/turbo-geth/params" +) + +type BlockChain interface { + core.ChainContext + Config() *params.ChainConfig + GetVMConfig() *vm.Config + GetBlockByNumber(uint64) *types.Block +} + +type DownloaderGlue interface { + SpawnSync([]func() error) error + SpawnBodyDownloadStage(string, uint64) (bool, error) +}