erigon-pulse/cmd/integration/commands/refetence_db.go

457 lines
9.5 KiB
Go
Raw Normal View History

2020-07-14 01:56:29 +00:00
package commands
import (
2020-10-28 03:18:10 +00:00
"bufio"
"bytes"
"context"
"fmt"
2020-10-28 03:18:10 +00:00
"os"
"strings"
"time"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/kv"
"github.com/ledgerwatch/erigon/ethdb/mdbx"
"github.com/ledgerwatch/erigon/log"
"github.com/spf13/cobra"
)
2020-08-10 23:55:32 +00:00
// stateBuckets is the ordered list of bucket names that compareStates walks
// when comparing a database against its reference copy. Buckets are compared
// (and their results printed) in exactly this order.
var stateBuckets = []string{
	dbutils.HashedAccountsBucket,
	dbutils.HashedStorageBucket,
	dbutils.ContractCodeBucket,
	dbutils.PlainStateBucket,
	dbutils.AccountChangeSetBucket,
	dbutils.StorageChangeSetBucket,
	dbutils.PlainContractCodeBucket,
	dbutils.IncarnationMapBucket,
	dbutils.CodeBucket,
	dbutils.TrieOfAccountsBucket,
	dbutils.TrieOfStorageBucket,
	dbutils.AccountsHistoryBucket,
	dbutils.StorageHistoryBucket,
	dbutils.TxLookupPrefix,
	dbutils.ContractTEVMCodeBucket,
}
// cmdCompareBucket is the "compare_bucket" sub-command. It compares the
// bucket selected by the package-level `bucket` value between the database at
// `chaindata` and the one at `referenceChaindata`. When no reference path was
// given, it defaults to `chaindata` with a "-copy" suffix.
var cmdCompareBucket = &cobra.Command{
	Use:   "compare_bucket",
	Short: "compare bucket to the same bucket in '--chaindata.reference'",
	RunE: func(_ *cobra.Command, _ []string) error {
		ctx, _ := utils.RootContext()
		if len(referenceChaindata) == 0 {
			referenceChaindata = chaindata + "-copy"
		}
		if err := compareBucketBetweenDatabases(ctx, chaindata, referenceChaindata, bucket); err != nil {
			log.Error(err.Error())
			return err
		}
		return nil
	},
}
// cmdCompareStates is the "compare_states" sub-command. It compares every
// bucket listed in stateBuckets between the database at `chaindata` and the
// one at `referenceChaindata`. When no reference path was given, it defaults
// to `chaindata` with a "-copy" suffix.
var cmdCompareStates = &cobra.Command{
	Use:   "compare_states",
	Short: "compare state buckets to buckets in '--chaindata.reference'",
	RunE: func(_ *cobra.Command, _ []string) error {
		ctx, _ := utils.RootContext()
		if len(referenceChaindata) == 0 {
			referenceChaindata = chaindata + "-copy"
		}
		if err := compareStates(ctx, chaindata, referenceChaindata); err != nil {
			log.Error(err.Error())
			return err
		}
		return nil
	},
}
2021-03-02 02:52:05 +00:00
var cmdMdbxToMdbx = &cobra.Command{
Use: "mdbx_to_mdbx",
Short: "copy data from '--chaindata' to '--chaindata.to'",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := utils.RootContext()
2021-03-02 02:52:05 +00:00
err := mdbxToMdbx(ctx, chaindata, toChaindata)
if err != nil {
log.Error(err.Error())
return err
}
return nil
},
}
2020-10-28 03:18:10 +00:00
var cmdFToMdbx = &cobra.Command{
Use: "f_to_mdbx",
2021-03-02 02:52:05 +00:00
Short: "copy data from '--chaindata' to '--chaindata.to'",
2020-10-28 03:18:10 +00:00
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := utils.RootContext()
2020-10-28 03:18:10 +00:00
err := fToMdbx(ctx, toChaindata)
if err != nil {
log.Error(err.Error())
return err
}
return nil
},
}
func init() {
withDatadir(cmdCompareBucket)
withReferenceChaindata(cmdCompareBucket)
withBucket(cmdCompareBucket)
rootCmd.AddCommand(cmdCompareBucket)
withDatadir(cmdCompareStates)
withReferenceChaindata(cmdCompareStates)
withBucket(cmdCompareStates)
rootCmd.AddCommand(cmdCompareStates)
2020-10-28 03:18:10 +00:00
withDatadir(cmdMdbxToMdbx)
2021-03-02 02:52:05 +00:00
withToChaindata(cmdMdbxToMdbx)
withBucket(cmdMdbxToMdbx)
rootCmd.AddCommand(cmdMdbxToMdbx)
2020-10-28 03:18:10 +00:00
withToChaindata(cmdFToMdbx)
withFile(cmdFToMdbx)
2020-10-28 03:18:10 +00:00
withBucket(cmdFToMdbx)
rootCmd.AddCommand(cmdFToMdbx)
}
func compareStates(ctx context.Context, chaindata string, referenceChaindata string) error {
db := kv.MustOpen(chaindata)
defer db.Close()
refDB := kv.MustOpen(referenceChaindata)
defer refDB.Close()
2021-03-30 09:53:54 +00:00
if err := db.RwKV().View(context.Background(), func(tx ethdb.Tx) error {
if err := refDB.RwKV().View(context.Background(), func(refTX ethdb.Tx) error {
for _, bucket := range stateBuckets {
fmt.Printf("\nBucket: %s\n", bucket)
if err := compareBuckets(ctx, tx, bucket, refTX, bucket); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}
func compareBucketBetweenDatabases(ctx context.Context, chaindata string, referenceChaindata string, bucket string) error {
db := kv.MustOpen(chaindata)
defer db.Close()
refDB := kv.MustOpen(referenceChaindata)
defer refDB.Close()
2021-03-30 09:53:54 +00:00
if err := db.RwKV().View(context.Background(), func(tx ethdb.Tx) error {
return refDB.RwKV().View(context.Background(), func(refTX ethdb.Tx) error {
return compareBuckets(ctx, tx, bucket, refTX, bucket)
})
}); err != nil {
return err
}
return nil
}
// compareBuckets walks bucket b of tx and bucket refB of refTx in parallel,
// in cursor order, and prints every discrepancy to stdout:
//
//   - "Missing refDB"    — a key present in tx but absent from refTx
//   - "Missing in db"    — a key present in refTx but absent from tx
//   - "Different values" — a key present in both with unequal values
//
// Discrepancies are only reported, never returned as errors; the returned
// error is non-nil only when a cursor operation fails or ctx is cancelled.
// Cancellation is polled once every 10,000,000 loop iterations, at the same
// point where a progress line is printed.
func compareBuckets(ctx context.Context, tx ethdb.Tx, b string, refTx ethdb.Tx, refB string) error {
	c, err := tx.Cursor(b)
	if err != nil {
		return err
	}
	defer c.Close()
	k, v, err := c.First()
	if err != nil {
		return err
	}

	refC, err := refTx.Cursor(refB)
	if err != nil {
		return err
	}
	defer refC.Close()
	refK, refV, err := refC.First()
	if err != nil {
		return err
	}

	count := 0
	// A nil key means the corresponding cursor is exhausted.
	for k != nil || refK != nil {
		count++
		if count%10_000_000 == 0 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			fmt.Printf("Compared %d records\n", count)
		}

		// Reduce all cases to one ordering value: an exhausted cursor sorts
		// after every real key, so the other side is the one to report.
		var order int
		switch {
		case k == nil:
			order = 1
		case refK == nil:
			order = -1
		default:
			order = bytes.Compare(k, refK)
		}

		switch {
		case order < 0:
			fmt.Printf("Missing refDB: %x [%x]\n", k, v)
			if k, v, err = c.Next(); err != nil {
				return err
			}
		case order > 0:
			fmt.Printf("Missing in db: %x [%x]\n", refK, refV)
			if refK, refV, err = refC.Next(); err != nil {
				return err
			}
		default:
			if !bytes.Equal(v, refV) {
				fmt.Printf("Different values for %x. db: [%x], refDB: [%x]\n", k, v, refV)
			}
			if k, v, err = c.Next(); err != nil {
				return err
			}
			if refK, refV, err = refC.Next(); err != nil {
				return err
			}
		}
	}
	return nil
}
2020-10-28 03:18:10 +00:00
func fToMdbx(ctx context.Context, to string) error {
file, err := os.Open(file)
2020-10-28 03:18:10 +00:00
if err != nil {
panic(err)
2020-10-28 03:18:10 +00:00
}
defer file.Close()
dst := kv.NewMDBX().Path(to).MustOpen()
dstTx, err1 := dst.BeginRw(ctx)
2020-10-28 03:18:10 +00:00
if err1 != nil {
return err1
}
defer func() {
dstTx.Rollback()
}()
commitEvery := time.NewTicker(5 * time.Second)
2020-10-28 03:18:10 +00:00
defer commitEvery.Stop()
fileScanner := bufio.NewScanner(file)
endData := []byte("DATA=END")
endHeader := []byte("HEADER=END")
MainLoop:
for {
bucket := ""
for { // header
if !fileScanner.Scan() {
break
}
kk := fileScanner.Bytes()
if bytes.Equal(kk, endHeader) {
break
}
parts := strings.Split(string(kk), "=")
k, v := parts[0], parts[1]
if k == "database" {
bucket = v
}
}
err = fileScanner.Err()
if err != nil {
panic(err)
}
err = fileScanner.Err()
if err != nil {
panic(err)
}
if bucket == "" {
panic("bucket not parse")
2020-10-28 03:18:10 +00:00
}
c, err := dstTx.RwCursor(bucket)
if err != nil {
return err
}
for {
if !fileScanner.Scan() {
break MainLoop
}
k := common.CopyBytes(fileScanner.Bytes())
if bytes.Equal(k, endData) {
break
}
k = common.FromHex(string(k[1:]))
if !fileScanner.Scan() {
break MainLoop
}
v := common.CopyBytes(fileScanner.Bytes())
v = common.FromHex(string(v[1:]))
if casted, ok := c.(ethdb.RwCursorDupSort); ok {
2021-06-19 20:29:02 +00:00
if err = casted.AppendDup(k, v); err != nil {
panic(err)
}
} else {
if err = c.Append(k, v); err != nil {
panic(err)
}
}
select {
default:
case <-ctx.Done():
return ctx.Err()
case <-commitEvery.C:
log.Info("Progress", "bucket", bucket, "key", fmt.Sprintf("%x", k))
}
2020-10-28 03:18:10 +00:00
}
err = fileScanner.Err()
if err != nil {
panic(err)
}
}
2021-04-03 06:26:00 +00:00
err = dstTx.Commit()
if err != nil {
return err
2020-10-28 03:18:10 +00:00
}
dstTx, err = dst.BeginRw(ctx)
2020-10-28 03:18:10 +00:00
if err != nil {
return err
2020-10-28 03:18:10 +00:00
}
2021-04-03 06:26:00 +00:00
err = dstTx.Commit()
2020-10-28 03:18:10 +00:00
if err != nil {
return err
}
return nil
}
2021-03-02 02:52:05 +00:00
func mdbxToMdbx(ctx context.Context, from, to string) error {
_ = os.RemoveAll(to)
src := kv.NewMDBX().Path(from).Flags(func(flags uint) uint { return mdbx.Readonly | mdbx.Accede }).MustOpen()
dst := kv.NewMDBX().Path(to).MustOpen()
2021-03-02 02:52:05 +00:00
return kv2kv(ctx, src, dst)
}
2021-03-30 09:53:54 +00:00
func kv2kv(ctx context.Context, src, dst ethdb.RwKV) error {
2021-04-03 06:26:00 +00:00
srcTx, err1 := src.BeginRo(ctx)
2020-10-28 03:18:10 +00:00
if err1 != nil {
return err1
}
defer srcTx.Rollback()
dstTx, err1 := dst.BeginRw(ctx)
2020-10-28 03:18:10 +00:00
if err1 != nil {
return err1
}
defer func() {
dstTx.Rollback()
}()
commitEvery := time.NewTicker(30 * time.Second)
2020-10-28 03:18:10 +00:00
defer commitEvery.Stop()
for name, b := range src.AllBuckets() {
2020-10-28 03:18:10 +00:00
if b.IsDeprecated {
continue
}
c, err := dstTx.RwCursor(name)
if err != nil {
return err
}
srcC, err := srcTx.Cursor(name)
if err != nil {
return err
}
casted, isDupsort := c.(ethdb.RwCursorDupSort)
2021-01-28 02:37:24 +00:00
2020-10-28 03:18:10 +00:00
for k, v, err := srcC.First(); k != nil; k, v, err = srcC.Next() {
if err != nil {
return err
}
2021-01-28 02:37:24 +00:00
if isDupsort {
2021-06-19 20:29:02 +00:00
if err = casted.AppendDup(k, v); err != nil {
panic(err)
}
} else {
if err = c.Append(k, v); err != nil {
panic(err)
}
2020-10-28 03:18:10 +00:00
}
select {
case <-ctx.Done():
return ctx.Err()
2020-10-28 03:18:10 +00:00
case <-commitEvery.C:
log.Info("Progress", "bucket", name, "key", fmt.Sprintf("%x", k))
2021-04-03 06:26:00 +00:00
if err2 := dstTx.Commit(); err2 != nil {
2020-10-28 03:18:10 +00:00
return err2
}
dstTx, err = dst.BeginRw(ctx)
2020-10-28 03:18:10 +00:00
if err != nil {
return err
}
c, err = dstTx.RwCursor(name)
if err != nil {
return err
}
casted, isDupsort = c.(ethdb.RwCursorDupSort)
default:
2020-10-28 03:18:10 +00:00
}
}
2020-11-23 04:15:43 +00:00
// migrate bucket sequences to native mdbx implementation
//currentID, err := srcTx.Sequence(name, 0)
//if err != nil {
// return err
//}
//_, err = dstTx.Sequence(name, currentID)
//if err != nil {
// return err
//}
2020-10-28 03:18:10 +00:00
}
2021-04-03 06:26:00 +00:00
err := dstTx.Commit()
2020-10-28 03:18:10 +00:00
if err != nil {
return err
}
dstTx, err = dst.BeginRw(ctx)
if err != nil {
return err
}
2021-04-03 06:26:00 +00:00
err = dstTx.Commit()
if err != nil {
return err
}
2020-10-28 03:18:10 +00:00
srcTx.Rollback()
2021-03-02 02:52:05 +00:00
log.Info("done")
2020-10-28 03:18:10 +00:00
return nil
}