Limit incremental step size of stages 5 and 6. (#754)

Alex Sharov 2020-07-17 12:52:09 +07:00 committed by GitHub
parent 4d54f0ec0c
commit c8a594dec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 47 deletions
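Summary of the change: stages 5 and 6 (the hashed-state and intermediate-hashes stages) previously resumed incremental promotion from whatever progress was last saved, even if the previous run died mid-stage. With this commit, both the forward and the unwind paths detect an interrupted run via leftover StageData and, in that case (or on the very first cycle), wipe the hash-state buckets with the new stagedsync.ResetHashState helper and regenerate from scratch instead of applying an oversized incremental step. A minimal runnable sketch of that decision follows; StageState here is a cut-down stand-in for illustration only (the real type lives in eth/stagedsync):

package main

import "fmt"

// Stand-in for the stagedsync type (illustration only).
type StageState struct {
	BlockNumber uint64 // progress: last block this stage completed
	StageData   []byte // non-empty when a previous run stopped mid-stage
}

// WasInterrupted mirrors the helper added by this commit: leftover
// StageData is read as "the previous run did not finish".
func (s *StageState) WasInterrupted() bool { return len(s.StageData) > 0 }

func main() {
	for _, s := range []StageState{
		{BlockNumber: 0},                         // first cycle ever
		{BlockNumber: 100, StageData: []byte{1}}, // interrupted mid-run
		{BlockNumber: 100},                       // clean incremental step
	} {
		// The decision now used by stages 5 and 6: rebuild from scratch
		// instead of resuming a half-done incremental pass.
		fromScratch := s.BlockNumber == 0 || s.WasInterrupted()
		fmt.Printf("block=%d interrupted=%v fromScratch=%v\n",
			s.BlockNumber, s.WasInterrupted(), fromScratch)
	}
}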

View File

@@ -6,6 +6,7 @@ import (
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/core"
+	"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
 	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
@@ -45,7 +46,7 @@ func resetState(_ context.Context) error {
 	if err := resetExec(db); err != nil {
 		return err
 	}
-	if err := resetHashState(db); err != nil {
+	if err := stagedsync.ResetHashState(db); err != nil {
 		return err
 	}
 	if err := resetHistory(db); err != nil {
@@ -106,29 +107,6 @@ func resetExec(db *ethdb.ObjectDatabase) error {
 	return nil
 }
 
-func resetHashState(db *ethdb.ObjectDatabase) error {
-	if err := db.ClearBuckets(
-		dbutils.CurrentStateBucket,
-		dbutils.ContractCodeBucket,
-		dbutils.IntermediateTrieHashBucket,
-	); err != nil {
-		return err
-	}
-	if err := stages.SaveStageProgress(db, stages.IntermediateHashes, 0, nil); err != nil {
-		return err
-	}
-	if err := stages.SaveStageProgress(db, stages.HashState, 0, nil); err != nil {
-		return err
-	}
-	if err := stages.SaveStageUnwind(db, stages.IntermediateHashes, 0, nil); err != nil {
-		return err
-	}
-	if err := stages.SaveStageUnwind(db, stages.HashState, 0, nil); err != nil {
-		return err
-	}
-	return nil
-}
-
 func resetHistory(db *ethdb.ObjectDatabase) error {
 	if err := db.ClearBuckets(
 		dbutils.AccountsHistoryBucket,

View File

@@ -229,7 +229,7 @@ func stage5(ctx context.Context) error {
 	defer bc.Stop()
 
 	if reset {
-		if err := resetHashState(db); err != nil {
+		if err := stagedsync.ResetHashState(db); err != nil {
 			return err
 		}
 	}
@@ -257,7 +257,7 @@ func stage6(ctx context.Context) error {
 	defer bc.Stop()
 
 	if reset {
-		if err := resetHashState(db); err != nil {
+		if err := stagedsync.ResetHashState(db); err != nil {
 			return err
 		}
 	}

View File

@@ -41,6 +41,10 @@ func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) {
 	return execution, err
 }
 
+func (s *StageState) WasInterrupted() bool {
+	return len(s.StageData) > 0
+}
+
 func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error {
 	err := stages.SaveStageProgress(db, s.Stage, newBlockNum, nil)
 	s.state.NextStage()
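Context for the new helper: StageData is the scratch payload a stage persists while a run is in flight, so under this commit's convention a non-empty value at startup reads as "the previous run stopped partway through". UnwindState gains the mirror-image helper in a later file of this diff.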

View File

@@ -20,41 +20,47 @@ import (
 )
 
 func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, datadir string, quit <-chan struct{}) error {
-	syncHeadNumber, _, err := stages.GetStageProgress(db, stages.Execution)
+	to, err := s.ExecutionAt(db)
 	if err != nil {
 		return err
 	}
 
-	if s.BlockNumber == syncHeadNumber {
+	if s.BlockNumber == to {
 		// we already did hash check for this block
-		// we don't do the obvious `if s.BlockNumber > syncHeadNumber` to support reorgs more naturally
+		// we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally
 		s.Done()
 		return nil
 	}
-	if s.BlockNumber == 0 {
-		// Special case - if this is the first cycle, we need to produce hashed state first
-		log.Info("Initial hashing plain state", "to", syncHeadNumber)
-		if err := promoteHashedStateCleanly(s, db, syncHeadNumber, datadir, quit); err != nil {
+
+	fromScratch := s.BlockNumber == 0 || s.WasInterrupted()
+	if fromScratch {
+		log.Info("Initial hashing plain state", "to", to)
+		if err := ResetHashState(db); err != nil {
+			return err
+		}
+		if err := promoteHashedStateCleanly(s, db, to, datadir, quit); err != nil {
 			return err
 		}
 	}
-	log.Info("Generating intermediate hashes", "from", s.BlockNumber, "to", syncHeadNumber)
-	if err := updateIntermediateHashes(s, db, s.BlockNumber, syncHeadNumber, datadir, quit); err != nil {
-		return err
-	}
-	return s.DoneAndUpdate(db, syncHeadNumber)
-}
 
-func updateIntermediateHashes(s *StageState, db ethdb.Database, from, to uint64, datadir string, quit <-chan struct{}) error {
 	hash := rawdb.ReadCanonicalHash(db, to)
 	syncHeadHeader := rawdb.ReadHeader(db, hash, to)
 	expectedRootHash := syncHeadHeader.Root
-	if s.BlockNumber == 0 {
-		return regenerateIntermediateHashes(db, datadir, expectedRootHash, quit)
+
+	if fromScratch {
+		log.Info("Initial generating intermediate hashes", "to", to)
+		if err := regenerateIntermediateHashes(db, datadir, expectedRootHash, quit); err != nil {
+			return err
+		}
+	} else {
+		log.Info("Generating intermediate hashes", "from", s.BlockNumber, "to", to)
+		if err := incrementIntermediateHashes(s, db, to, datadir, expectedRootHash, quit); err != nil {
+			return err
+		}
 	}
-	return incrementIntermediateHashes(s, db, from, to, datadir, expectedRootHash, quit)
+
+	return s.DoneAndUpdate(db, to)
 }
 
 func regenerateIntermediateHashes(db ethdb.Database, datadir string, expectedRootHash common.Hash, quit <-chan struct{}) error {
@@ -351,14 +357,14 @@ func (p *HashPromoter) Unwind(s *StageState, u *UnwindState, storage bool, index
 	return nil
 }
 
-func incrementIntermediateHashes(s *StageState, db ethdb.Database, from, to uint64, datadir string, expectedRootHash common.Hash, quit <-chan struct{}) error {
+func incrementIntermediateHashes(s *StageState, db ethdb.Database, to uint64, datadir string, expectedRootHash common.Hash, quit <-chan struct{}) error {
 	p := NewHashPromoter(db, quit)
 	p.TempDir = datadir
 	r := NewReceiver(quit)
-	if err := p.Promote(s, from, to, false /* storage */, 0x01, r); err != nil {
+	if err := p.Promote(s, s.BlockNumber, to, false /* storage */, 0x01, r); err != nil {
		return err
 	}
-	if err := p.Promote(s, from, to, true /* storage */, 0x02, r); err != nil {
+	if err := p.Promote(s, s.BlockNumber, to, true /* storage */, 0x02, r); err != nil {
 		return err
 	}
 	for ks, acc := range r.accountMap {
@@ -416,6 +422,23 @@ func incrementIntermediateHashes(s *StageState, db ethdb.Database, from, to uint
 }
 
 func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, db ethdb.Database, datadir string, quit <-chan struct{}) error {
+	fromScratch := u.UnwindPoint == 0 || u.WasInterrupted()
+	if fromScratch {
+		if err := ResetHashState(db); err != nil {
+			return err
+		}
+
+		to, err := s.ExecutionAt(db)
+		if err != nil {
+			return err
+		}
+		if err := promoteHashedStateCleanly(s, db, to, datadir, quit); err != nil {
+			return err
+		}
+
+		return u.Done(db)
+	}
+
 	hash := rawdb.ReadCanonicalHash(db, u.UnwindPoint)
 	syncHeadHeader := rawdb.ReadHeader(db, hash, u.UnwindPoint)
 	expectedRootHash := syncHeadHeader.Root
@@ -489,3 +512,30 @@ func unwindIntermediateHashesStageImpl(u *UnwindState, s *StageState, db ethdb.D
 	}
 	return nil
 }
+
+func ResetHashState(db ethdb.Database) error {
+	if err := db.(ethdb.NonTransactional).ClearBuckets(
+		dbutils.CurrentStateBucket,
+		dbutils.ContractCodeBucket,
+		dbutils.IntermediateTrieHashBucket,
+	); err != nil {
+		return err
+	}
+	batch := db.NewBatch()
+	if err := stages.SaveStageProgress(batch, stages.IntermediateHashes, 0, nil); err != nil {
+		return err
+	}
+	if err := stages.SaveStageProgress(batch, stages.HashState, 0, nil); err != nil {
+		return err
+	}
+	if err := stages.SaveStageUnwind(batch, stages.IntermediateHashes, 0, nil); err != nil {
+		return err
+	}
+	if err := stages.SaveStageUnwind(batch, stages.HashState, 0, nil); err != nil {
+		return err
+	}
+	if _, err := batch.Commit(); err != nil {
+		return err
+	}
+	return nil
+}
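Two details of the new ResetHashState are worth noting. ClearBuckets is reached through a type assertion to the new ethdb.NonTransactional interface, whose name suggests bucket truncation happens outside the normal transactional path; and the four stage-progress/unwind resets are accumulated on a batch and committed in a single Commit call, so the stage markers change together. The bare db.(ethdb.NonTransactional) assertion panics if the backing database lacks ClearBuckets; a guarded variant is sketched after the last file below.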

View File

@@ -31,6 +31,10 @@ func (u *UnwindState) Skip(db ethdb.Putter) error {
 	return stages.SaveStageUnwind(db, u.Stage, 0, nil)
 }
 
+func (u *UnwindState) WasInterrupted() bool {
+	return len(u.StageData) > 0
+}
+
 type PersistentUnwindStack struct {
 	unwindStack []UnwindState
 }

View File

@@ -115,4 +115,8 @@ type HasNetInterface interface {
 	DB() Database
 }
 
+type NonTransactional interface {
+	ClearBuckets(buckets ...[]byte) error
+}
+
 var errNotSupported = errors.New("not supported")
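As used in ResetHashState above, the NonTransactional capability is obtained with an unchecked type assertion, which panics at runtime on a backend that does not implement ClearBuckets. The following self-contained sketch shows a guarded alternative; Database, memDB, and clearBuckets here are illustrative stand-ins, not turbo-geth API:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins mirroring ethdb: Database is the broad interface,
// NonTransactional the optional bucket-clearing capability.
type Database interface {
	Get(bucket, key []byte) ([]byte, error)
}

type NonTransactional interface {
	ClearBuckets(buckets ...[]byte) error
}

var errNotSupported = errors.New("not supported")

// clearBuckets is a guarded version of the bare type assertion used in
// ResetHashState: backends without ClearBuckets produce an error
// instead of a runtime panic.
func clearBuckets(db Database, buckets ...[]byte) error {
	nt, ok := db.(NonTransactional)
	if !ok {
		return errNotSupported
	}
	return nt.ClearBuckets(buckets...)
}

// memDB implements Database but not NonTransactional.
type memDB struct{}

func (memDB) Get(bucket, key []byte) ([]byte, error) { return nil, nil }

func main() {
	err := clearBuckets(memDB{}, []byte("bucket"))
	fmt.Println(err) // not supported
}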