From 48733c31cd4fd2213017d214eeac6605ba43fecb Mon Sep 17 00:00:00 2001 From: b00ris Date: Thu, 13 May 2021 23:04:47 +0300 Subject: [PATCH] test fix (#1930) --- ethdb/kv_snapshot.go | 5 +- turbo/snapshotsync/snapshot_builder.go | 3 +- turbo/snapshotsync/snapshot_builder_test.go | 116 ++++++++++++-------- 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/ethdb/kv_snapshot.go b/ethdb/kv_snapshot.go index 11c06b9ae..2bf90be10 100644 --- a/ethdb/kv_snapshot.go +++ b/ethdb/kv_snapshot.go @@ -23,10 +23,13 @@ var ( type SnapshotUpdater interface { UpdateSnapshots(buckets []string, snapshotKV RoKV, done chan struct{}) - WriteDB() RwKV SnapshotKV(bucket string) RoKV } +type WriteDB interface { + WriteDB() RwKV +} + func NewSnapshotKV() snapshotOpts { return snapshotOpts{} } diff --git a/turbo/snapshotsync/snapshot_builder.go b/turbo/snapshotsync/snapshot_builder.go index 6b2f04e67..a07026a54 100644 --- a/turbo/snapshotsync/snapshot_builder.go +++ b/turbo/snapshotsync/snapshot_builder.go @@ -432,7 +432,8 @@ func RemoveHeadersData(db ethdb.Database, tx ethdb.RwTx, currentSnapshot, newSna } snapshotDB := ethdb.NewObjectDatabase(headerSnapshot.(ethdb.RwKV)) - c, err := tx.RwCursor(dbutils.HeadersBucket) + writeTX := tx.(ethdb.DBTX).DBTX() + c, err := writeTX.RwCursor(dbutils.HeadersBucket) if err != nil { return fmt.Errorf("get headers cursor %w", err) } diff --git a/turbo/snapshotsync/snapshot_builder_test.go b/turbo/snapshotsync/snapshot_builder_test.go index e3a9dc104..a3c67f3ca 100644 --- a/turbo/snapshotsync/snapshot_builder_test.go +++ b/turbo/snapshotsync/snapshot_builder_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "errors" + "github.com/stretchr/testify/require" "io/ioutil" "math" "os" @@ -18,7 +19,6 @@ import ( ) func TestSnapshotMigratorStage(t *testing.T) { - t.Skip("Cannot fix this") //log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) dir, err := ioutil.TempDir(os.TempDir(), "tst") if err != nil { @@ -26,7 +26,11 @@ func TestSnapshotMigratorStage(t *testing.T) { } defer func() { - if err = os.RemoveAll(dir); err != nil { + if err != nil { + t.Log(err, dir) + } + err = os.RemoveAll(dir) + if err != nil { t.Log(err) } @@ -48,67 +52,78 @@ func TestSnapshotMigratorStage(t *testing.T) { sb := &SnapshotMigrator{ snapshotsDir: snapshotsDir, } - generateChan := make(chan int) currentSnapshotBlock := uint64(10) - tx, err := db.Begin(context.Background(), ethdb.RW) + tx, err := db.RwKV().BeginRw(context.Background()) if err != nil { - t.Fatal(err) + t.Error(err) + panic(err) } defer tx.Rollback() err = GenerateHeaderData(tx, 0, 11) if err != nil { - t.Fatal(err) + t.Error(err) + panic(err) } err = tx.Commit() if err != nil { - t.Fatal(err) + t.Error(err) + panic(err) } - - for !sb.Finished(10) { - tx, err := db.Begin(context.Background(), ethdb.RW) - if err != nil { - tx.Rollback() - t.Error(err) - } - var dbtx ethdb.RwTx - if hasTx, ok := tx.(ethdb.HasTx); ok { - dbtx = hasTx.Tx().(ethdb.RwTx) - } - - select { - case newHeight := <-generateChan: - err = GenerateHeaderData(tx, int(currentSnapshotBlock), newHeight) + generateChan := make(chan int) + go func() { + //this gorutine emulates staged sync. + for { + tx, err := db.RwKV().BeginRw(context.Background()) if err != nil { tx.Rollback() - t.Fatal(err) + t.Error(err) } - currentSnapshotBlock = CalculateEpoch(uint64(newHeight), 10) - default: - } + select { + case newHeight := <-generateChan: + err = GenerateHeaderData(tx, int(currentSnapshotBlock), newHeight) + if err != nil { + t.Error(err) + tx.Rollback() + } + currentSnapshotBlock = CalculateEpoch(uint64(newHeight), 10) + default: - err = sb.Migrate(db, dbtx, currentSnapshotBlock, btCli) - if err != nil { + } + + err = sb.Migrate(db, tx, currentSnapshotBlock, btCli) + if err != nil { + tx.Rollback() + t.Error(err) + panic(err) + } + + err = tx.Commit() + if err != nil { + t.Error(err) + panic(err) + } tx.Rollback() - t.Fatal(err) + time.Sleep(time.Second) } - err = tx.Commit() - if err != nil { - t.Fatal(err) - } - tx.Rollback() + }() + + for !(sb.Finished(10)) { + time.Sleep(time.Second) } - sa := db.RwKV().(ethdb.SnapshotUpdater) - wodb := ethdb.NewObjectDatabase(sa.WriteDB()) - + sa := db.RwKV().(ethdb.WriteDB) + rotx, err := sa.WriteDB().BeginRo(context.Background()) + require.NoError(t, err) + defer rotx.Rollback() + roc, err := rotx.Cursor(dbutils.HeadersBucket) + require.NoError(t, err) var headerNumber uint64 headerNumber = 11 - err = wodb.Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { - if !bytes.Equal(k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) { - t.Errorf("wrong header key: %x, expected %x", k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) - } + + err = ethdb.Walk(roc, []byte{}, 0, func(k, v []byte) (bool, error) { + require.Equal(t, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)}), k) headerNumber++ return true, nil }) @@ -116,10 +131,10 @@ func TestSnapshotMigratorStage(t *testing.T) { t.Fatal(err) } if headerNumber != 12 { - t.Fatalf("wrong number of headers: %d, expected %d", headerNumber, 12) + t.Fatal(headerNumber) } - snodb := ethdb.NewObjectDatabase(sa.SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)) + snodb := ethdb.NewObjectDatabase(db.RwKV().(ethdb.SnapshotUpdater).SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)) headerNumber = 0 err = snodb.Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { if !bytes.Equal(k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) { @@ -199,7 +214,13 @@ func TestSnapshotMigratorStage(t *testing.T) { t.Fatal("it's not possible to close db without rollback. something went wrong") } - err = wodb.Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { + rotx, err = sa.WriteDB().BeginRo(context.Background()) + require.NoError(t, err) + defer rotx.Rollback() + roc, err = rotx.Cursor(dbutils.HeadersBucket) + require.NoError(t, err) + + err = ethdb.Walk(roc, []byte{}, 0, func(k, v []byte) (bool, error) { t.Fatal("main db must be empty here") return true, nil }) @@ -208,7 +229,7 @@ func TestSnapshotMigratorStage(t *testing.T) { } headerNumber = 0 - err = ethdb.NewObjectDatabase(sa.SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)).Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { + err = ethdb.NewObjectDatabase(db.RwKV().(ethdb.SnapshotUpdater).SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)).Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { if !bytes.Equal(k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) { t.Fatal(k) } @@ -261,11 +282,14 @@ func TestSnapshotMigratorStage(t *testing.T) { if _, err = os.Stat(SnapshotName(snapshotsDir, "headers", 10)); os.IsExist(err) { t.Fatal("snapshot exsists") + } else { + //just not to confuse defer + err = nil } } -func GenerateHeaderData(tx ethdb.DbWithPendingMutations, from, to int) error { +func GenerateHeaderData(tx ethdb.RwTx, from, to int) error { var err error if to > math.MaxInt8 { return errors.New("greater than uint8")