Add commands "compare_states" and "stage9"; fix unwind of stage9 (it walked over the whole db) (#722)

* ./cmd/integration compare_states, ./cmd/integration stage9

* TxLookup: fix unwind - it walked over the whole db
This commit is contained in:
Alex Sharov 2020-07-07 17:07:14 +07:00 committed by GitHub
parent fb933bc9e3
commit 495f95f688
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 173 additions and 38 deletions

View File

@ -5,11 +5,29 @@ import (
"context"
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/spf13/cobra"
)
// stateBuckets lists the buckets that compareStates walks, in this order,
// comparing each one against the same bucket in the reference database.
var stateBuckets = [][]byte{
dbutils.CurrentStateBucket,
dbutils.AccountChangeSetBucket,
dbutils.StorageChangeSetBucket,
dbutils.ContractCodeBucket,
dbutils.PlainStateBucket,
dbutils.PlainAccountChangeSetBucket,
dbutils.PlainStorageChangeSetBucket,
dbutils.PlainContractCodeBucket,
dbutils.IncarnationMapBucket,
dbutils.CodeBucket,
dbutils.IntermediateTrieHashBucket,
dbutils.AccountsHistoryBucket,
dbutils.StorageHistoryBucket,
dbutils.TxLookupPrefix,
}
var cmdCompareBucket = &cobra.Command{
Use: "compare_bucket",
Short: "compare bucket to the same bucket in '--reference_chaindata'",
@ -18,7 +36,24 @@ var cmdCompareBucket = &cobra.Command{
if referenceChaindata == "" {
referenceChaindata = chaindata + "-copy"
}
err := compareBucket(ctx, chaindata, referenceChaindata, bucket)
err := compareBucketBetweenDatabases(ctx, chaindata, referenceChaindata, bucket)
if err != nil {
log.Error(err.Error())
return err
}
return nil
},
}
var cmdCompareStates = &cobra.Command{
Use: "compare_states",
Short: "compare state buckets to buckets in '--reference_chaindata'",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := rootContext()
if referenceChaindata == "" {
referenceChaindata = chaindata + "-copy"
}
err := compareStates(ctx, chaindata, referenceChaindata)
if err != nil {
log.Error(err.Error())
return err
@ -33,47 +68,78 @@ func init() {
withBucket(cmdCompareBucket)
rootCmd.AddCommand(cmdCompareBucket)
withChaindata(cmdCompareStates)
withReferenceChaindata(cmdCompareStates)
withBucket(cmdCompareStates)
rootCmd.AddCommand(cmdCompareStates)
}
func compareBucket(ctx context.Context, chaindata string, referenceChaindata string, bucket string) error {
func compareStates(ctx context.Context, chaindata string, referenceChaindata string) error {
db := ethdb.MustOpen(chaindata)
defer db.Close()
refDB := ethdb.MustOpen(referenceChaindata)
defer refDB.Close()
tx, err := db.KV().Begin(context.Background(), false)
if err != nil {
if err := db.KV().View(context.Background(), func(tx ethdb.Tx) error {
if err := refDB.KV().View(context.Background(), func(refTX ethdb.Tx) error {
for _, bucket := range stateBuckets {
fmt.Printf("\nBucket: %s\n", bucket)
if err := compareBuckets(ctx, tx.Bucket(bucket), refTX.Bucket(bucket)); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
defer tx.Rollback()
refTX, err := refDB.KV().Begin(context.Background(), false)
if err != nil {
return nil
}
// compareBucketBetweenDatabases opens the database at chaindata and the one at
// referenceChaindata and compares the single named bucket between them.
//
// The comparison runs inside one read-only View of each database; whatever
// error compareBuckets or View reports is returned unchanged.
func compareBucketBetweenDatabases(ctx context.Context, chaindata string, referenceChaindata string, bucket string) error {
	db := ethdb.MustOpen(chaindata)
	defer db.Close()

	refDB := ethdb.MustOpen(referenceChaindata)
	defer refDB.Close()

	name := []byte(bucket)

	// refTX only exists inside the inner closure and View scopes it, so
	// there is nothing to roll back out here.
	return db.KV().View(context.Background(), func(tx ethdb.Tx) error {
		return refDB.KV().View(context.Background(), func(refTX ethdb.Tx) error {
			return compareBuckets(ctx, tx.Bucket(name), refTX.Bucket(name))
		})
	})
}
func compareBuckets(ctx context.Context, b ethdb.Bucket, refB ethdb.Bucket) error {
count := 0
c := tx.Bucket([]byte(bucket)).Cursor()
c := b.Cursor()
k, v, e := c.First()
if e != nil {
return e
}
refC := refTX.Bucket([]byte(bucket)).Cursor()
refC := refB.Cursor()
refK, refV, revErr := refC.First()
if revErr != nil {
return revErr
}
for k != nil || refK != nil {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
count++
if count%100000 == 0 {
if count%100_000 == 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
fmt.Printf("Compared %d records\n", count)
}
if k == nil {

View File

@ -26,7 +26,7 @@ func withChaindata(cmd *cobra.Command) {
// withReferenceChaindata registers the --reference_chaindata string flag on
// cmd, bound to the package-level referenceChaindata variable, and marks it as
// a directory-name flag.
func withReferenceChaindata(cmd *cobra.Command) {
	cmd.Flags().StringVar(&referenceChaindata, "reference_chaindata", "", "path to the 2nd (reference/etalon) db")
	// MarkFlagDirname looks the flag up by its registered name, so it must be
	// "reference_chaindata" (not the Go variable name); an unknown name yields
	// an error, which must presumably turns into a panic at init time.
	must(cmd.MarkFlagDirname("reference_chaindata"))
}
func withBlock(cmd *cobra.Command) {

View File

@ -51,6 +51,9 @@ func resetState(_ context.Context) error {
if err := resetHistory(db); err != nil {
return err
}
if err := resetTxLookup(db); err != nil {
return err
}
// set genesis after reset all buckets
if _, _, err := core.DefaultGenesisBlock().CommitGenesisState(db, false); err != nil {
@ -63,6 +66,7 @@ func resetState(_ context.Context) error {
}
return nil
}
func resetSenders(db *ethdb.ObjectDatabase) error {
if err := db.ClearBuckets(
dbutils.Senders,
@ -124,6 +128,18 @@ func resetHistory(db *ethdb.ObjectDatabase) error {
return nil
}
// resetTxLookup empties the TxLookupPrefix bucket and then saves a progress of
// 0 for the TxLookup stage. The first failing step's error is returned.
func resetTxLookup(db *ethdb.ObjectDatabase) error {
	if err := db.ClearBuckets(dbutils.TxLookupPrefix); err != nil {
		return err
	}
	return stages.SaveStageProgress(db, stages.TxLookup, 0, nil)
}
func printStages(db *ethdb.ObjectDatabase) error {
var err error
var progress uint64

View File

@ -81,6 +81,18 @@ var cmdStage78 = &cobra.Command{
},
}
// cmdStage9 is the "stage9" subcommand: it calls stage9 with the root context
// and, on failure, logs the error before handing it back to cobra.
var cmdStage9 = &cobra.Command{
	Use:   "stage9",
	Short: "",
	RunE: func(cmd *cobra.Command, args []string) error {
		err := stage9(rootContext())
		if err == nil {
			return nil
		}
		log.Error("Error", "err", err)
		return err
	},
}
var cmdPrintStages = &cobra.Command{
Use: "print_stages",
Short: "",
@ -95,6 +107,9 @@ var cmdPrintStages = &cobra.Command{
}
func init() {
withChaindata(cmdPrintStages)
rootCmd.AddCommand(cmdPrintStages)
withChaindata(cmdStage3)
withReset(cmdStage3)
withBlock(cmdStage3)
@ -130,8 +145,12 @@ func init() {
rootCmd.AddCommand(cmdStage78)
withChaindata(cmdPrintStages)
rootCmd.AddCommand(cmdPrintStages)
withChaindata(cmdStage9)
withReset(cmdStage9)
withBlock(cmdStage9)
withUnwind(cmdStage9)
rootCmd.AddCommand(cmdStage9)
}
func stage3(_ context.Context) error {
@ -271,6 +290,29 @@ func stage78(ctx context.Context) error {
return nil
}
// stage9 runs the TxLookup stage against the database at chaindata.
//
// It first switches core.UsePlainStateExecution on. When the reset flag is
// set, the TxLookup bucket and stage progress are cleared before anything
// else. Then, if unwind is non-zero, the stage is unwound by that many blocks
// from its current progress; otherwise the stage is executed forward.
// ctx.Done() is passed down as the quit channel.
func stage9(ctx context.Context) error {
	core.UsePlainStateExecution = true

	db := ethdb.MustOpen(chaindata)
	defer db.Close()

	if reset {
		if err := resetTxLookup(db); err != nil {
			return err
		}
	}

	s := progress(db, stages.TxLookup)
	log.Info("Stage9", "progress", s.BlockNumber)
	ch := ctx.Done()

	if unwind > 0 {
		// Block numbers look unsigned here (assumed uint64 - confirm), so a
		// plain `BlockNumber - unwind` would wrap around to a huge value when
		// asked to unwind further than the current progress. Clamp at 0.
		unwindPoint := s.BlockNumber
		if unwind >= unwindPoint {
			unwindPoint = 0
		} else {
			unwindPoint -= unwind
		}
		u := &stagedsync.UnwindState{Stage: stages.TxLookup, UnwindPoint: unwindPoint}
		return stagedsync.UnwindTxLookup(u, db, "", ch)
	}

	return stagedsync.SpawnTxLookup(s, db, "", ch)
}
func printAllStages(_ context.Context) error {
db := ethdb.MustOpen(chaindata)
defer db.Close()

View File

@ -104,6 +104,13 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
}
}
{
stage9 := progress(db, stages.TxLookup)
if err = stagedsync.SpawnTxLookup(stage9, db, "", ch); err != nil {
return fmt.Errorf("spawnTxLookup: %w", err)
}
}
// Unwind all stages to `execStage - unwind` block
if unwind == 0 {
continue
@ -111,6 +118,13 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
execStage := progress(db, stages.Execution)
to := execStage.BlockNumber - unwind
{
u := &stagedsync.UnwindState{Stage: stages.TxLookup, UnwindPoint: to}
if err = stagedsync.UnwindTxLookup(u, db, "", ch); err != nil {
return fmt.Errorf("unwindTxLookup: %w", err)
}
}
{
u := &stagedsync.UnwindState{Stage: stages.StorageHistoryIndex, UnwindPoint: to}
if err = stagedsync.UnwindStorageHistoryIndex(u, db, ch); err != nil {

View File

@ -4,6 +4,8 @@ import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
"github.com/golang/snappy"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
@ -13,10 +15,9 @@ import (
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/rlp"
"math/big"
)
func spawnTxLookup(s *StageState, db ethdb.Database, dataDir string, quitCh chan struct{}) error {
func SpawnTxLookup(s *StageState, db ethdb.Database, dataDir string, quitCh <-chan struct{}) error {
var blockNum uint64
var startKey []byte
@ -37,7 +38,7 @@ func spawnTxLookup(s *StageState, db ethdb.Database, dataDir string, quitCh chan
return s.DoneAndUpdate(db, syncHeadNumber)
}
func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh chan struct{}, datadir string) error {
func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh <-chan struct{}, datadir string) error {
return etl.Transform(db, dbutils.HeaderPrefix, dbutils.TxLookupPrefix, datadir, func(k []byte, v []byte, next etl.ExtractNextFunc) error {
if !dbutils.CheckCanonicalKey(k) {
return nil
@ -63,10 +64,10 @@ func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh chan s
})
}
func unwindTxLookup(u *UnwindState, db ethdb.Database, quitCh chan struct{}) error {
var txsToRemove [][]byte
func UnwindTxLookup(u *UnwindState, db ethdb.Database, datadir string, quitCh <-chan struct{}) error {
collector := etl.NewCollector(datadir, etl.NewSortableBuffer(etl.BufferOptimalSize))
// Remove lookup entries for all blocks above unwindPoint
if err := db.Walk(dbutils.BlockBodyPrefix, dbutils.EncodeBlockNumber(u.UnwindPoint+1), 0, func(k, v []byte) (b bool, e error) {
if err := db.Walk(dbutils.BlockBodyPrefix, dbutils.EncodeBlockNumber(u.UnwindPoint+1), 8*8, func(k, v []byte) (b bool, e error) {
if err := common.Stopped(quitCh); err != nil {
return false, err
}
@ -83,21 +84,17 @@ func unwindTxLookup(u *UnwindState, db ethdb.Database, quitCh chan struct{}) err
return false, fmt.Errorf("unwindTxLookup, rlp decode err: %w", err)
}
for _, tx := range body.Transactions {
txsToRemove = append(txsToRemove, tx.Hash().Bytes())
if err := collector.Collect(tx.Hash().Bytes(), nil); err != nil {
return false, err
}
}
return true, nil
}); err != nil {
return err
}
// TODO: Do it in a batch and update the progress
for _, v := range txsToRemove {
if err := db.Delete(dbutils.TxLookupPrefix, v); err != nil {
return err
}
if err := collector.Load(db, dbutils.TxLookupPrefix, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quitCh}); err != nil {
return err
}
if err := u.Done(db); err != nil {
return fmt.Errorf("unwind TxLookup: %w", err)
}
return nil
return u.Done(db)
}

View File

@ -130,10 +130,10 @@ func PrepareStagedSync(
Disabled: !storageMode.TxIndex,
DisabledDescription: "Enable by adding `t` to --storage-mode",
ExecFunc: func(s *StageState, u Unwinder) error {
return spawnTxLookup(s, stateDB, datadir, quitCh)
return SpawnTxLookup(s, stateDB, datadir, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return unwindTxLookup(u, stateDB, quitCh)
return UnwindTxLookup(u, stateDB, datadir, quitCh)
},
},
}