improve integration (#1456)

This commit is contained in:
Alex Sharov 2021-01-26 12:19:53 +07:00 committed by GitHub
parent 7f2ceca0b7
commit e943a73ceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 161 additions and 34 deletions

View File

@ -5,13 +5,14 @@ import (
"context"
"errors"
"fmt"
"path"
"sort"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core/state"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
@ -51,6 +52,22 @@ var stateStags = &cobra.Command{
},
}
var loopIhCmd = &cobra.Command{
Use: "loop_ih",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := utils.RootContext()
db := openDatabase(chaindata, true)
defer db.Close()
if err := loopIh(db, ctx); err != nil {
log.Error("Error", "err", err)
return err
}
return nil
},
}
func init() {
withChaindata(stateStags)
withReferenceChaindata(stateStags)
@ -60,13 +77,19 @@ func init() {
withBatchSize(stateStags)
rootCmd.AddCommand(stateStags)
withChaindata(loopIhCmd)
withBatchSize(loopIhCmd)
rootCmd.AddCommand(loopIhCmd)
}
func syncBySmallSteps(db ethdb.Database, ctx context.Context) error {
sm, err := ethdb.GetStorageModeFromDB(db)
if err != nil {
panic(err)
sm, err1 := ethdb.GetStorageModeFromDB(db)
if err1 != nil {
panic(err1)
}
must(clearUnwindStack(db, ctx))
ch := ctx.Done()
@ -95,24 +118,31 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error {
defer bc.Stop()
cc.SetDB(tx)
tx, err = tx.Begin(ctx, ethdb.RW)
if err != nil {
return err
tx, err1 = tx.Begin(ctx, ethdb.RW)
if err1 != nil {
return err1
}
st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders)
_ = st.SetCurrentStage(stages.Execution)
senderStageProgress := progress(stages.Senders).BlockNumber
senderAtBlock := progress(stages.Senders).BlockNumber
execAtBlock := progress(stages.Execution).BlockNumber
var stopAt = senderStageProgress
if block > 0 && block < senderStageProgress {
var stopAt = senderAtBlock
onlyOneUnwind := block == 0 && unwindEvery == 0 && unwind > 0
backward := unwindEvery < unwind
if onlyOneUnwind {
stopAt = progress(stages.Execution).BlockNumber - unwind
} else if block > 0 && block < senderAtBlock {
stopAt = block
} else if backward {
stopAt = 1
}
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
for progress(stages.Execution).BlockNumber < stopAt || ((unwind <= unwindEvery) && unwind != 0) {
for (!backward && execAtBlock < stopAt) || (backward && execAtBlock > stopAt) {
select {
case <-ctx.Done():
return nil
@ -124,15 +154,26 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error {
}
// All stages forward to `execStage + unwindEvery` block
execAtBlock := progress(stages.Execution).BlockNumber
execAtBlock = progress(stages.Execution).BlockNumber
execToBlock := block
if unwindEvery != 0 || unwind != 0 {
execToBlock = execAtBlock + unwindEvery
if unwindEvery > 0 || unwind > 0 {
if execAtBlock+unwindEvery > unwind {
execToBlock = execAtBlock + unwindEvery - unwind
} else {
break
}
}
if execToBlock > stopAt {
execToBlock = stopAt + 1
unwind = 0
if backward {
if execToBlock < stopAt {
execToBlock = stopAt
}
} else {
if execToBlock > stopAt {
execToBlock = stopAt + 1
unwind = 0
}
}
fmt.Printf("alex: %d\n", execToBlock)
// set block limit of execute stage
st.MockExecFunc(stages.Execution, func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder) error {
@ -154,19 +195,7 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error {
if err := st.Run(db, tx); err != nil {
return err
}
for blockN := range expectedAccountChanges {
if err := checkChangeSet(tx, blockN, expectedAccountChanges[blockN], expectedStorageChanges[blockN]); err != nil {
return err
}
delete(expectedAccountChanges, blockN)
delete(expectedStorageChanges, blockN)
}
if err := checkHistory(tx, dbutils.AccountChangeSetBucket, execAtBlock); err != nil {
return err
}
if err := checkHistory(tx, dbutils.StorageChangeSetBucket, execAtBlock); err != nil {
if err := checkChanges(expectedAccountChanges, tx, expectedStorageChanges, execAtBlock); err != nil {
return err
}
@ -174,14 +203,17 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error {
return err
}
execAtBlock = progress(stages.Execution).BlockNumber
if execAtBlock == stopAt {
break
}
// Unwind all stages to `execStage - unwind` block
if unwind == 0 {
continue
}
execStage := progress(stages.Execution)
to := execStage.BlockNumber - unwind
to := execAtBlock - unwind
if err := st.UnwindTo(to, tx); err != nil {
return err
}
@ -194,6 +226,90 @@ func syncBySmallSteps(db ethdb.Database, ctx context.Context) error {
return nil
}
func checkChanges(expectedAccountChanges map[uint64]*changeset.ChangeSet, db ethdb.Database, expectedStorageChanges map[uint64]*changeset.ChangeSet, execAtBlock uint64) error {
for blockN := range expectedAccountChanges {
if err := checkChangeSet(db, blockN, expectedAccountChanges[blockN], expectedStorageChanges[blockN]); err != nil {
return err
}
delete(expectedAccountChanges, blockN)
delete(expectedStorageChanges, blockN)
}
if err := checkHistory(db, dbutils.AccountChangeSetBucket, execAtBlock); err != nil {
return err
}
if err := checkHistory(db, dbutils.StorageChangeSetBucket, execAtBlock); err != nil {
return err
}
return nil
}
func loopIh(db ethdb.Database, ctx context.Context) error {
ch := ctx.Done()
var tx ethdb.DbWithPendingMutations = ethdb.NewTxDbWithoutTransaction(db, ethdb.RW)
defer tx.Rollback()
cc, bc, st, progress := newSync(ch, db, tx, nil)
defer bc.Stop()
cc.SetDB(tx)
var err error
tx, err = tx.Begin(ctx, ethdb.RW)
if err != nil {
return err
}
_ = clearUnwindStack(tx, context.Background())
st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders, stages.Execution, stages.AccountHistoryIndex, stages.StorageHistoryIndex, stages.TxPool, stages.TxLookup, stages.Finish)
if err = st.Run(db, tx); err != nil {
return err
}
execStage := progress(stages.HashState)
to := execStage.BlockNumber - 10
_ = st.SetCurrentStage(stages.HashState)
u := &stagedsync.UnwindState{Stage: stages.HashState, UnwindPoint: to}
if err = stagedsync.UnwindHashStateStage(u, progress(stages.HashState), tx, path.Join(datadir, etl.TmpDirName), ch); err != nil {
return err
}
_ = st.SetCurrentStage(stages.IntermediateHashes)
u = &stagedsync.UnwindState{Stage: stages.IntermediateHashes, UnwindPoint: to}
if err = stagedsync.UnwindIntermediateHashesStage(u, progress(stages.IntermediateHashes), tx, path.Join(datadir, etl.TmpDirName), ch); err != nil {
return err
}
_ = clearUnwindStack(tx, context.Background())
_ = tx.CommitAndBegin(context.Background())
_ = printAllStages(tx, context.Background())
st.DisableStages(stages.IntermediateHashes)
_ = st.SetCurrentStage(stages.HashState)
if err = st.Run(db, tx); err != nil {
return err
}
_ = tx.CommitAndBegin(context.Background())
_ = printAllStages(tx, context.Background())
st.DisableStages(stages.HashState)
st.EnableStages(stages.IntermediateHashes)
for {
select {
case <-ctx.Done():
return nil
default:
}
_ = st.SetCurrentStage(stages.IntermediateHashes)
if err = st.Run(db, tx); err != nil {
return err
}
tx.Rollback()
tx, err = tx.Begin(ctx, ethdb.RW)
if err != nil {
return err
}
}
}
func checkChangeSet(db ethdb.Database, blockNum uint64, expectedAccountChanges *changeset.ChangeSet, expectedStorageChanges *changeset.ChangeSet) error {
i := 0
sort.Sort(expectedAccountChanges)
@ -261,7 +377,7 @@ func checkHistory(db ethdb.Database, changeSetBucket string, blockNum uint64) er
return false, innerErr
}
if !bm.Contains(uint32(blockN)) {
return false, fmt.Errorf("%v,%v", blockN, common.Bytes2Hex(k))
return false, fmt.Errorf("checkHistory failed: block=%d,addr=%x", blockN, k)
}
return true, nil
}); err != nil {

View File

@ -279,6 +279,17 @@ func (s *State) DisableStages(ids ...stages.SyncStage) {
}
}
func (s *State) EnableStages(ids ...stages.SyncStage) {
for i := range s.stages {
for _, id := range ids {
if !bytes.Equal(s.stages[i].ID, id) {
continue
}
s.stages[i].Disabled = false
}
}
}
func (s *State) MockExecFunc(id stages.SyncStage, f ExecFunc) {
for i := range s.stages {
if bytes.Equal(s.stages[i].ID, id) {