move staged sync to its own package (#605)

This commit is contained in:
Igor Mandrigin 2020-06-02 17:52:50 +03:00 committed by GitHub
parent 4ce69916dc
commit 1e20ed255d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 182 additions and 139 deletions

View File

@ -37,8 +37,8 @@ import (
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/crypto"
"github.com/ledgerwatch/turbo-geth/eth/downloader"
"github.com/ledgerwatch/turbo-geth/eth/mgr"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/node"
@ -2166,7 +2166,7 @@ func resetState(chaindata string) {
core.UsePlainStateExecution = true
_, _, err = core.DefaultGenesisBlock().CommitGenesisState(db, false)
check(err)
err = downloader.SaveStageProgress(db, downloader.Execution, 0)
err = stages.SaveStageProgress(db, stages.Execution, 0)
check(err)
fmt.Printf("Reset state done\n")
}
@ -2193,7 +2193,7 @@ func resetHashedState(chaindata string) {
return nil
})
check(err)
err = downloader.SaveStageProgress(db, downloader.HashCheck, 0)
err = stages.SaveStageProgress(db, stages.HashCheck, 0)
check(err)
fmt.Printf("Reset hashed state done\n")
}
@ -2206,11 +2206,11 @@ func resetHistoryIndex(chaindata string) {
db.DeleteBucket(dbutils.AccountsHistoryBucket)
//nolint:errcheck
db.DeleteBucket(dbutils.StorageHistoryBucket)
err = downloader.SaveStageProgress(db, downloader.AccountHistoryIndex, 0)
err = stages.SaveStageProgress(db, stages.AccountHistoryIndex, 0)
check(err)
err = downloader.SaveStageProgress(db, downloader.StorageHistoryIndex, 0)
err = stages.SaveStageProgress(db, stages.StorageHistoryIndex, 0)
check(err)
err = downloader.SaveStageProgress(db, downloader.HashCheck, 0)
err = stages.SaveStageProgress(db, stages.HashCheck, 0)
check(err)
fmt.Printf("Reset history index done\n")
}

View File

