Refactor Stagedsync part 1 (#610)

This commit is contained in:
Igor Mandrigin 2020-06-03 17:25:44 +03:00 committed by GitHub
parent e5692d1726
commit 0533eed812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 267 additions and 173 deletions

View File

@ -2503,7 +2503,8 @@ func testStage5(chaindata string) error {
log.Info("Stage4", "progress", stage4progress)
core.UsePlainStateExecution = true
ch := make(chan struct{})
if err = stagedsync.SpawnCheckFinalHashStage(db, stage4progress, "", ch); err != nil {
stageState := &stagedsync.StageState{Stage: stages.HashCheck}
if err = stagedsync.SpawnCheckFinalHashStage(stageState, db, "", ch); err != nil {
return err
}
close(ch)

13
eth/stagedsync/stage.go Normal file
View File

@ -0,0 +1,13 @@
package stagedsync
import "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
// ExecFunc is the execution function of a sync stage. It receives the
// stage's current state, performs the stage's work, and reports progress
// and completion through the StageState it is given.
type ExecFunc func(*StageState) error

// Stage describes a single phase of the staged-sync pipeline.
type Stage struct {
	// ID is the persistent identifier of the stage.
	ID stages.SyncStage
	// Description is a human-readable label used in log messages.
	Description string
	// ExecFunc runs the stage's work.
	ExecFunc ExecFunc
	// Disabled marks a stage the sync loop should skip.
	Disabled bool
	// DisabledDescription is logged to explain why the stage is skipped
	// (e.g. which command-line flag enables it).
	DisabledDescription string
}

View File

@ -7,13 +7,16 @@ import (
"github.com/ledgerwatch/turbo-geth/ethdb"
)
func spawnBodyDownloadStage(db ethdb.Getter, d DownloaderGlue, pid string) (bool, error) {
// Figure out how many blocks have already been downloaded
origin, err := stages.GetStageProgress(db, stages.Bodies)
func spawnBodyDownloadStage(s *StageState, d DownloaderGlue, pid string) error {
cont, err := d.SpawnBodyDownloadStage(pid, s.BlockNumber)
if err != nil {
return false, fmt.Errorf("getting Bodies stage progress: %w", err)
return err
}
return d.SpawnBodyDownloadStage(pid, origin)
if !cont {
s.Done()
}
return nil
}
func unwindBodyDownloadStage(db ethdb.Database, unwindPoint uint64) error {

View File

@ -73,11 +73,8 @@ func (l *progressLogger) Stop() {
const StateBatchSize = 50 * 1024 * 1024 // 50 Mb
const ChangeBatchSize = 1024 * 2014 // 1 Mb
func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) (uint64, error) {
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Execution)
if err != nil {
return 0, err
}
func spawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain BlockChain, quit chan struct{}) error {
lastProcessedBlockNumber := s.BlockNumber
nextBlockNumber := uint64(0)
@ -110,8 +107,8 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit
engine := blockchain.Engine()
vmConfig := blockchain.GetVMConfig()
for {
if err = common.Stopped(quit); err != nil {
return 0, err
if err := common.Stopped(quit); err != nil {
return err
}
blockNum := atomic.LoadUint64(&nextBlockNumber)
@ -154,13 +151,13 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit
stateWriter.SetCodeSizeCache(codeSizeCache)
// where the magic happens
err = core.ExecuteBlockEuphemerally(chainConfig, vmConfig, blockchain, engine, block, stateReader, stateWriter)
err := core.ExecuteBlockEuphemerally(chainConfig, vmConfig, blockchain, engine, block, stateReader, stateWriter)
if err != nil {
return 0, err
return err
}
if err = stages.SaveStageProgress(stateBatch, stages.Execution, blockNum); err != nil {
return 0, err
if err = s.Update(stateBatch, blockNum); err != nil {
return err
}
atomic.AddUint64(&nextBlockNumber, 1)
@ -168,13 +165,13 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit
if stateBatch.BatchSize() >= StateBatchSize {
start := time.Now()
if _, err = stateBatch.Commit(); err != nil {
return 0, err
return err
}
log.Info("State batch committed", "in", time.Since(start))
}
if changeBatch.BatchSize() >= ChangeBatchSize {
if _, err = changeBatch.Commit(); err != nil {
return 0, err
return err
}
}
/*
@ -185,18 +182,16 @@ func spawnExecuteBlocksStage(stateDB ethdb.Database, blockchain BlockChain, quit
*/
}
// the last processed block
syncHeadNumber := atomic.LoadUint64(&nextBlockNumber) - 1
_, err = stateBatch.Commit()
_, err := stateBatch.Commit()
if err != nil {
return syncHeadNumber, fmt.Errorf("sync Execute: failed to write state batch commit: %v", err)
return fmt.Errorf("sync Execute: failed to write state batch commit: %v", err)
}
_, err = changeBatch.Commit()
if err != nil {
return syncHeadNumber, fmt.Errorf("sync Execute: failed to write change batch commit: %v", err)
return fmt.Errorf("sync Execute: failed to write change batch commit: %v", err)
}
return syncHeadNumber, nil
s.Done()
return nil
}
func unwindExecutionStage(unwindPoint uint64, stateDB ethdb.Database) error {

View File

@ -19,8 +19,10 @@ import (
var cbor codec.CborHandle
func SpawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, datadir string, quit chan struct{}) error {
hashProgress, err := stages.GetStageProgress(stateDB, stages.HashCheck)
func SpawnCheckFinalHashStage(s *StageState, stateDB ethdb.Database, datadir string, quit chan struct{}) error {
hashProgress := s.BlockNumber
syncHeadNumber, err := s.ExecutionAt(stateDB)
if err != nil {
return err
}
@ -28,12 +30,13 @@ func SpawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat
if hashProgress == syncHeadNumber {
// we already did hash check for this block
// we don't do the obvious `if hashProgress > syncHeadNumber` to support reorgs more naturally
s.Done()
return nil
}
if core.UsePlainStateExecution {
log.Info("Promoting plain state", "from", hashProgress, "to", syncHeadNumber)
err = promoteHashedState(stateDB, hashProgress, datadir, quit)
err := promoteHashedState(stateDB, hashProgress, datadir, quit)
if err != nil {
return err
}
@ -58,7 +61,7 @@ func SpawnCheckFinalHashStage(stateDB ethdb.Database, syncHeadNumber uint64, dat
return fmt.Errorf("wrong trie root: %x, expected (from header): %x", subTries.Hashes[0], syncHeadBlock.Root())
}
return stages.SaveStageProgress(stateDB, stages.HashCheck, blockNr)
return s.DoneAndUpdate(stateDB, blockNr)
}
func unwindHashCheckStage(unwindPoint uint64, stateDB ethdb.Database) error {

View File

@ -0,0 +1,57 @@
package stagedsync
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
// DownloadHeaders is the ExecFunc of the header-download stage.
//
// NOTE(review): the whole body is wrapped in `if false { ... }`, so the
// header sync and the unwind scan below are currently dead code and the
// stage immediately marks itself done. This looks like temporary
// scaffolding from the stagedsync refactor — confirm before relying on it.
func DownloadHeaders(s *StageState, d DownloaderGlue, stateDB ethdb.Database, headersFetchers []func() error, quitCh chan struct{}) error {
	if false {
		// Run the header downloader to completion (or failure).
		err := d.SpawnSync(headersFetchers)
		if err != nil {
			return err
		}
		log.Info("Checking for unwinding...")
		// Check unwinds backwards and if they are outstanding, invoke corresponding functions
		for stage := stages.Finish - 1; stage > stages.Headers; stage-- {
			unwindPoint, err := stages.GetStageUnwind(stateDB, stage)
			if err != nil {
				return err
			}
			// An unwind point of 0 means no unwind was requested for this stage.
			if unwindPoint == 0 {
				continue
			}
			switch stage {
			case stages.Bodies:
				err = unwindBodyDownloadStage(stateDB, unwindPoint)
			case stages.Senders:
				err = unwindSendersStage(stateDB, unwindPoint)
			case stages.Execution:
				err = unwindExecutionStage(unwindPoint, stateDB)
			case stages.HashCheck:
				err = unwindHashCheckStage(unwindPoint, stateDB)
			case stages.AccountHistoryIndex:
				err = unwindAccountHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
			case stages.StorageHistoryIndex:
				err = unwindStorageHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
			default:
				return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
			}
			if err != nil {
				return fmt.Errorf("error unwinding stage: %d: %w", stage, err)
			}
		}
		log.Info("Checking for unwinding... Complete!")
	}
	// Mark the stage complete so the sync loop advances to the next one.
	s.Done()
	return nil
}

View File

@ -1,24 +1,19 @@
package stagedsync
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
func spawnAccountHistoryIndex(s *StageState, db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
var blockNum uint64
if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.AccountHistoryIndex); err == nil {
if lastProcessedBlockNumber > 0 {
blockNum = lastProcessedBlockNumber + 1
}
} else {
return fmt.Errorf("reading account history process: %v", err)
lastProcessedBlockNumber := s.BlockNumber
if lastProcessedBlockNumber > 0 {
blockNum = lastProcessedBlockNumber + 1
}
log.Info("Account history index generation started", "from", blockNum)
ig := core.NewIndexGenerator(db, quitCh)
@ -33,21 +28,14 @@ func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool
return err
}
if err := stages.SaveStageProgress(db, stages.AccountHistoryIndex, blockNum); err != nil {
return err
}
return nil
return s.DoneAndUpdate(db, blockNum)
}
func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
func spawnStorageHistoryIndex(s *StageState, db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
var blockNum uint64
if lastProcessedBlockNumber, err := stages.GetStageProgress(db, stages.StorageHistoryIndex); err == nil {
if lastProcessedBlockNumber > 0 {
blockNum = lastProcessedBlockNumber + 1
}
} else {
return fmt.Errorf("reading storage history process: %v", err)
lastProcessedBlockNumber := s.BlockNumber
if lastProcessedBlockNumber > 0 {
blockNum = lastProcessedBlockNumber + 1
}
ig := core.NewIndexGenerator(db, quitCh)
ig.TempDir = datadir
@ -61,11 +49,7 @@ func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool
return err
}
if err = stages.SaveStageProgress(db, stages.StorageHistoryIndex, blockNum); err != nil {
return err
}
return nil
return s.DoneAndUpdate(db, blockNum)
}
func unwindAccountHistoryIndex(unwindPoint uint64, db ethdb.Database, plainState bool, quitCh chan struct{}) error {

View File

@ -34,12 +34,8 @@ func init() {
}
}
func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig, quitCh chan struct{}) error {
lastProcessedBlockNumber, err := stages.GetStageProgress(stateDB, stages.Senders)
if err != nil {
return err
}
func spawnRecoverSendersStage(s *StageState, stateDB ethdb.Database, config *params.ChainConfig, quitCh chan struct{}) error {
lastProcessedBlockNumber := s.BlockNumber
nextBlockNumber := lastProcessedBlockNumber + 1
mutation := stateDB.NewBatch()
@ -71,7 +67,7 @@ func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig
needExit := false
for !needExit {
if err = common.Stopped(quitCh); err != nil {
if err := common.Stopped(quitCh); err != nil {
return err
}
@ -104,19 +100,20 @@ func spawnRecoverSendersStage(stateDB ethdb.Database, config *params.ChainConfig
rawdb.WriteBody(context.Background(), mutation, j.hash, j.nextBlockNumber, j.blockBody)
}
if err = stages.SaveStageProgress(mutation, stages.Senders, nextBlockNumber); err != nil {
if err := s.Update(mutation, nextBlockNumber); err != nil {
return err
}
log.Info("Recovered for blocks:", "blockNumber", nextBlockNumber)
if mutation.BatchSize() >= mutation.IdealBatchSize() {
if _, err = mutation.Commit(); err != nil {
if _, err := mutation.Commit(); err != nil {
return err
}
mutation = stateDB.NewBatch()
}
}
s.Done()
return nil
}

View File

@ -19,128 +19,99 @@ func DoStagedSyncWithFetchers(
quitCh chan struct{},
headersFetchers []func() error,
) error {
var err error
defer log.Info("Staged sync finished")
/*
* Stage 1. Download Headers
*/
log.Info("Sync stage 1/7. Downloading headers...")
err = DownloadHeaders(d, stateDB, headersFetchers, quitCh)
if err != nil {
return err
stages := []*Stage{
{
ID: stages.Headers,
Description: "Downloading headers",
ExecFunc: func(s *StageState) error {
return DownloadHeaders(s, d, stateDB, headersFetchers, quitCh)
},
},
{
ID: stages.Bodies,
Description: "Downloading block bodiess",
ExecFunc: func(s *StageState) error {
return spawnBodyDownloadStage(s, d, pid)
},
},
{
ID: stages.Senders,
Description: "Recovering senders from tx signatures",
ExecFunc: func(s *StageState) error {
return spawnRecoverSendersStage(s, stateDB, blockchain.Config(), quitCh)
},
},
{
ID: stages.Execution,
Description: "Executing blocks w/o hash checks",
ExecFunc: func(s *StageState) error {
return spawnExecuteBlocksStage(s, stateDB, blockchain, quitCh)
},
},
{
ID: stages.HashCheck,
Description: "Validating final hash",
ExecFunc: func(s *StageState) error {
return SpawnCheckFinalHashStage(s, stateDB, datadir, quitCh)
},
},
{
ID: stages.AccountHistoryIndex,
Description: "Generating account history index",
Disabled: !history,
DisabledDescription: "Enable by adding `h` to --storage-mode",
ExecFunc: func(s *StageState) error {
return spawnAccountHistoryIndex(s, stateDB, datadir, core.UsePlainStateExecution, quitCh)
},
},
{
ID: stages.StorageHistoryIndex,
Description: "Generating storage history index",
Disabled: !history,
DisabledDescription: "Enable by adding `h` to --storage-mode",
ExecFunc: func(s *StageState) error {
return spawnStorageHistoryIndex(s, stateDB, datadir, core.UsePlainStateExecution, quitCh)
},
},
}
/*
* Stage 2. Download Block bodies
*/
log.Info("Sync stage 2/7. Downloading block bodies...")
cont := true
state := NewState(stages)
for cont && err == nil {
cont, err = spawnBodyDownloadStage(stateDB, d, pid)
if err != nil {
return err
}
}
for !state.IsDone() {
index, stage := state.CurrentStage()
log.Info("Sync stage 2/7. Downloading block bodies... Complete!")
if stage.Disabled {
message := fmt.Sprintf(
"Sync stage %d/%d. %v disabled. %s",
index+1,
state.Len(),
stage.Description,
stage.DisabledDescription,
)
/*
* Stage 3. Recover senders from tx signatures
*/
log.Info("Sync stage 3/7. Recovering senders from tx signatures...")
log.Info(message)
if err = spawnRecoverSendersStage(stateDB, blockchain.Config(), quitCh); err != nil {
return err
}
log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!")
/*
* Stage 4. Execute block bodies w/o calculating trie roots
*/
log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
syncHeadNumber, err := spawnExecuteBlocksStage(stateDB, blockchain, quitCh)
if err != nil {
return err
}
log.Info("Sync stage 4/7. Executing blocks w/o hash checks... Complete!")
// Further stages go there
log.Info("Sync stage 5/7. Validating final hash")
err = SpawnCheckFinalHashStage(stateDB, syncHeadNumber, datadir, quitCh)
if err != nil {
return err
}
log.Info("Sync stage 5/7. Validating final hash... Complete!")
if history {
log.Info("Sync stage 6/7. Generating account history index")
err = spawnAccountHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh)
if err != nil {
return err
}
log.Info("Sync stage 6/7. Generating account history index... Complete!")
} else {
log.Info("Sync stage 6/7, generating account history index is disabled. Enable by adding `h` to --storage-mode")
}
if history {
log.Info("Sync stage 7/7. Generating storage history index")
err = spawnStorageHistoryIndex(stateDB, datadir, core.UsePlainStateExecution, quitCh)
if err != nil {
return err
}
log.Info("Sync stage 7/7. Generating storage history index... Complete!")
} else {
log.Info("Sync stage 7/7, generating storage history index is disabled. Enable by adding `h` to --storage-mode")
}
return err
}
func DownloadHeaders(d DownloaderGlue, stateDB ethdb.Database, headersFetchers []func() error, quitCh chan struct{}) error {
err := d.SpawnSync(headersFetchers)
if err != nil {
return err
}
log.Info("Sync stage 1/7. Downloading headers... Complete!")
log.Info("Checking for unwinding...")
// Check unwinds backwards and if they are outstanding, invoke corresponding functions
for stage := stages.Finish - 1; stage > stages.Headers; stage-- {
unwindPoint, err := stages.GetStageUnwind(stateDB, stage)
if err != nil {
return err
}
if unwindPoint == 0 {
state.NextStage()
continue
}
switch stage {
case stages.Bodies:
err = unwindBodyDownloadStage(stateDB, unwindPoint)
case stages.Senders:
err = unwindSendersStage(stateDB, unwindPoint)
case stages.Execution:
err = unwindExecutionStage(unwindPoint, stateDB)
case stages.HashCheck:
err = unwindHashCheckStage(unwindPoint, stateDB)
case stages.AccountHistoryIndex:
err = unwindAccountHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
case stages.StorageHistoryIndex:
err = unwindStorageHistoryIndex(unwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
default:
return fmt.Errorf("unrecognized stage for unwinding: %d", stage)
stageState, err := state.StageState(stage.ID, stateDB)
if err != nil {
return err
}
message := fmt.Sprintf("Sync stage %d/%d. %v...", index+1, state.Len(), stage.Description)
log.Info(message)
err = stage.ExecFunc(stageState)
if err != nil {
return fmt.Errorf("error unwinding stage: %d: %w", stage, err)
return err
}
log.Info(fmt.Sprintf("%s DONE!", message))
}
log.Info("Checking for unwinding... Complete!")
return nil
}

70
eth/stagedsync/state.go Normal file
View File

@ -0,0 +1,70 @@
package stagedsync
import (
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
)
// State tracks the progress of the staged-sync loop: the ordered list of
// stages and the index of the one currently executing.
type State struct {
	stages       []*Stage
	currentStage uint
}

// Len returns the total number of stages in the pipeline.
func (s *State) Len() int {
	return len(s.stages)
}

// NextStage advances the loop to the following stage.
func (s *State) NextStage() {
	s.currentStage++
}

// IsDone reports whether every stage has been executed.
func (s *State) IsDone() bool {
	return s.currentStage >= uint(len(s.stages))
}

// CurrentStage returns the index of the stage being executed together with
// the stage itself. It must not be called once IsDone() is true — the index
// would be out of range.
func (s *State) CurrentStage() (uint, *Stage) {
	return s.currentStage, s.stages[s.currentStage]
}

// NewState creates a State positioned at the first of the given stages.
func NewState(stages []*Stage) *State {
	return &State{
		stages:       stages,
		currentStage: 0,
	}
}
// StageState loads the persisted progress of the given stage from db and
// wraps it, together with this State, into a StageState handle that an
// ExecFunc can use to report progress and completion.
func (s *State) StageState(stage stages.SyncStage, db ethdb.Getter) (*StageState, error) {
	progress, err := stages.GetStageProgress(db, stage)
	if err != nil {
		return nil, err
	}
	return &StageState{state: s, Stage: stage, BlockNumber: progress}, nil
}
// StageState is the per-stage handle passed to an ExecFunc. It carries the
// stage identifier and the stage's persisted progress, and links back to the
// owning State so the stage can signal its completion.
type StageState struct {
	state *State
	// Stage is the identifier of the stage this state belongs to.
	Stage stages.SyncStage
	// BlockNumber is the stage's persisted progress (last processed block).
	BlockNumber uint64
}
// Update persists newBlockNum as this stage's progress using the given
// database writer. It does not advance the pipeline; see Done/DoneAndUpdate.
func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error {
	return stages.SaveStageProgress(db, s.Stage, newBlockNum)
}
// Done marks the stage as finished, advancing the owning State to the next
// stage. The nil check keeps standalone StageState values (constructed
// without a State, e.g. in tooling) safe to use.
func (s *StageState) Done() {
	if s.state != nil {
		s.state.NextStage()
	}
}
// ExecutionAt returns the persisted progress of the Execution stage, i.e.
// the block number up to which blocks have been executed. It ignores the
// receiver's own Stage and always queries stages.Execution.
func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) {
	return stages.GetStageProgress(db, stages.Execution)
}
// DoneAndUpdate persists newBlockNum as this stage's progress and marks the
// stage as finished, advancing the owning State to the next stage.
func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error {
	// Persist progress first and only advance the pipeline on success:
	// advancing after a failed write would let the state machine skip a
	// stage whose progress was never recorded.
	if err := stages.SaveStageProgress(db, s.Stage, newBlockNum); err != nil {
		return err
	}
	if s.state != nil {
		s.state.NextStage()
	}
	return nil
}