From e943a73ceb08729c7a8a013fa6c4ae81bf4718fe Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 26 Jan 2021 12:19:53 +0700 Subject: [PATCH] improve integration (#1456) --- cmd/integration/commands/state_stages.go | 184 ++++++++++++++++++----- eth/stagedsync/state.go | 11 ++ 2 files changed, 161 insertions(+), 34 deletions(-) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index be515883d..ce722dc83 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -5,13 +5,14 @@ import ( "context" "errors" "fmt" + "path" "sort" "github.com/c2h5oh/datasize" "github.com/ledgerwatch/turbo-geth/cmd/utils" - "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/changeset" "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/common/etl" "github.com/ledgerwatch/turbo-geth/core/state" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" @@ -51,6 +52,22 @@ var stateStags = &cobra.Command{ }, } +var loopIhCmd = &cobra.Command{ + Use: "loop_ih", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := utils.RootContext() + db := openDatabase(chaindata, true) + defer db.Close() + + if err := loopIh(db, ctx); err != nil { + log.Error("Error", "err", err) + return err + } + + return nil + }, +} + func init() { withChaindata(stateStags) withReferenceChaindata(stateStags) @@ -60,13 +77,19 @@ func init() { withBatchSize(stateStags) rootCmd.AddCommand(stateStags) + + withChaindata(loopIhCmd) + withBatchSize(loopIhCmd) + + rootCmd.AddCommand(loopIhCmd) } func syncBySmallSteps(db ethdb.Database, ctx context.Context) error { - sm, err := ethdb.GetStorageModeFromDB(db) - if err != nil { - panic(err) + sm, err1 := ethdb.GetStorageModeFromDB(db) + if err1 != nil { + panic(err1) } + must(clearUnwindStack(db, ctx)) ch := ctx.Done() @@ -95,24 +118,31 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error { defer bc.Stop() cc.SetDB(tx) - tx, err = tx.Begin(ctx, ethdb.RW) - if err != nil { - return err + tx, err1 = tx.Begin(ctx, ethdb.RW) + if err1 != nil { + return err1 } st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders) _ = st.SetCurrentStage(stages.Execution) - senderStageProgress := progress(stages.Senders).BlockNumber + senderAtBlock := progress(stages.Senders).BlockNumber + execAtBlock := progress(stages.Execution).BlockNumber - var stopAt = senderStageProgress - if block > 0 && block < senderStageProgress { + var stopAt = senderAtBlock + onlyOneUnwind := block == 0 && unwindEvery == 0 && unwind > 0 + backward := unwindEvery < unwind + if onlyOneUnwind { + stopAt = progress(stages.Execution).BlockNumber - unwind + } else if block > 0 && block < senderAtBlock { stopAt = block + } else if backward { + stopAt = 1 } var batchSize datasize.ByteSize must(batchSize.UnmarshalText([]byte(batchSizeStr))) - for progress(stages.Execution).BlockNumber < stopAt || ((unwind <= unwindEvery) && unwind != 0) { + for (!backward && execAtBlock < stopAt) || (backward && execAtBlock > stopAt) { select { case <-ctx.Done(): return nil @@ -124,15 +154,26 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error { } // All stages forward to `execStage + unwindEvery` block - execAtBlock := progress(stages.Execution).BlockNumber + execAtBlock = progress(stages.Execution).BlockNumber execToBlock := block - if unwindEvery != 0 || unwind != 0 { - execToBlock = execAtBlock + unwindEvery + if unwindEvery > 0 || unwind > 0 { + if execAtBlock+unwindEvery > unwind { + execToBlock = execAtBlock + unwindEvery - unwind + } else { + break + } } - if execToBlock > stopAt { - execToBlock = stopAt + 1 - unwind = 0 + if backward { + if execToBlock < stopAt { + execToBlock = stopAt + } + } else { + if execToBlock > stopAt { + execToBlock = stopAt + 1 + unwind = 0 + } } + fmt.Printf("alex: %d\n", execToBlock) // set block limit of execute stage st.MockExecFunc(stages.Execution, func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder) error { @@ -154,19 +195,7 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error { if err := st.Run(db, tx); err != nil { return err } - - for blockN := range expectedAccountChanges { - if err := checkChangeSet(tx, blockN, expectedAccountChanges[blockN], expectedStorageChanges[blockN]); err != nil { - return err - } - delete(expectedAccountChanges, blockN) - delete(expectedStorageChanges, blockN) - } - - if err := checkHistory(tx, dbutils.AccountChangeSetBucket, execAtBlock); err != nil { - return err - } - if err := checkHistory(tx, dbutils.StorageChangeSetBucket, execAtBlock); err != nil { + if err := checkChanges(expectedAccountChanges, tx, expectedStorageChanges, execAtBlock); err != nil { return err } @@ -174,14 +203,17 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error { return err } + execAtBlock = progress(stages.Execution).BlockNumber + if execAtBlock == stopAt { + break + } + // Unwind all stages to `execStage - unwind` block if unwind == 0 { continue } - execStage := progress(stages.Execution) - to := execStage.BlockNumber - unwind - + to := execAtBlock - unwind if err := st.UnwindTo(to, tx); err != nil { return err } @@ -194,6 +226,90 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error { return nil } +func checkChanges(expectedAccountChanges map[uint64]*changeset.ChangeSet, db ethdb.Database, expectedStorageChanges map[uint64]*changeset.ChangeSet, execAtBlock uint64) error { + for blockN := range expectedAccountChanges { + if err := checkChangeSet(db, blockN, expectedAccountChanges[blockN], expectedStorageChanges[blockN]); err != nil { + return err + } + delete(expectedAccountChanges, blockN) + delete(expectedStorageChanges, blockN) + } + + if err := checkHistory(db, dbutils.AccountChangeSetBucket, execAtBlock); err != nil { + return err + } + if err := checkHistory(db, dbutils.StorageChangeSetBucket, execAtBlock); err != nil { + return err + } + return nil +} + +func loopIh(db ethdb.Database, ctx context.Context) error { + ch := ctx.Done() + var tx ethdb.DbWithPendingMutations = ethdb.NewTxDbWithoutTransaction(db, ethdb.RW) + defer tx.Rollback() + + cc, bc, st, progress := newSync(ch, db, tx, nil) + defer bc.Stop() + cc.SetDB(tx) + + var err error + tx, err = tx.Begin(ctx, ethdb.RW) + if err != nil { + return err + } + + _ = clearUnwindStack(tx, context.Background()) + st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders, stages.Execution, stages.AccountHistoryIndex, stages.StorageHistoryIndex, stages.TxPool, stages.TxLookup, stages.Finish) + if err = st.Run(db, tx); err != nil { + return err + } + execStage := progress(stages.HashState) + to := execStage.BlockNumber - 10 + _ = st.SetCurrentStage(stages.HashState) + u := &stagedsync.UnwindState{Stage: stages.HashState, UnwindPoint: to} + if err = stagedsync.UnwindHashStateStage(u, progress(stages.HashState), tx, path.Join(datadir, etl.TmpDirName), ch); err != nil { + return err + } + _ = st.SetCurrentStage(stages.IntermediateHashes) + u = &stagedsync.UnwindState{Stage: stages.IntermediateHashes, UnwindPoint: to} + if err = stagedsync.UnwindIntermediateHashesStage(u, progress(stages.IntermediateHashes), tx, path.Join(datadir, etl.TmpDirName), ch); err != nil { + return err + } + _ = clearUnwindStack(tx, context.Background()) + _ = tx.CommitAndBegin(context.Background()) + _ = printAllStages(tx, context.Background()) + + st.DisableStages(stages.IntermediateHashes) + _ = st.SetCurrentStage(stages.HashState) + if err = st.Run(db, tx); err != nil { + return err + } + _ = tx.CommitAndBegin(context.Background()) + _ = printAllStages(tx, context.Background()) + + st.DisableStages(stages.HashState) + st.EnableStages(stages.IntermediateHashes) + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + _ = st.SetCurrentStage(stages.IntermediateHashes) + if err = st.Run(db, tx); err != nil { + return err + } + tx.Rollback() + tx, err = tx.Begin(ctx, ethdb.RW) + if err != nil { + return err + } + } +} + func checkChangeSet(db ethdb.Database, blockNum uint64, expectedAccountChanges *changeset.ChangeSet, expectedStorageChanges *changeset.ChangeSet) error { i := 0 sort.Sort(expectedAccountChanges) @@ -261,7 +377,7 @@ func checkHistory(db ethdb.Database, changeSetBucket string, blockNum uint64) er return false, innerErr } if !bm.Contains(uint32(blockN)) { - return false, fmt.Errorf("%v,%v", blockN, common.Bytes2Hex(k)) + return false, fmt.Errorf("checkHistory failed: block=%d,addr=%x", blockN, k) } return true, nil }); err != nil { diff --git a/eth/stagedsync/state.go b/eth/stagedsync/state.go index 64c5b34cb..f5a6c107f 100644 --- a/eth/stagedsync/state.go +++ b/eth/stagedsync/state.go @@ -279,6 +279,17 @@ func (s *State) DisableStages(ids ...stages.SyncStage) { } } +func (s *State) EnableStages(ids ...stages.SyncStage) { + for i := range s.stages { + for _, id := range ids { + if !bytes.Equal(s.stages[i].ID, id) { + continue + } + s.stages[i].Disabled = false + } + } +} + func (s *State) MockExecFunc(id stages.SyncStage, f ExecFunc) { for i := range s.stages { if bytes.Equal(s.stages[i].ID, id) {