diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index 1f743645e..579655ae5 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -1246,7 +1246,8 @@ func readAccount(chaindata string, account common.Address, block uint64, rewind var printed bool encodedTS := dbutils.EncodeTimestamp(timestamp) changeSetKey := dbutils.CompositeChangeSetKey(encodedTS, dbutils.StorageHistoryBucket) - v, err = ethDb.Get(dbutils.ChangeSetBucket, changeSetKey) + v, err = ethDb.Get(dbutils.StorageChangeSetBucket, changeSetKey) + check(err) if v != nil { err = changeset.StorageChangeSetBytes(v).Walk(func(key, value []byte) error { if bytes.HasPrefix(key, secKey) { diff --git a/cmd/state/stateless/state_snapshot.go b/cmd/state/stateless/state_snapshot.go index ea34a1070..1a68a3383 100644 --- a/cmd/state/stateless/state_snapshot.go +++ b/cmd/state/stateless/state_snapshot.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/ledgerwatch/turbo-geth/migrations" "os" "time" @@ -169,7 +170,7 @@ func newBucketWriter(db ethdb.Database, bucket []byte) *bucketWriter { } func copyDatabase(fromDB ethdb.Database, toDB ethdb.Database) error { - for _, bucket := range [][]byte{dbutils.AccountsBucket, dbutils.StorageBucket, dbutils.CodeBucket} { + for _, bucket := range [][]byte{dbutils.AccountsBucket, dbutils.StorageBucket, dbutils.CodeBucket, dbutils.DatabaseInfoBucket} { fmt.Printf(" - copying bucket '%s'...\n", string(bucket)) writer := newBucketWriter(toDB, bucket) @@ -207,6 +208,8 @@ func loadSnapshot(db ethdb.Database, filename string, createDb CreateDbFunc) { err = copyDatabase(diskDb, db) check(err) + err = migrations.NewMigrator().Apply(diskDb, false, false, false, false, false) + check(err) } func loadCodes(db *bolt.DB, codeDb ethdb.Database) error { diff --git a/cmd/state/stateless/state_snapshot_test.go b/cmd/state/stateless/state_snapshot_test.go index 274dfe668..a3543f085 100644 --- a/cmd/state/stateless/state_snapshot_test.go +++ b/cmd/state/stateless/state_snapshot_test.go @@ -95,6 +95,13 @@ func TestCopyDatabase(t *testing.T) { string(dbutils.CodeBucket): generateData(string(dbutils.CodeBucket)), }) + doTestcase(t, map[string]testData{ + string(dbutils.AccountsBucket): generateData(string(dbutils.AccountsBucket)), + string(dbutils.StorageBucket): generateData(string(dbutils.StorageBucket)), + string(dbutils.CodeBucket): generateData(string(dbutils.CodeBucket)), + string(dbutils.DatabaseInfoBucket): generateData(string(dbutils.DatabaseInfoBucket)), + }) + } func doTestcase(t *testing.T, testCase map[string]testData) { diff --git a/common/changeset/changeset.go b/common/changeset/changeset.go index 67db2f7cb..745083a6a 100644 --- a/common/changeset/changeset.go +++ b/common/changeset/changeset.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/ledgerwatch/turbo-geth/common" + "reflect" "sort" ) @@ -143,6 +144,10 @@ func (s *ChangeSet) ChangedKeys() map[string]struct{} { return m } +func (s *ChangeSet) Equals(s2 *ChangeSet) bool { + return reflect.DeepEqual(s.Changes, s2.Changes) +} + // Encoded Method func Len(b []byte) int { diff --git a/common/changeset/storage_changeset.go b/common/changeset/storage_changeset.go index 1b5d443b9..3467ad39c 100644 --- a/common/changeset/storage_changeset.go +++ b/common/changeset/storage_changeset.go @@ -93,7 +93,6 @@ func EncodeStorage(s *ChangeSet) ([]byte, error) { } byt := buf.Bytes() - fmt.Println("enc storage", len(byt), len(s.Changes)) return byt, nil } @@ -123,7 +122,7 @@ func DecodeStorage(b []byte) (*ChangeSet, error) { //parse not default incarnations incarnationsLength := len(b[incarnationPosition:]) - notDefaultIncarnation := make(map[uint32]uint64, 0) + notDefaultIncarnation := make(map[uint32]uint64) var ( id uint32 inc uint64 @@ -186,11 +185,10 @@ func (b StorageChangeSetBytes) Walk(f func(k, v []byte) error) error { incarnationPosition := storageEnodingStartElem + numOfItems*(3*common.HashLength) if uint32(len(b)) < incarnationPosition { - fmt.Println("WalkStorage", numOfItems) return fmt.Errorf("decode: input too short (%d bytes, expected at least %d bytes)", len(b), incarnationPosition) } incarnationsLength := len(b[incarnationPosition:]) - notDefaultIncarnation := make(map[uint32]uint64, 0) + notDefaultIncarnation := make(map[uint32]uint64) var ( id uint32 inc uint64 @@ -250,12 +248,11 @@ func (b StorageChangeSetBytes) FindLast(k []byte) ([]byte, error) { incarnationPosition := storageEnodingStartElem + numOfItems*(3*common.HashLength) if uint32(len(b)) < incarnationPosition { - fmt.Println("FindLast storage") return nil, fmt.Errorf("decode: input too short (%d bytes, expected at least %d bytes)", len(b), incarnationPosition) } incarnationsLength := len(b[incarnationPosition:]) - notDefaultIncarnation := make(map[uint32]uint64, 0) + notDefaultIncarnation := make(map[uint32]uint64) var ( id uint32 inc uint64 diff --git a/common/dbutils/bucket.go b/common/dbutils/bucket.go index 804265194..8f858df0a 100644 --- a/common/dbutils/bucket.go +++ b/common/dbutils/bucket.go @@ -36,16 +36,22 @@ var ( //value - code hash ContractCodeBucket = []byte("contractCode") - // key - encoded timestamp(block number) + history bucket(hAT/hST) - // value - encoded AccountChangeSet{k - addrHash|compositeKey(for storage) v - account(encoded) | originalValue(common.Hash)} - ChangeSetBucket = []byte("AccountChangeSet") - + //AccountChangeSetBucket keeps changesets of accounts + // key - encoded timestamp(block number) + // value - encoded ChangeSet{k - addrHash v - account(encoded). AccountChangeSetBucket = []byte("ACS") + + // StorageChangeSetBucket keeps changesets of storage + // key - encoded timestamp(block number) + // value - encoded ChangeSet{k - compositeKey(for storage) v - originalValue(common.Hash)}. StorageChangeSetBucket = []byte("SCS") // some_prefix_of(hash_of_address_of_account) => hash_of_subtrie IntermediateTrieHashBucket = []byte("iTh") + // DatabaseInfoBucket is used to store information about data layout. + DatabaseInfoBucket = []byte("DBINFO") + // databaseVerisionKey tracks the current database version. DatabaseVerisionKey = []byte("DatabaseVersion") @@ -85,4 +91,20 @@ var ( // last block that was pruned // it's saved one in 5 minutes LastPrunedBlockKey = []byte("LastPrunedBlock") + + // LastAppliedMigration keep the name of tle last applied migration. + LastAppliedMigration = []byte("lastAppliedMigration") + + //StorageModeHistory - does node save history. + StorageModeHistory = []byte("smHistory") + //StorageModeReceipts - does node save receipts. + StorageModeReceipts = []byte("smReceipts") + //StorageModeTxIndex - does node save transactions index. + StorageModeTxIndex = []byte("smTxIndex") + //StorageModePreImages - does node save hash to value mapping + StorageModePreImages = []byte("smPreImages") + //StorageModeThinHistory - does thin history mode enabled + StorageModeThinHistory = []byte("smThinHistory") + //StorageModeIntermediateTrieHash - does IntermediateTrieHash feature enabled + StorageModeIntermediateTrieHash = []byte("smIntermediateTrieHash") ) diff --git a/common/dbutils/history_index.go b/common/dbutils/history_index.go index 86b49e0a4..179bba6f7 100644 --- a/common/dbutils/history_index.go +++ b/common/dbutils/history_index.go @@ -78,8 +78,8 @@ func (hi *HistoryIndexBytes) Remove(v uint64) *HistoryIndexBytes { var currentElement uint64 var elemEnd uint32 + var itemLen uint32 - var itemLen = uint32(8) Loop: for i := numOfElements; i > 0; i-- { if i > numOfUint32Elements { @@ -117,7 +117,7 @@ func (hi *HistoryIndexBytes) Search(v uint64) (uint64, bool) { } numOfElements := binary.LittleEndian.Uint32((*hi)[0:LenBytes]) numOfUint32Elements := binary.LittleEndian.Uint32((*hi)[LenBytes : 2*LenBytes]) - var itemLen = uint32(8) + var itemLen uint32 if numOfElements == 0 { fmt.Println(2) diff --git a/eth/backend.go b/eth/backend.go index 8c339d56e..8ef5303bf 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -20,7 +20,10 @@ package eth import ( "errors" "fmt" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/migrations" "math/big" + "reflect" "runtime" "sync" "sync/atomic" @@ -171,6 +174,32 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion) } } + + err = setStorageModeIfNotExist(chainDb, config.StorageMode) + if err != nil { + return nil, err + } + + sm, err := getStorageModeFromDB(chainDb) + if err != nil { + return nil, err + } + if !reflect.DeepEqual(sm, config.StorageMode) { + return nil, errors.New("mode is " + config.StorageMode.ToString() + " original mode is " + sm.ToString()) + } + + err = migrations.NewMigrator().Apply( + chainDb, + config.StorageMode.History, + config.StorageMode.Receipts, + config.StorageMode.TxIndex, + config.StorageMode.Preimages, + config.StorageMode.ThinHistory, + ) + if err != nil { + return nil, err + } + var ( vmConfig = vm.Config{ EnablePreimageRecording: config.EnablePreimageRecording, @@ -592,3 +621,103 @@ func (s *Ethereum) Stop() error { close(s.shutdownChan) return nil } + +func setStorageModeIfNotExist(db ethdb.Database, sm StorageMode) error { + var ( + err error + ) + + err = setModeOnEmpty(db, dbutils.StorageModeHistory, sm.History) + if err != nil { + return err + } + + err = setModeOnEmpty(db, dbutils.StorageModePreImages, sm.Preimages) + if err != nil { + return err + } + + err = setModeOnEmpty(db, dbutils.StorageModeReceipts, sm.Receipts) + if err != nil { + return err + } + + err = setModeOnEmpty(db, dbutils.StorageModeTxIndex, sm.TxIndex) + if err != nil { + return err + } + + err = setModeOnEmpty(db, dbutils.StorageModeThinHistory, sm.ThinHistory) + if err != nil { + return err + } + + err = setModeOnEmpty(db, dbutils.StorageModeIntermediateTrieHash, sm.IntermediateTrieHash) + if err != nil { + return err + } + + return nil +} + +func setModeOnEmpty(db ethdb.Database, key []byte, currentValue bool) error { + _, err := db.Get(dbutils.DatabaseInfoBucket, key) + if err != nil && err != ethdb.ErrKeyNotFound { + return err + } + if err == ethdb.ErrKeyNotFound { + val := []byte{} + if currentValue { + val = []byte{1} + } + if err = db.Put(dbutils.DatabaseInfoBucket, key, val); err != nil { + return err + } + } + + return nil +} + +func getStorageModeFromDB(db ethdb.Database) (StorageMode, error) { + var ( + sm StorageMode + v []byte + err error + ) + v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeHistory) + if err != nil && err != ethdb.ErrKeyNotFound { + return StorageMode{}, err + } + sm.History = len(v) > 0 + + v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModePreImages) + if err != nil && err != ethdb.ErrKeyNotFound { + return StorageMode{}, err + } + sm.Preimages = len(v) > 0 + + v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeReceipts) + if err != nil && err != ethdb.ErrKeyNotFound { + return StorageMode{}, err + } + sm.Receipts = len(v) > 0 + + v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeTxIndex) + if err != nil && err != ethdb.ErrKeyNotFound { + return StorageMode{}, err + } + sm.TxIndex = len(v) > 0 + + v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeThinHistory) + if err != nil && err != ethdb.ErrKeyNotFound { + return StorageMode{}, err + } + sm.ThinHistory = len(v) > 0 + + v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeIntermediateTrieHash) + if err != nil && err != ethdb.ErrKeyNotFound { + return StorageMode{}, err + } + sm.IntermediateTrieHash = len(v) > 0 + return sm, nil +} diff --git a/eth/backend_test.go b/eth/backend_test.go new file mode 100644 index 000000000..89c52d76d --- /dev/null +++ b/eth/backend_test.go @@ -0,0 +1,50 @@ +package eth + +import ( + "github.com/davecgh/go-spew/spew" + "github.com/ledgerwatch/turbo-geth/ethdb" + "reflect" + "testing" +) + +func TestSetStorageModeIfNotExist(t *testing.T) { + db := ethdb.NewMemDatabase() + sm, err := getStorageModeFromDB(db) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(sm, StorageMode{}) { + t.Fatal() + } + + err = setStorageModeIfNotExist(db, StorageMode{ + true, + true, + true, + true, + true, + true, + }) + if err != nil { + t.Fatal(err) + } + + sm, err = getStorageModeFromDB(db) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(sm, StorageMode{ + true, + true, + true, + true, + true, + true, + }) { + spew.Dump(sm) + t.Fatal("not equal") + } + +} diff --git a/eth/config.go b/eth/config.go index 27b0176f6..1523c8305 100644 --- a/eth/config.go +++ b/eth/config.go @@ -18,6 +18,7 @@ package eth import ( "fmt" + "github.com/ledgerwatch/turbo-geth/common/debug" "math/big" "os" "os/user" @@ -92,9 +93,10 @@ type StorageMode struct { TxIndex bool Preimages bool IntermediateTrieHash bool + ThinHistory bool } -var DefaultStorageMode = StorageMode{History: true, Receipts: false, TxIndex: true, Preimages: true} +var DefaultStorageMode = StorageMode{History: true, Receipts: false, TxIndex: true, Preimages: true, ThinHistory: false} func (m StorageMode) ToString() string { modeString := "" @@ -113,6 +115,9 @@ func (m StorageMode) ToString() string { if m.IntermediateTrieHash { modeString += "i" } + if m.ThinHistory { + modeString += "n" + } return modeString } @@ -128,12 +133,21 @@ func StorageModeFromString(flags string) (StorageMode, error) { mode.TxIndex = true case 'p': mode.Preimages = true + case 'n': + mode.ThinHistory = true case 'i': mode.IntermediateTrieHash = true default: return mode, fmt.Errorf("unexpected flag found: %c", flag) } } + if mode.ThinHistory { + debug.ThinHistory = true + } + + if debug.IsThinHistory() { + mode.ThinHistory = true + } return mode, nil } diff --git a/migrations/changeset_migration.go b/migrations/changeset_migration.go new file mode 100644 index 000000000..63034055b --- /dev/null +++ b/migrations/changeset_migration.go @@ -0,0 +1,178 @@ +package migrations + +import ( + "bytes" + "errors" + "github.com/ledgerwatch/bolt" + "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/changeset" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" + "time" +) + +var ( + //ChangeSetBucket - key - encoded timestamp(block number) + history bucket(hAT/hST) + // value - encoded ChangeSet{k - addrHash|compositeKey(for storage) v - account(encoded) | originalValue(common.Hash)} + ChangeSetBucket = []byte("ChangeSet") + //LastBatchKey - last inserted key + LastBatchKey = []byte("lastBatchKeyForSplitChangesetMigration") +) + +const splitChangesetBatchSize = 5000 + +func splitChangeSetMigration(batchSize int) Migration { + return Migration{ + Name: "split_changeset", + Up: func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + boltDB, ok := db.(*ethdb.BoltDatabase) + if !ok { + return errors.New("only boltdb migration") + } + + var rowNum int + changesetsToRemove := make([][]byte, 0) + accChangesets := make([][]byte, 0) + storageChangesets := make([][]byte, 0) + var ( + currentKey, currentValue []byte + done bool + ) + + currentKey, err := db.Get(dbutils.DatabaseInfoBucket, LastBatchKey) + if err != nil && err != ethdb.ErrKeyNotFound { + return err + } + + startTime := time.Now() + for !done { + err := boltDB.DB().Update(func(tx *bolt.Tx) error { + changesetBucket := tx.Bucket(ChangeSetBucket) + dbInfoBucket, err := tx.CreateBucketIfNotExists(dbutils.DatabaseInfoBucket, false) + if err != nil { + return err + } + if changesetBucket == nil { + done = true + return nil + } + changesetCursor := changesetBucket.Cursor() + + if currentKey == nil { + currentKey, currentValue = changesetCursor.First() + } else { + currentKey, currentValue = changesetCursor.Seek(currentKey) + } + + for currentKey != nil { + changesetsToRemove = append(changesetsToRemove, currentKey) + ts, bucket := dbutils.DecodeTimestamp(currentKey) + encTS := dbutils.EncodeTimestamp(ts) + + switch { + case bytes.Equal(dbutils.AccountsHistoryBucket, bucket): + if thinHistory { + cs, innerErr := changeset.DecodeChangeSet(currentValue) + if innerErr != nil { + return innerErr + } + v, innerErr := changeset.EncodeAccounts(cs) + if innerErr != nil { + return innerErr + } + + accChangesets = append(accChangesets, encTS, v) + } else { + accChangesets = append(accChangesets, encTS, currentValue) + } + + case bytes.Equal(dbutils.StorageHistoryBucket, bucket): + if thinHistory { + cs, innerErr := changeset.DecodeChangeSet(currentValue) + if innerErr != nil { + return innerErr + } + + v, innerErr := changeset.EncodeStorage(cs) + if innerErr != nil { + log.Error("Error on encode storage changeset", "err", innerErr) + return innerErr + } + storageChangesets = append(storageChangesets, encTS, v) + + } else { + storageChangesets = append(storageChangesets, encTS, currentValue) + } + } + + currentKey, currentValue = changesetCursor.Next() + if rowNum >= batchSize || currentKey == nil { + commTime := time.Now() + + if len(storageChangesets) > 0 { + storageCSBucket, innerErr := tx.CreateBucketIfNotExists(dbutils.StorageChangeSetBucket, false) + if innerErr != nil { + return innerErr + } + + innerErr = storageCSBucket.MultiPut(storageChangesets...) + if innerErr != nil { + return innerErr + } + } + + if len(accChangesets) > 0 { + accCSBucket, innerErr := tx.CreateBucketIfNotExists(dbutils.AccountChangeSetBucket, false) + if innerErr != nil { + return innerErr + } + innerErr = accCSBucket.MultiPut(accChangesets...) + if innerErr != nil { + return innerErr + } + } + + if len(changesetsToRemove) > 0 { + for _, v := range changesetsToRemove { + innerErr := changesetBucket.Delete(v) + if innerErr != nil { + return innerErr + } + } + } + + log.Warn("Commit", "block", ts, "commit time", time.Since(commTime), "migration time", time.Since(startTime)) + accChangesets = make([][]byte, 0) + storageChangesets = make([][]byte, 0) + changesetsToRemove = make([][]byte, 0) + rowNum = 0 + break + } else { + rowNum++ + } + } + + if currentKey == nil { + done = true + err = dbInfoBucket.Delete(LastBatchKey) + if err != nil { + return err + } + } else { + currentKey = common.CopyBytes(currentKey) + err = dbInfoBucket.Put(LastBatchKey, currentKey) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + } + return nil + }, + } +} diff --git a/migrations/changeset_migration_test.go b/migrations/changeset_migration_test.go new file mode 100644 index 000000000..1a634c311 --- /dev/null +++ b/migrations/changeset_migration_test.go @@ -0,0 +1,321 @@ +package migrations + +import ( + "fmt" + "github.com/davecgh/go-spew/spew" + "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/changeset" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/ethdb" + "testing" +) + +func TestChangeSetMigrationSuccess(t *testing.T) { + f := func(t *testing.T, n, batchSize int) { + db := ethdb.NewMemDatabase() + accCS, storageCS := generateChangeSets(t, n) + for i, v := range accCS { + enc, err := changeset.EncodeChangeSet(v) + if err != nil { + t.Error(err) + } + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc) + if err != nil { + t.Error(err) + } + } + for i, v := range storageCS { + enc, err := changeset.EncodeChangeSet(v) + if err != nil { + t.Error(err) + } + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc) + if err != nil { + t.Error(err) + } + } + migrator := NewMigrator() + migrator.Migrations = []Migration{splitChangeSetMigration(batchSize)} + err := migrator.Apply(db, false, false, false, false, false) + if err != nil { + t.Error(err) + } + + err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + ts, bucket := dbutils.DecodeTimestamp(k) + return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket)) + }) + if err != nil { + t.Error(err) + } + + err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ := dbutils.DecodeTimestamp(k) + cs, innerErr := changeset.DecodeChangeSet(v) + if innerErr != nil { + return false, fmt.Errorf("decode fails block - %v err: %v", blockNum, innerErr) + } + if !cs.Equals(accCS[blockNum]) { + spew.Dump(cs) + spew.Dump(accCS[blockNum]) + return false, fmt.Errorf("not equal (%v)", blockNum) + } + return true, nil + }) + if err != nil { + t.Error(err) + } + + err = db.Walk(dbutils.StorageHistoryBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ := dbutils.DecodeTimestamp(k) + cs, innerErr := changeset.DecodeChangeSet(v) + if innerErr != nil { + return false, fmt.Errorf("decode fails block - %v err: %v", blockNum, innerErr) + } + if !cs.Equals(storageCS[blockNum]) { + spew.Dump(cs) + spew.Dump(storageCS[blockNum]) + return false, fmt.Errorf("not equal (%v)", blockNum) + } + return true, nil + }) + if err != nil { + t.Error(err) + } + } + + t.Run("less batch", func(t *testing.T) { + f(t, 50, 60) + }) + t.Run("more batch", func(t *testing.T) { + f(t, 100, 60) + }) + t.Run("two batches", func(t *testing.T) { + f(t, 100, 50) + }) +} + +func TestChangeSetMigrationThinHistorySuccess(t *testing.T) { + f := func(t *testing.T, n, batchSize int) { + db := ethdb.NewMemDatabase() + accCS, storageCS := generateChangeSets(t, n) + + for i, v := range accCS { + enc, err := changeset.EncodeChangeSet(v) + if err != nil { + t.Error(err) + } + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc) + if err != nil { + t.Error(err) + } + } + + for i, v := range storageCS { + enc, err := changeset.EncodeChangeSet(v) + if err != nil { + t.Error(err) + } + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc) + if err != nil { + t.Error(err) + } + } + + migrator := NewMigrator() + migrator.Migrations = []Migration{splitChangeSetMigration(batchSize)} + err := migrator.Apply(db, false, false, false, false, true) + if err != nil { + t.Error(err) + } + + err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + ts, bucket := dbutils.DecodeTimestamp(k) + return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket)) + }) + if err != nil { + t.Error(err) + } + + err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ := dbutils.DecodeTimestamp(k) + cs, innerErr := changeset.DecodeAccounts(v) + if innerErr != nil { + return false, innerErr + } + + if !cs.Equals(accCS[blockNum]) { + spew.Dump(cs) + spew.Dump(accCS[blockNum]) + return false, fmt.Errorf("not equal %v", blockNum) + } + return true, nil + }) + if err != nil { + t.Error(err) + } + + err = db.Walk(dbutils.StorageHistoryBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ := dbutils.DecodeTimestamp(k) + cs, innerErr := changeset.DecodeStorage(v) + if innerErr != nil { + t.Error(innerErr, blockNum) + } + if !cs.Equals(storageCS[blockNum]) { + spew.Dump(cs) + spew.Dump(storageCS[blockNum]) + return false, fmt.Errorf("not equal (%v)", blockNum) + } + return true, nil + }) + if err != nil { + t.Error(err) + } + } + + t.Run("less batch", func(t *testing.T) { + f(t, 50, 60) + }) + t.Run("more batch", func(t *testing.T) { + f(t, 100, 60) + }) + t.Run("two batches", func(t *testing.T) { + f(t, 100, 50) + }) + +} + +func TestChangeSetMigrationFail(t *testing.T) { + db := ethdb.NewMemDatabase() + accCS, storageCS := generateChangeSets(t, 50) + + for i, v := range accCS { + enc, err := changeset.EncodeChangeSet(v) + if err != nil { + t.Error(err) + } + if i == 25 { + //incorrect value + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc[:5]) + if err != nil { + t.Error(err) + } + } else { + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc) + if err != nil { + t.Error(err) + } + } + } + + for i, v := range storageCS { + enc, err := changeset.EncodeChangeSet(v) + if err != nil { + t.Error(err) + } + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc) + if err != nil { + t.Error(err) + } + } + + migrator := NewMigrator() + migrator.Migrations = []Migration{splitChangeSetMigration(20)} + err := migrator.Apply(db, false, false, false, false, true) + if err == nil { + t.Error("should fail") + } + + _, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration) + if err == nil { + t.Error("should fail") + } + + //fix incorrect changeset + enc, err := changeset.EncodeChangeSet(accCS[25]) + if err != nil { + t.Error(err) + } + err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(25)), dbutils.AccountsHistoryBucket), enc) + if err != nil { + t.Error(err) + } + + err = migrator.Apply(db, false, false, false, false, true) + if err != nil { + t.Error(err) + } + + err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + ts, bucket := dbutils.DecodeTimestamp(k) + return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket)) + }) + if err != nil { + t.Error(err) + } + + err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ := dbutils.DecodeTimestamp(k) + cs, innerErr := changeset.DecodeAccounts(v) + if innerErr != nil { + return false, innerErr + } + + if !cs.Equals(accCS[blockNum]) { + spew.Dump(cs) + spew.Dump(accCS[blockNum]) + return false, fmt.Errorf("not equal %v", blockNum) + } + return true, nil + }) + if err != nil { + t.Error(err) + } + + err = db.Walk(dbutils.StorageHistoryBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ := dbutils.DecodeTimestamp(k) + cs, innerErr := changeset.DecodeStorage(v) + if innerErr != nil { + t.Error(innerErr, blockNum) + } + if !cs.Equals(storageCS[blockNum]) { + spew.Dump(cs) + spew.Dump(storageCS[blockNum]) + return false, fmt.Errorf("not equal (%v)", blockNum) + } + return true, nil + }) + if err != nil { + t.Error(err) + } +} + +func generateChangeSets(t *testing.T, n int) ([]*changeset.ChangeSet, []*changeset.ChangeSet) { + t.Helper() + accChangeset := make([]*changeset.ChangeSet, n) + storageChangeset := make([]*changeset.ChangeSet, n) + for i := 0; i < n; i++ { + csAcc := changeset.NewChangeSet() + hash := common.Hash{uint8(i)} + err := csAcc.Add(hash.Bytes(), hash.Bytes()) + if err != nil { + t.Fatal(err) + } + accChangeset[i] = csAcc + + csStorage := changeset.NewChangeSet() + err = csStorage.Add( + dbutils.GenerateCompositeStorageKey( + hash, + ^uint64(1), + hash, + ), + hash.Bytes(), + ) + if err != nil { + t.Fatal(err) + } + storageChangeset[i] = csStorage + } + return accChangeset, storageChangeset +} diff --git a/migrations/migrations.go b/migrations/migrations.go new file mode 100644 index 000000000..ef453a3a2 --- /dev/null +++ b/migrations/migrations.go @@ -0,0 +1,59 @@ +package migrations + +import ( + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" +) + +type Migration struct { + Name string + Up func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error +} + +func NewMigrator() *Migrator { + return &Migrator{ + Migrations: migrations, + } +} + +type Migrator struct { + Migrations []Migration +} + +func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + if len(m.Migrations) == 0 { + return nil + } + + lastApplied, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration) + if err != nil && err != ethdb.ErrKeyNotFound { + return err + } + + i := len(m.Migrations) - 1 + for ; i >= 0; i-- { + if m.Migrations[i].Name == string(lastApplied) { + break + } + } + + m.Migrations = m.Migrations[i+1:] + for _, v := range m.Migrations { + log.Warn("Apply migration", v.Name) + err := v.Up(db, history, receipts, txIndex, preImages, thinHistory) + if err != nil { + return err + } + err = db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(v.Name)) + if err != nil { + return err + } + log.Warn("Applied migration", v.Name) + } + return nil +} + +var migrations = []Migration{ + splitChangeSetMigration(splitChangesetBatchSize), +} diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go new file mode 100644 index 000000000..cb4c2c784 --- /dev/null +++ b/migrations/migrations_test.go @@ -0,0 +1,76 @@ +package migrations + +import ( + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/ethdb" + "testing" +) + +func TestApplyWithInit(t *testing.T) { + db := ethdb.NewMemDatabase() + migrations = []Migration{ + { + "one", + func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + return nil + }, + }, + { + "two", + func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + return nil + }, + }, + } + + migrator := NewMigrator() + migrator.Migrations = migrations + err := migrator.Apply(db, false, false, false, false, false) + if err != nil { + t.Fatal() + } + v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration) + if err != nil { + t.Fatal(err) + } + if string(v) != migrations[1].Name { + t.Fatal() + } +} + +func TestApplyWithoutInit(t *testing.T) { + db := ethdb.NewMemDatabase() + migrations = []Migration{ + { + "one", + func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + t.Fatal("shouldn't been executed") + return nil + }, + }, + { + "two", + func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + return nil + }, + }, + } + err := db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(migrations[0].Name)) + if err != nil { + t.Fatal() + } + + migrator := NewMigrator() + migrator.Migrations = migrations + err = migrator.Apply(db, false, false, false, false, false) + if err != nil { + t.Fatal() + } + v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration) + if err != nil { + t.Fatal(err) + } + if string(v) != migrations[1].Name { + t.Fatal() + } +}