@ -32,6 +32,8 @@ import (
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/event"
"github.com/ledgerwatch/turbo-geth/log"
@ -543,7 +545,16 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// Turbo-Geth's staged sync goes here
if d.mode == StagedSync {
return d.doStagedSyncWithFetchers(p, fetchers)
return stagedsync.DoStagedSyncWithFetchers(
d,
d.blockchain,
d.stateDB,
p.id,
d.history,
d.datadir,
d.quitCh,
fetchers,
)
}
fetchers = append(fetchers, func() error { return d.fetchBodies(origin + 1) }) // Bodies are retrieved during normal and fast sync
@ -735,7 +746,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
case FastSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
case StagedSync:
localHeight, err = GetStageProgress(d.stateDB, Headers)
localHeight, err = stages.GetStageProgress(d.stateDB, stages.Headers)
if err != nil {
return 0, err
}
@ -1509,7 +1520,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
n, newCanonical, lowestCanonicalNumber, err = d.blockchain.InsertHeaderChainStaged(chunk, frequency)
if newCanonical {
// Need to unwind further stages
if err1 := UnwindAllStages(d.stateDB, lowestCanonicalNumber); err1 != nil {
if err1 := stages.UnwindAllStages(d.stateDB, lowestCanonicalNumber); err1 != nil {
return fmt.Errorf("unwinding all stages to %d: %v", lowestCanonicalNumber, err1)
}
}
@ -1517,7 +1528,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
n, err = d.lightchain.InsertHeaderChain(chunk, frequency)
}
if d.mode == StagedSync && n > 0 {
if err1 := SaveStageProgress(d.stateDB, Headers, chunk[n-1].Number.Uint64()); err1 != nil {
if err1 := stages.SaveStageProgress(d.stateDB, stages.Headers, chunk[n-1].Number.Uint64()); err1 != nil {
return fmt.Errorf("saving SyncStage Headers progress: %v", err1)
}
}
@ -1626,7 +1637,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult, execute bool) (u
return 0, errInvalidChain
}
if d.mode == StagedSync && index > 0 {
if err1 := SaveStageProgress(d.stateDB, Bodies, blocks[index-1].NumberU64()); err1 != nil {
if err1 := stages.SaveStageProgress(d.stateDB, stages.Bodies, blocks[index-1].NumberU64()); err1 != nil {
return 0, fmt.Errorf("saving SyncStage Bodies progress: %v", err1)
}
return blocks[index-1].NumberU64() + 1, nil

View File

@ -8,11 +8,13 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rlp"
)
func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
// externsions for downloader needed for staged sync
func (d *Downloader) SpawnBodyDownloadStage(id string, origin uint64) (bool, error) {
// Create cancel channel for aborting mid-flight and mark the master peer
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
@ -20,11 +22,6 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
d.cancelLock.Unlock()
defer d.Cancel() // No matter what, we can't leave the cancel channel open
// Figure out how many blocks have already been downloaded
origin, err := GetStageProgress(d.stateDB, Bodies)
if err != nil {
return false, fmt.Errorf("getting Bodies stage progress: %w", err)
}
// Figure out how many headers we have
currentNumber := origin + 1
var missingHeader uint64
@ -33,8 +30,8 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
var hashes [N]common.Hash // Canonical hashes of the blocks
var headers = make(map[common.Hash]*types.Header) // We use map because there might be more than one header by block number
var hashCount = 0
err = d.stateDB.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(currentNumber), 0, func(k, v []byte) (bool, error) {
if err = common.Stopped(d.quitCh); err != nil {
err := d.stateDB.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(currentNumber), 0, func(k, v []byte) (bool, error) {
if err := common.Stopped(d.quitCh); err != nil {
return false, err
}
@ -72,7 +69,7 @@ func (d *Downloader) spawnBodyDownloadStage(id string) (bool, error) {
return false, fmt.Errorf("walking over canonical hashes: %w", err)
}
if missingHeader != 0 {
if err1 := SaveStageProgress(d.stateDB, Headers, missingHeader); err1 != nil {
if err1 := stages.SaveStageProgress(d.stateDB, stages.Headers, missingHeader); err1 != nil {
return false, fmt.Errorf("resetting SyncStage Headers to missing header: %w", err1)
}
// This will cause the sync return to the header stage
@ -131,31 +128,6 @@ func (d *Downloader) processBodiesStage(to uint64) error {
}
}
func (d *Downloader) unwindBodyDownloadStage(unwindPoint uint64) error {
// Here we may want to remove all blocks if we wanted to
lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Bodies)
if err != nil {
return fmt.Errorf("unwind Bodies: get stage progress: %v", err)
}
unwindPoint, err1 := GetStageUnwind(d.stateDB, Bodies)
if err1 != nil {
return err1
}
if unwindPoint >= lastProcessedBlockNumber {
err = SaveStageUnwind(d.stateDB, Bodies, 0)
if err != nil {
return fmt.Errorf("unwind Bodies: reset: %v", err)
}
return nil
}
mutation := d.stateDB.NewBatch()
err = SaveStageUnwind(mutation, Bodies, 0)
if err != nil {
return fmt.Errorf("unwind Bodies: reset: %v", err)
}
_, err = mutation.Commit()
if err != nil {
return fmt.Errorf("unwind Bodies: failed to write db commit: %v", err)
}
return nil
func (d *Downloader) SpawnSync(fetchers []func() error) error {
return d.spawnSync(fetchers)
}

View File

@ -4,6 +4,10 @@ import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"testing"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/consensus"
"github.com/ledgerwatch/turbo-geth/consensus/ethash"
@ -16,9 +20,6 @@ import (
"github.com/ledgerwatch/turbo-geth/event"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/trie"
"math/big"
"sync"
"testing"
)
type stagedSyncTester struct {

View File

@ -1,17 +0,0 @@
package downloader
import (
"github.com/ledgerwatch/turbo-geth/core"
"testing"
)
func TestName(t *testing.T) {
core.UsePlainStateExecution = true
tester := newStagedSyncTester(true)
if err := tester.newPeer("peer", 65, testChainForkLightA); err != nil {
t.Fatal(err)
}
if err := tester.sync("peer", nil); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,42 @@
package stagedsync
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func spawnBodyDownloadStage(db ethdb.Getter, d DownloaderGlue, pid string) (bool, error) {
// Figure out how many blocks have already been downloaded
origin, err := stages.GetStageProgress(db, stages.Bodies)
if err != nil {
return false, fmt.Errorf("getting Bodies stage progress: %w", err)
}
return d.SpawnBodyDownloadStage(pid, origin)
}
func unwindBodyDownloadStage(db ethdb.Database, unwindPoint uint64) error {
// Here we may want to remove all blocks if we wanted to
lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.Bodies)
if err != nil {
return fmt.Errorf("unwind Bodies: get stage progress: %v", err)
}
if unwindPoint >= lastProcessedBlockNumber {
err = stages.SaveStageUnwind(db, stages.Bodies, 0)
if err != nil {
return fmt.Errorf("unwind Bodies: reset: %v", err)
}
return nil
}
mutation := db.NewBatch()
err = stages.SaveStageUnwind(mutation, stages.Bodies, 0)
if err != nil {
return fmt.Errorf("unwind Bodies: reset: %v", err)
}
_, err = mutation.Commit()
if err != nil {
return fmt.Errorf("unwind Bodies: failed to write db commit: %v", err)
}
return nil
}

View File

@ -1,4 +1,4 @@
package downloader
package stagedsync
import (
"fmt"
@ -16,6 +16,7 @@ import (
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
@ -73,7 +74,7 @@ const StateBatchSize = 50 * 1024 * 1024 // 50 Mb
const ChangeBatchSize = 1024 * 2014 // 1 Mb
func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) (uint64, error) {
lastProcessedBlockNumber, err := GetStageProgress(stateDB, Execution)
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Execution)
if err != nil {
return 0, err
}
@ -158,7 +159,7 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit
return 0, err
}
if err = SaveStageProgress(stateBatch, Execution, blockNum); err != nil {
if err = stages.SaveStageProgress(stateBatch, stages.Execution, blockNum); err != nil {
return 0, err
}
@ -199,13 +200,13 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit
}
func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
lastProcessedBlockNumber, err := GetStageProgress(stateDB, Execution)
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Execution)
if err != nil {
return fmt.Errorf("unwind Execution: get stage progress: %v", err)
}
if unwindPoint >= lastProcessedBlockNumber {
err = SaveStageUnwind(stateDB, Execution, 0)
err = stages.SaveStageUnwind(stateDB, stages.Execution, 0)
if err != nil {
return fmt.Errorf("unwind Execution: reset: %v", err)
}
@ -275,7 +276,7 @@ func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {
}
}
err = SaveStageUnwind(mutation, Execution, 0)
err = stages.SaveStageUnwind(mutation, stages.Execution, 0)
if err != nil {
return fmt.Errorf("unwind Execution: reset: %v", err)
}

View File

@ -1,10 +1,11 @@
package downloader
package stagedsync
import (
"testing"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
@ -15,7 +16,7 @@ func TestUnwindExecutionStageHashedStatic(t *testing.T) {
mutation := ethdb.NewMemDatabase()
generateBlocks(t, 1, 100, hashedWriterGen(mutation), staticCodeStaticIncarnations)
err := SaveStageProgress(mutation, Execution, 100)
err := stages.SaveStageProgress(mutation, stages.Execution, 100)
if err != nil {
t.Errorf("error while saving progress: %v", err)
}
@ -35,7 +36,7 @@ func TestUnwindExecutionStageHashedWithIncarnationChanges(t *testing.T) {
mutation := ethdb.NewMemDatabase()
generateBlocks(t, 1, 100, hashedWriterGen(mutation), changeCodeWithIncarnations)
err := SaveStageProgress(mutation, Execution, 100)
err := stages.SaveStageProgress(mutation, stages.Execution, 100)
if err != nil {
t.Errorf("error while saving progress: %v", err)
}
@ -55,7 +56,7 @@ func TestUnwindExecutionStageHashedWithCodeChanges(t *testing.T) {
mutation := ethdb.NewMemDatabase()
generateBlocks(t, 1, 100, hashedWriterGen(mutation), changeCodeIndepenentlyOfIncarnations)
err := SaveStageProgress(mutation, Execution, 100)
err := stages.SaveStageProgress(mutation, stages.Execution, 100)
if err != nil {
t.Errorf("error while saving progress: %v", err)
}
@ -74,7 +75,7 @@ func TestUnwindExecutionStagePlainStatic(t *testing.T) {
mutation := ethdb.NewMemDatabase()
generateBlocks(t, 1, 100, plainWriterGen(mutation), staticCodeStaticIncarnations)
err := SaveStageProgress(mutation, Execution, 100)
err := stages.SaveStageProgress(mutation, stages.Execution, 100)
if err != nil {
t.Errorf("error while saving progress: %v", err)
}
@ -94,7 +95,7 @@ func TestUnwindExecutionStagePlainWithIncarnationChanges(t *testing.T) {
mutation := ethdb.NewMemDatabase()
generateBlocks(t, 1, 100, plainWriterGen(mutation), changeCodeWithIncarnations)
err := SaveStageProgress(mutation, Execution, 100)
err := stages.SaveStageProgress(mutation, stages.Execution, 100)
if err != nil {
t.Errorf("error while saving progress: %v", err)
}
@ -115,7 +116,7 @@ func TestUnwindExecutionStagePlainWithCodeChanges(t *testing.T) {
mutation := ethdb.NewMemDatabase()
generateBlocks(t, 1, 100, plainWriterGen(mutation), changeCodeIndepenentlyOfIncarnations)
err := SaveStageProgress(mutation, Execution, 100)
err := stages.SaveStageProgress(mutation, stages.Execution, 100)
if err != nil {
t.Errorf("error while saving progress: %v", err)
}

View File

@ -1,4 +1,4 @@
package downloader
package stagedsync
import (
"fmt"
@ -8,6 +8,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/trie"
@ -19,7 +20,7 @@ import (
var cbor codec.CborHandle
func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, datadir string, quit chan struct{}) error {
hashProgress, err := GetStageProgress(stateDB, HashCheck)
hashProgress, err := stages.GetStageProgress(stateDB, stages.HashCheck)
if err != nil {
return err
}
@ -61,25 +62,25 @@ func spawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat
return fmt.Errorf("wrong trie root: %x, expected (from header): %x", subTries.Hashes[0], syncHeadBlock.Root())
}
return SaveStageProgress(stateDB, HashCheck, blockNr)
return stages.SaveStageProgress(stateDB, stages.HashCheck, blockNr)
}
func unwindHashCheckStage(unwindPoint uint64, stateDB ethdb.Database) error {
// Currently it does not require unwinding because it does not create any Intemediate Hash records
// and recomputes the state root from scratch
lastProcessedBlockNumber, err := GetStageProgress(stateDB, HashCheck)
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.HashCheck)
if err != nil {
return fmt.Errorf("unwind HashCheck: get stage progress: %v", err)
}
if unwindPoint >= lastProcessedBlockNumber {
err = SaveStageUnwind(stateDB, HashCheck, 0)
err = stages.SaveStageUnwind(stateDB, stages.HashCheck, 0)
if err != nil {
return fmt.Errorf("unwind HashCheck: reset: %v", err)
}
return nil
}
mutation := stateDB.NewBatch()
err = SaveStageUnwind(mutation, HashCheck, 0)
err = stages.SaveStageUnwind(mutation, stages.HashCheck, 0)
if err != nil {
return fmt.Errorf("unwind HashCheck: reset: %v", err)
}

View File

@ -1,4 +1,4 @@
package downloader
package stagedsync
import (
"io/ioutil"

View File

@ -1,16 +1,18 @@
package downloader
package stagedsync
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
var blockNum uint64
if lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex); err == nil {
if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.AccountHistoryIndex); err == nil {
if lastProcessedBlockNumber > 0 {
blockNum = lastProcessedBlockNumber + 1
}
@ -31,7 +33,7 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool
return err
}
if err := SaveStageProgress(db, AccountHistoryIndex, blockNum); err != nil {
if err := stages.SaveStageProgress(db, stages.AccountHistoryIndex, blockNum); err != nil {
return err
}
return nil
@ -40,7 +42,7 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool
func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
var blockNum uint64
if lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex); err == nil {
if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.StorageHistoryIndex); err == nil {
if lastProcessedBlockNumber > 0 {
blockNum = lastProcessedBlockNumber + 1
}
@ -59,7 +61,7 @@ func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool
return err
}
if err = SaveStageProgress(db, StorageHistoryIndex, blockNum); err != nil {
if err = stages.SaveStageProgress(db, stages.StorageHistoryIndex, blockNum); err != nil {
return err
}

View File

@ -1,4 +1,4 @@
package downloader
package stagedsync
import (
"context"
@ -12,7 +12,10 @@ import (
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/crypto/secp256k1"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
)
var numOfGoroutines int
@ -31,15 +34,15 @@ func init() {
}
}
func (d *Downloader) spawnRecoverSendersStage() error {
lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Senders)
func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig, quitCh chan struct{}) error {
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Senders)
if err != nil {
return err
}
nextBlockNumber := lastProcessedBlockNumber + 1
mutation := d.stateDB.NewBatch()
mutation := stateDB.NewBatch()
defer func() {
_, dbErr := mutation.Commit()
if dbErr != nil {
@ -47,7 +50,6 @@ func (d *Downloader) spawnRecoverSendersStage() error {
}
}()
config := d.blockchain.Config()
emptyHash := common.Hash{}
var blockNumber big.Int
@ -63,13 +65,13 @@ func (d *Downloader) spawnRecoverSendersStage() error {
for i := 0; i < numOfGoroutines; i++ {
// each goroutine gets it's own crypto context to make sure they are really parallel
go recoverSenders(cryptoContexts[i], jobs, out, d.quitCh)
go recoverSenders(cryptoContexts[i], jobs, out, quitCh)
}
log.Info("Sync (Senders): Started recoverer goroutines", "numOfGoroutines", numOfGoroutines)
needExit := false
for !needExit {
if err = common.Stopped(d.quitCh); err != nil {
if err = common.Stopped(quitCh); err != nil {
return err
}
@ -102,7 +104,7 @@ func (d *Downloader) spawnRecoverSendersStage() error {
rawdb.WriteBody(context.Background(), mutation, j.hash, j.nextBlockNumber, j.blockBody)
}
if err = SaveStageProgress(mutation, Senders, nextBlockNumber); err != nil {
if err = stages.SaveStageProgress(mutation, stages.Senders, nextBlockNumber); err != nil {
return err
}
log.Info("Recovered for blocks:", "blockNumber", nextBlockNumber)
@ -111,7 +113,7 @@ func (d *Downloader) spawnRecoverSendersStage() error {
if _, err = mutation.Commit(); err != nil {
return err
}
mutation = d.stateDB.NewBatch()
mutation = stateDB.NewBatch()
}
}
@ -153,25 +155,21 @@ func recoverSenders(cryptoContext *secp256k1.Context, in chan *senderRecoveryJob
}
}
func (d *Downloader) unwindSendersStage(unwindPoint uint64) error {
func unwindSendersStage(stateDB ethdb.Database, unwindPoint uint64) error {
// Does not require any special processing
lastProcessedBlockNumber, err := GetStageProgress(d.stateDB, Senders)
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Senders)
if err != nil {
return fmt.Errorf("unwind Senders: get stage progress: %v", err)
}
unwindPoint, err1 := GetStageUnwind(d.stateDB, Senders)
if err1 != nil {
return err1
}
if unwindPoint >= lastProcessedBlockNumber {
err = SaveStageUnwind(d.stateDB, Senders, 0)
err = stages.SaveStageUnwind(stateDB, stages.Senders, 0)
if err != nil {
return fmt.Errorf("unwind Senders: reset: %v", err)
}
return nil
}
mutation := d.stateDB.NewBatch()
err = SaveStageUnwind(mutation, Senders, 0)
mutation := stateDB.NewBatch()
err = stages.SaveStageUnwind(mutation, stages.Senders, 0)
if err != nil {
return fmt.Errorf("unwind Senders: reset: %v", err)
}

View File

@ -1,13 +1,24 @@
package downloader
package stagedsync
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers []func() error) error {
func DoStagedSyncWithFetchers(
d DownloaderGlue,
blockchain BlockChain,
stateDB ethdb.Database,
pid string,
history bool,
datadir string,
quitCh chan struct{},
headersFetchers []func() error,
) error {
var err error
defer log.Info("Staged sync finished")
@ -15,7 +26,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
* Stage 1. Download Headers
*/
log.Info("Sync stage 1/7. Downloading headers...")
err = d.DownloadHeaders(headersFetchers)
err = DownloadHeaders(d, stateDB, headersFetchers, quitCh)
if err != nil {
return err
}
@ -27,7 +38,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
cont := true
for cont && err == nil {
cont, err = d.spawnBodyDownloadStage(p.id)
cont, err = spawnBodyDownloadStage(stateDB, d, pid)
if err != nil {
return err
}
@ -40,7 +51,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
*/
log.Info("Sync stage 3/7. Recovering senders from tx signatures...")
if err = d.spawnRecoverSendersStage(); err != nil {
if err = spawnRecoverSendersStage(stateDB, blockchain.Config(), quitCh); err != nil {
return err
}
log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!")
@ -49,7 +60,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
* Stage 4. Execute block bodies w/o calculating trie roots
*/
log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
syncHeadNumber, err := spawnExecuteBlocksStage(d.stateDB, d.blockchain, d.quitCh)
syncHeadNumber, err := spawnExecuteBlocksStage(stateDB, blockchain, quitCh)
if err != nil {
return err
}
@ -58,16 +69,16 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
// Further stages go there
log.Info("Sync stage 5/7. Validating final hash")
err = spawnCheckFinalHashStage(d.stateDB, syncHeadNumber, d.datadir, d.quitCh)
err = spawnCheckFinalHashStage(stateDB, syncHeadNumber, datadir, quitCh)
if err != nil {
return err
}
log.Info("Sync stage 5/7. Validating final hash... Complete!")
if d.history {
if history {
log.Info("Sync stage 6/7. Generating account history index")
err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh)
err = spawnAccountHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh)
if err != nil {
return err
}
@ -76,9 +87,9 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
log.Info("Sync stage 6/7, generating account history index is disabled. Enable by adding `h` to --storage-mode")
}
if d.history {
if history {
log.Info("Sync stage 7/7. Generating storage history index")
err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution, d.quitCh)
err = spawnStorageHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh)
if err != nil {
return err
}
@ -90,8 +101,8 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
return err
}
func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error {
err := d.spawnSync(headersFetchers)
func DownloadHeaders(d DownloaderGlue, stateDB ethdb.Database, headersFetchers []func() error, quitCh chan struct{}) error {
err := d.SpawnSync(headersFetchers)
if err != nil {
return err
}
@ -99,8 +110,8 @@ func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error {
log.Info("Sync stage 1/7. Downloading headers... Complete!")
log.Info("Checking for unwinding...")
// Check unwinds backwards and if they are outstanding, invoke corresponding functions
for stage := Finish - 1; stage > Headers; stage-- {
unwindPoint, err := GetStageUnwind(d.stateDB, stage)
for stage := stages.Finish - 1; stage > stages.Headers; stage-- {
unwindPoint, err := stages.GetStageUnwind(stateDB, stage)
if err != nil {
return err
}
@ -110,18 +121,18 @@ func (d *Downloader) DownloadHeaders(headersFetchers []func() error) error {
}
switch stage {
case Bodies:
err = d.unwindBodyDownloadStage(unwindPoint)
case Senders:
err = d.unwindSendersStage(unwindPoint)
case Execution:
err = unwindExecutionStage(unwindPoint, d.stateDB)
case HashCheck:
err = unwindHashCheckStage(unwindPoint, d.stateDB)
case AccountHistoryIndex:
err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution, d.quitCh)
case StorageHistoryIndex:
err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution, d.quitCh)
case stages.Bodies:
err = unwindBodyDownloadStage(stateDB, unwindPoint)
case stages.Senders:
err = unwindSendersStage(stateDB, unwindPoint)
case stages.Execution:
err = unwindExecutionStage(unwindPoint, stateDB)
case stages.HashCheck:
err = unwindHashCheckStage(unwindPoint, stateDB)
case stages.AccountHistoryIndex:
err = unwindAccountHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
case stages.StorageHistoryIndex:
err = unwindStorageHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
default:
return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
package stages
import (
"encoding/binary"

View File

@ -1,4 +1,4 @@
package downloader
package stagedsync
import (
"context"

20
eth/stagedsync/types.go Normal file
View File

@ -0,0 +1,20 @@
package stagedsync
import (
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/params"
)
type BlockChain interface {
core.ChainContext
Config() *params.ChainConfig
GetVMConfig() *vm.Config
GetBlockByNumber(uint64) *types.Block
}
type DownloaderGlue interface {
SpawnSync([]func() error) error
SpawnBodyDownloadStage(string, uint64) (bool, error)
}