From c8a594dec6eaf0f31ee1d4d8586522fcb8e7b608 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 17 Jul 2020 12:52:09 +0700 Subject: [PATCH] Limit incremental step size of stages 5 and 6. (#754) --- cmd/integration/commands/reset_state.go | 26 +------ cmd/integration/commands/stages.go | 4 +- eth/stagedsync/stage.go | 4 ++ eth/stagedsync/stage_interhashes.go | 92 +++++++++++++++++++------ eth/stagedsync/unwind.go | 4 ++ ethdb/interface.go | 4 ++ 6 files changed, 87 insertions(+), 47 deletions(-) diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go index 59609ef8e..ebacb464b 100644 --- a/cmd/integration/commands/reset_state.go +++ b/cmd/integration/commands/reset_state.go @@ -6,6 +6,7 @@ import ( "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync" "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" @@ -45,7 +46,7 @@ func resetState(_ context.Context) error { if err := resetExec(db); err != nil { return err } - if err := resetHashState(db); err != nil { + if err := stagedsync.ResetHashState(db); err != nil { return err } if err := resetHistory(db); err != nil { @@ -106,29 +107,6 @@ func resetExec(db *ethdb.ObjectDatabase) error { return nil } -func resetHashState(db *ethdb.ObjectDatabase) error { - if err := db.ClearBuckets( - dbutils.CurrentStateBucket, - dbutils.ContractCodeBucket, - dbutils.IntermediateTrieHashBucket, - ); err != nil { - return err - } - if err := stages.SaveStageProgress(db, stages.IntermediateHashes, 0, nil); err != nil { - return err - } - if err := stages.SaveStageProgress(db, stages.HashState, 0, nil); err != nil { - return err - } - if err := stages.SaveStageUnwind(db, stages.IntermediateHashes, 0, nil); err != nil { - return err - } - if err := stages.SaveStageUnwind(db, stages.HashState, 0, nil); err != nil { - return err - } - return nil -} - func resetHistory(db *ethdb.ObjectDatabase) error { if err := db.ClearBuckets( dbutils.AccountsHistoryBucket, diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 8e7392e11..8a8120fe5 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -229,7 +229,7 @@ func stage5(ctx context.Context) error { defer bc.Stop() if reset { - if err := resetHashState(db); err != nil { + if err := stagedsync.ResetHashState(db); err != nil { return err } } @@ -257,7 +257,7 @@ func stage6(ctx context.Context) error { defer bc.Stop() if reset { - if err := resetHashState(db); err != nil { + if err := stagedsync.ResetHashState(db); err != nil { return err } } diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index a4d2da5a4..4c839345c 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -41,6 +41,10 @@ func (s *StageState) ExecutionAt(db ethdb.Getter) (uint64, error) { return execution, err } +func (s *StageState) WasInterrupted() bool { + return len(s.StageData) > 0 +} + func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error { err := stages.SaveStageProgress(db, s.Stage, newBlockNum, nil) s.state.NextStage() diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 185f08a6c..ec22d8646 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -20,41 +20,47 @@ import ( ) func SpawnIntermediateHashesStage(s *StageState, db ethdb.Database, datadir string, quit <-chan struct{}) error { - syncHeadNumber, _, err := stages.GetStageProgress(db, stages.Execution) + to, err := s.ExecutionAt(db) if err != nil { return err } - if s.BlockNumber == syncHeadNumber { + if s.BlockNumber == to { // we already did hash check for this block - // we don't do the obvious `if s.BlockNumber > syncHeadNumber` to support reorgs more naturally + // we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally s.Done() return nil } - if s.BlockNumber == 0 { - // Special case - if this is the first cycle, we need to produce hashed state first - log.Info("Initial hashing plain state", "to", syncHeadNumber) - if err := promoteHashedStateCleanly(s, db, syncHeadNumber, datadir, quit); err != nil { + fromScratch := s.BlockNumber == 0 || s.WasInterrupted() + if fromScratch { + log.Info("Initial hashing plain state", "to", to) + if err := ResetHashState(db); err != nil { + return err + } + + if err := promoteHashedStateCleanly(s, db, to, datadir, quit); err != nil { return err } } - log.Info("Generating intermediate hashes", "from", s.BlockNumber, "to", syncHeadNumber) - if err := updateIntermediateHashes(s, db, s.BlockNumber, syncHeadNumber, datadir, quit); err != nil { - return err - } - return s.DoneAndUpdate(db, syncHeadNumber) -} - -func updateIntermediateHashes(s *StageState, db ethdb.Database, from, to uint64, datadir string, quit <-chan struct{}) error { hash := rawdb.ReadCanonicalHash(db, to) syncHeadHeader := rawdb.ReadHeader(db, hash, to) expectedRootHash := syncHeadHeader.Root - if s.BlockNumber == 0 { - return regenerateIntermediateHashes(db, datadir, expectedRootHash, quit) + + if fromScratch { + log.Info("Initial generating intermediate hashes", "to", to) + if err := regenerateIntermediateHashes(db, datadir, expectedRootHash, quit); err != nil { + return err + } + } else { + log.Info("Generating intermediate hashes", "from", s.BlockNumber, "to", to) + if err := incrementIntermediateHashes(s, db, to, datadir, expectedRootHash, quit); err != nil { + return err + } } - return incrementIntermediateHashes(s, db, from, to, datadir, expectedRootHash, quit) + + return s.DoneAndUpdate(db, to) } func regenerateIntermediateHashes(db ethdb.Database, datadir string, expectedRootHash common.Hash, quit <-chan struct{}) error { @@ -351,14 +357,14 @@ func (p *HashPromoter) Unwind(s *StageState, u *UnwindState, storage bool, index return nil } -func incrementIntermediateHashes(s *StageState, db ethdb.Database, from, to uint64, datadir string, expectedRootHash common.Hash, quit <-chan struct{}) error { +func incrementIntermediateHashes(s *StageState, db ethdb.Database, to uint64, datadir string, expectedRootHash common.Hash, quit <-chan struct{}) error { p := NewHashPromoter(db, quit) p.TempDir = datadir r := NewReceiver(quit) - if err := p.Promote(s, from, to, false /* storage */, 0x01, r); err != nil { + if err := p.Promote(s, s.BlockNumber, to, false /* storage */, 0x01, r); err != nil { return err } - if err := p.Promote(s, from, to, true /* storage */, 0x02, r); err != nil { + if err := p.Promote(s, s.BlockNumber, to, true /* storage */, 0x02, r); err != nil { return err } for ks, acc := range r.accountMap { @@ -416,6 +422,23 @@ func incrementIntermediateHashes(s *StageState, db ethdb.Database, from, to uint } func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, db ethdb.Database, datadir string, quit <-chan struct{}) error { + fromScratch := u.UnwindPoint == 0 || u.WasInterrupted() + if fromScratch { + if err := ResetHashState(db); err != nil { + return err + } + + to, err := s.ExecutionAt(db) + if err != nil { + return err + } + if err := promoteHashedStateCleanly(s, db, to, datadir, quit); err != nil { + return err + } + + return u.Done(db) + } + hash := rawdb.ReadCanonicalHash(db, u.UnwindPoint) syncHeadHeader := rawdb.ReadHeader(db, hash, u.UnwindPoint) expectedRootHash := syncHeadHeader.Root @@ -489,3 +512,30 @@ func unwindIntermediateHashesStageImpl(u *UnwindState, s *StageState, db ethdb.D } return nil } + +func ResetHashState(db ethdb.Database) error { + if err := db.(ethdb.NonTransactional).ClearBuckets( + dbutils.CurrentStateBucket, + dbutils.ContractCodeBucket, + dbutils.IntermediateTrieHashBucket, + ); err != nil { + return err + } + batch := db.NewBatch() + if err := stages.SaveStageProgress(batch, stages.IntermediateHashes, 0, nil); err != nil { + return err + } + if err := stages.SaveStageProgress(batch, stages.HashState, 0, nil); err != nil { + return err + } + if err := stages.SaveStageUnwind(batch, stages.IntermediateHashes, 0, nil); err != nil { + return err + } + if err := stages.SaveStageUnwind(batch, stages.HashState, 0, nil); err != nil { + return err + } + if _, err := batch.Commit(); err != nil { + return err + } + return nil +} diff --git a/eth/stagedsync/unwind.go b/eth/stagedsync/unwind.go index 6f83d3f5c..ae401af20 100644 --- a/eth/stagedsync/unwind.go +++ b/eth/stagedsync/unwind.go @@ -31,6 +31,10 @@ func (u *UnwindState) Skip(db ethdb.Putter) error { return stages.SaveStageUnwind(db, u.Stage, 0, nil) } +func (u *UnwindState) WasInterrupted() bool { + return len(u.StageData) > 0 +} + type PersistentUnwindStack struct { unwindStack []UnwindState } diff --git a/ethdb/interface.go b/ethdb/interface.go index e563e3d40..5a618dbf4 100644 --- a/ethdb/interface.go +++ b/ethdb/interface.go @@ -115,4 +115,8 @@ type HasNetInterface interface { DB() Database } +type NonTransactional interface { + ClearBuckets(buckets ...[]byte) error +} + var errNotSupported = errors.New("not supported")