Merge pull request #373 from ledgerwatch/migrations

migrations
This commit is contained in:
b00ris 2020-02-26 00:41:11 +03:00 committed by GitHub
commit 71b355bb6b
14 changed files with 877 additions and 15 deletions


@ -1246,7 +1246,8 @@ func readAccount(chaindata string, account common.Address, block uint64, rewind
var printed bool
encodedTS := dbutils.EncodeTimestamp(timestamp)
changeSetKey := dbutils.CompositeChangeSetKey(encodedTS, dbutils.StorageHistoryBucket)
- v, err = ethDb.Get(dbutils.ChangeSetBucket, changeSetKey)
+ v, err = ethDb.Get(dbutils.StorageChangeSetBucket, changeSetKey)
check(err)
if v != nil {
err = changeset.StorageChangeSetBytes(v).Walk(func(key, value []byte) error {
if bytes.HasPrefix(key, secKey) {


@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/ledgerwatch/turbo-geth/migrations"
"os"
"time"
@ -169,7 +170,7 @@ func newBucketWriter(db ethdb.Database, bucket []byte) *bucketWriter {
}
func copyDatabase(fromDB ethdb.Database, toDB ethdb.Database) error {
- for _, bucket := range [][]byte{dbutils.AccountsBucket, dbutils.StorageBucket, dbutils.CodeBucket} {
+ for _, bucket := range [][]byte{dbutils.AccountsBucket, dbutils.StorageBucket, dbutils.CodeBucket, dbutils.DatabaseInfoBucket} {
fmt.Printf(" - copying bucket '%s'...\n", string(bucket))
writer := newBucketWriter(toDB, bucket)
@ -207,6 +208,8 @@ func loadSnapshot(db ethdb.Database, filename string, createDb CreateDbFunc) {
err = copyDatabase(diskDb, db)
check(err)
err = migrations.NewMigrator().Apply(diskDb, false, false, false, false, false)
check(err)
}
func loadCodes(db *bolt.DB, codeDb ethdb.Database) error {


@ -95,6 +95,13 @@ func TestCopyDatabase(t *testing.T) {
string(dbutils.CodeBucket): generateData(string(dbutils.CodeBucket)),
})
doTestcase(t, map[string]testData{
string(dbutils.AccountsBucket): generateData(string(dbutils.AccountsBucket)),
string(dbutils.StorageBucket): generateData(string(dbutils.StorageBucket)),
string(dbutils.CodeBucket): generateData(string(dbutils.CodeBucket)),
string(dbutils.DatabaseInfoBucket): generateData(string(dbutils.DatabaseInfoBucket)),
})
}
func doTestcase(t *testing.T, testCase map[string]testData) {


@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/common"
"reflect"
"sort"
)
@ -143,6 +144,10 @@ func (s *ChangeSet) ChangedKeys() map[string]struct{} {
return m
}
func (s *ChangeSet) Equals(s2 *ChangeSet) bool {
return reflect.DeepEqual(s.Changes, s2.Changes)
}
// Encoded Method
func Len(b []byte) int {
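The Equals helper added above is what the migration tests later in this PR use to compare decoded changesets against the originals. A minimal stand-alone sketch of its behavior, with simplified stand-in types (illustration only, not the actual package): reflect.DeepEqual compares the Changes slices element by element, so ordering matters.

package main

import (
	"fmt"
	"reflect"
)

// Simplified stand-ins for the changeset types (illustration only).
type Change struct {
	Key   []byte
	Value []byte
}

type ChangeSet struct {
	Changes []Change
}

// Equals mirrors the method added in this diff: a deep, order-sensitive
// comparison of the underlying Changes slices.
func (s *ChangeSet) Equals(s2 *ChangeSet) bool {
	return reflect.DeepEqual(s.Changes, s2.Changes)
}

func main() {
	a := &ChangeSet{Changes: []Change{{Key: []byte{1}, Value: []byte{2}}}}
	b := &ChangeSet{Changes: []Change{{Key: []byte{1}, Value: []byte{2}}}}
	fmt.Println(a.Equals(b)) // true: same entries in the same order
}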


@ -93,7 +93,6 @@ func EncodeStorage(s *ChangeSet) ([]byte, error) {
}
byt := buf.Bytes()
- fmt.Println("enc storage", len(byt), len(s.Changes))
return byt, nil
}
@ -123,7 +122,7 @@ func DecodeStorage(b []byte) (*ChangeSet, error) {
//parse not default incarnations
incarnationsLength := len(b[incarnationPosition:])
- notDefaultIncarnation := make(map[uint32]uint64, 0)
+ notDefaultIncarnation := make(map[uint32]uint64)
var (
id uint32
inc uint64
@ -186,11 +185,10 @@ func (b StorageChangeSetBytes) Walk(f func(k, v []byte) error) error {
incarnationPosition := storageEnodingStartElem + numOfItems*(3*common.HashLength)
if uint32(len(b)) < incarnationPosition {
- fmt.Println("WalkStorage", numOfItems)
return fmt.Errorf("decode: input too short (%d bytes, expected at least %d bytes)", len(b), incarnationPosition)
}
incarnationsLength := len(b[incarnationPosition:])
- notDefaultIncarnation := make(map[uint32]uint64, 0)
+ notDefaultIncarnation := make(map[uint32]uint64)
var (
id uint32
inc uint64
@ -250,12 +248,11 @@ func (b StorageChangeSetBytes) FindLast(k []byte) ([]byte, error) {
incarnationPosition := storageEnodingStartElem + numOfItems*(3*common.HashLength)
if uint32(len(b)) < incarnationPosition {
- fmt.Println("FindLast storage")
return nil, fmt.Errorf("decode: input too short (%d bytes, expected at least %d bytes)", len(b), incarnationPosition)
}
incarnationsLength := len(b[incarnationPosition:])
- notDefaultIncarnation := make(map[uint32]uint64, 0)
+ notDefaultIncarnation := make(map[uint32]uint64)
var (
id uint32
inc uint64


@ -36,16 +36,22 @@ var (
//value - code hash
ContractCodeBucket = []byte("contractCode")
- // key - encoded timestamp(block number) + history bucket(hAT/hST)
- // value - encoded AccountChangeSet{k - addrHash|compositeKey(for storage) v - account(encoded) | originalValue(common.Hash)}
- ChangeSetBucket = []byte("AccountChangeSet")
+ //AccountChangeSetBucket keeps changesets of accounts
+ // key - encoded timestamp(block number)
+ // value - encoded ChangeSet{k - addrHash v - account(encoded)}.
+ AccountChangeSetBucket = []byte("ACS")
+ // StorageChangeSetBucket keeps changesets of storage
+ // key - encoded timestamp(block number)
+ // value - encoded ChangeSet{k - compositeKey(for storage) v - originalValue(common.Hash)}.
+ StorageChangeSetBucket = []byte("SCS")
// some_prefix_of(hash_of_address_of_account) => hash_of_subtrie
IntermediateTrieHashBucket = []byte("iTh")
// DatabaseInfoBucket is used to store information about data layout.
DatabaseInfoBucket = []byte("DBINFO")
// databaseVerisionKey tracks the current database version.
DatabaseVerisionKey = []byte("DatabaseVersion")
@ -85,4 +91,20 @@ var (
// last block that was pruned
// it's saved once in 5 minutes
LastPrunedBlockKey = []byte("LastPrunedBlock")
// LastAppliedMigration keeps the name of the last applied migration.
LastAppliedMigration = []byte("lastAppliedMigration")
//StorageModeHistory - whether the node saves history.
StorageModeHistory = []byte("smHistory")
//StorageModeReceipts - whether the node saves receipts.
StorageModeReceipts = []byte("smReceipts")
//StorageModeTxIndex - whether the node saves the transaction index.
StorageModeTxIndex = []byte("smTxIndex")
//StorageModePreImages - whether the node saves the hash-to-value mapping.
StorageModePreImages = []byte("smPreImages")
//StorageModeThinHistory - whether thin history mode is enabled.
StorageModeThinHistory = []byte("smThinHistory")
//StorageModeIntermediateTrieHash - whether the IntermediateTrieHash feature is enabled.
StorageModeIntermediateTrieHash = []byte("smIntermediateTrieHash")
)
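The StorageMode* keys above hold a simple boolean encoding, matching setModeOnEmpty and getStorageModeFromDB in eth/backend.go below: an empty value means disabled, a single 0x01 byte means enabled, and any non-empty value reads back as true. A minimal sketch of that convention (the helper names here are hypothetical):

// encodeModeFlag produces the value stored under a StorageMode* key in
// DatabaseInfoBucket: an empty slice for false, {1} for true.
func encodeModeFlag(enabled bool) []byte {
	if enabled {
		return []byte{1}
	}
	return []byte{}
}

// decodeModeFlag inverts it; any non-empty value reads as enabled,
// matching the len(v) > 0 checks in getStorageModeFromDB.
func decodeModeFlag(v []byte) bool {
	return len(v) > 0
}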


@ -78,8 +78,8 @@ func (hi *HistoryIndexBytes) Remove(v uint64) *HistoryIndexBytes {
var currentElement uint64
var elemEnd uint32
- var itemLen uint32
+ var itemLen = uint32(8)
Loop:
for i := numOfElements; i > 0; i-- {
if i > numOfUint32Elements {
@ -117,7 +117,7 @@ func (hi *HistoryIndexBytes) Search(v uint64) (uint64, bool) {
}
numOfElements := binary.LittleEndian.Uint32((*hi)[0:LenBytes])
numOfUint32Elements := binary.LittleEndian.Uint32((*hi)[LenBytes : 2*LenBytes])
- var itemLen = uint32(8)
+ var itemLen uint32
if numOfElements == 0 {
fmt.Println(2)


@ -20,7 +20,10 @@ package eth
import (
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/migrations"
"math/big"
"reflect"
"runtime"
"sync"
"sync/atomic"
@ -171,6 +174,32 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
}
}
err = setStorageModeIfNotExist(chainDb, config.StorageMode)
if err != nil {
return nil, err
}
sm, err := getStorageModeFromDB(chainDb)
if err != nil {
return nil, err
}
if !reflect.DeepEqual(sm, config.StorageMode) {
return nil, errors.New("mode is " + config.StorageMode.ToString() + " original mode is " + sm.ToString())
}
err = migrations.NewMigrator().Apply(
chainDb,
config.StorageMode.History,
config.StorageMode.Receipts,
config.StorageMode.TxIndex,
config.StorageMode.Preimages,
config.StorageMode.ThinHistory,
)
if err != nil {
return nil, err
}
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
@ -592,3 +621,103 @@ func (s *Ethereum) Stop() error {
close(s.shutdownChan)
return nil
}
func setStorageModeIfNotExist(db ethdb.Database, sm StorageMode) error {
var (
err error
)
err = setModeOnEmpty(db, dbutils.StorageModeHistory, sm.History)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModePreImages, sm.Preimages)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeReceipts, sm.Receipts)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeTxIndex, sm.TxIndex)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeThinHistory, sm.ThinHistory)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeIntermediateTrieHash, sm.IntermediateTrieHash)
if err != nil {
return err
}
return nil
}
func setModeOnEmpty(db ethdb.Database, key []byte, currentValue bool) error {
_, err := db.Get(dbutils.DatabaseInfoBucket, key)
if err != nil && err != ethdb.ErrKeyNotFound {
return err
}
if err == ethdb.ErrKeyNotFound {
val := []byte{}
if currentValue {
val = []byte{1}
}
if err = db.Put(dbutils.DatabaseInfoBucket, key, val); err != nil {
return err
}
}
return nil
}
func getStorageModeFromDB(db ethdb.Database) (StorageMode, error) {
var (
sm StorageMode
v []byte
err error
)
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeHistory)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.History = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModePreImages)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.Preimages = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeReceipts)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.Receipts = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeTxIndex)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.TxIndex = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeThinHistory)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.ThinHistory = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeIntermediateTrieHash)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.IntermediateTrieHash = len(v) > 0
return sm, nil
}
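Taken together these helpers make the configured mode sticky: the first start persists it, and every later start must match or New aborts. A condensed sketch of that flow (checkMode is a hypothetical wrapper around the functions above):

func checkMode(db ethdb.Database, requested StorageMode) error {
	// Persist the flags on first start only; existing values win.
	if err := setStorageModeIfNotExist(db, requested); err != nil {
		return err
	}
	sm, err := getStorageModeFromDB(db)
	if err != nil {
		return err
	}
	// Mirrors the check in New: any mismatch aborts startup rather than
	// silently writing data in a different layout.
	if !reflect.DeepEqual(sm, requested) {
		return errors.New("mode is " + requested.ToString() + " original mode is " + sm.ToString())
	}
	return nil
}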

eth/backend_test.go

@ -0,0 +1,50 @@
package eth
import (
"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/turbo-geth/ethdb"
"reflect"
"testing"
)
func TestSetStorageModeIfNotExist(t *testing.T) {
db := ethdb.NewMemDatabase()
sm, err := getStorageModeFromDB(db)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(sm, StorageMode{}) {
t.Fatal()
}
err = setStorageModeIfNotExist(db, StorageMode{
true,
true,
true,
true,
true,
true,
})
if err != nil {
t.Fatal(err)
}
sm, err = getStorageModeFromDB(db)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(sm, StorageMode{
true,
true,
true,
true,
true,
true,
}) {
spew.Dump(sm)
t.Fatal("not equal")
}
}


@ -18,6 +18,7 @@ package eth
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/common/debug"
"math/big"
"os"
"os/user"
@ -92,9 +93,10 @@ type StorageMode struct {
TxIndex bool
Preimages bool
IntermediateTrieHash bool
ThinHistory bool
}
- var DefaultStorageMode = StorageMode{History: true, Receipts: false, TxIndex: true, Preimages: true}
+ var DefaultStorageMode = StorageMode{History: true, Receipts: false, TxIndex: true, Preimages: true, ThinHistory: false}
func (m StorageMode) ToString() string {
modeString := ""
@ -113,6 +115,9 @@ func (m StorageMode) ToString() string {
if m.IntermediateTrieHash {
modeString += "i"
}
if m.ThinHistory {
modeString += "n"
}
return modeString
}
@ -128,12 +133,21 @@ func StorageModeFromString(flags string) (StorageMode, error) {
mode.TxIndex = true
case 'p':
mode.Preimages = true
case 'n':
mode.ThinHistory = true
case 'i':
mode.IntermediateTrieHash = true
default:
return mode, fmt.Errorf("unexpected flag found: %c", flag)
}
}
if mode.ThinHistory {
debug.ThinHistory = true
}
if debug.IsThinHistory() {
mode.ThinHistory = true
}
return mode, nil
}
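For the new 'n' flag, a round trip through the parser might look like the sketch below (a test-style snippet inside package eth; it assumes the full append order in ToString is h, r, t, p, i, n, consistent with the fragments shown above):

func ExampleStorageModeFromString() {
	sm, err := StorageModeFromString("htpn")
	if err != nil {
		panic(err)
	}
	fmt.Println(sm.ThinHistory) // true: 'n' sets ThinHistory (and debug.ThinHistory)
	fmt.Println(sm.ToString())  // expected "htpn" under the assumed flag order
}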


@ -0,0 +1,178 @@
package migrations
import (
"bytes"
"errors"
"github.com/ledgerwatch/bolt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"time"
)
var (
//ChangeSetBucket - key - encoded timestamp(block number) + history bucket(hAT/hST)
// value - encoded ChangeSet{k - addrHash|compositeKey(for storage) v - account(encoded) | originalValue(common.Hash)}
ChangeSetBucket = []byte("ChangeSet")
//LastBatchKey - last inserted key
LastBatchKey = []byte("lastBatchKeyForSplitChangesetMigration")
)
const splitChangesetBatchSize = 5000
func splitChangeSetMigration(batchSize int) Migration {
return Migration{
Name: "split_changeset",
Up: func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
boltDB, ok := db.(*ethdb.BoltDatabase)
if !ok {
return errors.New("only boltdb migration")
}
var rowNum int
changesetsToRemove := make([][]byte, 0)
accChangesets := make([][]byte, 0)
storageChangesets := make([][]byte, 0)
var (
currentKey, currentValue []byte
done bool
)
currentKey, err := db.Get(dbutils.DatabaseInfoBucket, LastBatchKey)
if err != nil && err != ethdb.ErrKeyNotFound {
return err
}
startTime := time.Now()
for !done {
err := boltDB.DB().Update(func(tx *bolt.Tx) error {
changesetBucket := tx.Bucket(ChangeSetBucket)
dbInfoBucket, err := tx.CreateBucketIfNotExists(dbutils.DatabaseInfoBucket, false)
if err != nil {
return err
}
if changesetBucket == nil {
done = true
return nil
}
changesetCursor := changesetBucket.Cursor()
if currentKey == nil {
currentKey, currentValue = changesetCursor.First()
} else {
currentKey, currentValue = changesetCursor.Seek(currentKey)
}
for currentKey != nil {
changesetsToRemove = append(changesetsToRemove, currentKey)
ts, bucket := dbutils.DecodeTimestamp(currentKey)
encTS := dbutils.EncodeTimestamp(ts)
switch {
case bytes.Equal(dbutils.AccountsHistoryBucket, bucket):
if thinHistory {
cs, innerErr := changeset.DecodeChangeSet(currentValue)
if innerErr != nil {
return innerErr
}
v, innerErr := changeset.EncodeAccounts(cs)
if innerErr != nil {
return innerErr
}
accChangesets = append(accChangesets, encTS, v)
} else {
accChangesets = append(accChangesets, encTS, currentValue)
}
case bytes.Equal(dbutils.StorageHistoryBucket, bucket):
if thinHistory {
cs, innerErr := changeset.DecodeChangeSet(currentValue)
if innerErr != nil {
return innerErr
}
v, innerErr := changeset.EncodeStorage(cs)
if innerErr != nil {
log.Error("Error on encode storage changeset", "err", innerErr)
return innerErr
}
storageChangesets = append(storageChangesets, encTS, v)
} else {
storageChangesets = append(storageChangesets, encTS, currentValue)
}
}
currentKey, currentValue = changesetCursor.Next()
if rowNum >= batchSize || currentKey == nil {
commTime := time.Now()
if len(storageChangesets) > 0 {
storageCSBucket, innerErr := tx.CreateBucketIfNotExists(dbutils.StorageChangeSetBucket, false)
if innerErr != nil {
return innerErr
}
innerErr = storageCSBucket.MultiPut(storageChangesets...)
if innerErr != nil {
return innerErr
}
}
if len(accChangesets) > 0 {
accCSBucket, innerErr := tx.CreateBucketIfNotExists(dbutils.AccountChangeSetBucket, false)
if innerErr != nil {
return innerErr
}
innerErr = accCSBucket.MultiPut(accChangesets...)
if innerErr != nil {
return innerErr
}
}
if len(changesetsToRemove) > 0 {
for _, v := range changesetsToRemove {
innerErr := changesetBucket.Delete(v)
if innerErr != nil {
return innerErr
}
}
}
log.Warn("Commit", "block", ts, "commit time", time.Since(commTime), "migration time", time.Since(startTime))
accChangesets = make([][]byte, 0)
storageChangesets = make([][]byte, 0)
changesetsToRemove = make([][]byte, 0)
rowNum = 0
break
} else {
rowNum++
}
}
if currentKey == nil {
done = true
err = dbInfoBucket.Delete(LastBatchKey)
if err != nil {
return err
}
} else {
currentKey = common.CopyBytes(currentKey)
err = dbInfoBucket.Put(LastBatchKey, currentKey)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
},
}
}
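The heart of the migration is a key rewrite: a legacy entry keyed by EncodeTimestamp(block) plus the hAT/hST history-bucket suffix in the old ChangeSetBucket becomes a plain EncodeTimestamp(block) key in AccountChangeSetBucket or StorageChangeSetBucket. A sketch using the dbutils helpers already referenced above (rekeyExample is hypothetical):

// rekeyExample shows the transformation for a single legacy key.
func rekeyExample() {
	// Legacy layout: one bucket, composite keys carrying a history-bucket suffix.
	oldKey := dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(42), dbutils.AccountsHistoryBucket)

	// The migration splits the composite key apart...
	ts, bucket := dbutils.DecodeTimestamp(oldKey) // ts == 42, bucket == hAT
	// ...and re-keys the value by timestamp alone, routed to the bucket chosen
	// by the suffix: AccountChangeSetBucket here, StorageChangeSetBucket for
	// hST entries, re-encoding the value when thinHistory is set.
	newKey := dbutils.EncodeTimestamp(ts)
	_, _ = newKey, bucket
}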


@ -0,0 +1,321 @@
package migrations
import (
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"testing"
)
func TestChangeSetMigrationSuccess(t *testing.T) {
f := func(t *testing.T, n, batchSize int) {
db := ethdb.NewMemDatabase()
accCS, storageCS := generateChangeSets(t, n)
for i, v := range accCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
for i, v := range storageCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
migrator := NewMigrator()
migrator.Migrations = []Migration{splitChangeSetMigration(batchSize)}
err := migrator.Apply(db, false, false, false, false, false)
if err != nil {
t.Error(err)
}
err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
ts, bucket := dbutils.DecodeTimestamp(k)
return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket))
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeChangeSet(v)
if innerErr != nil {
return false, fmt.Errorf("decode fails block - %v err: %v", blockNum, innerErr)
}
if !cs.Equals(accCS[blockNum]) {
spew.Dump(cs)
spew.Dump(accCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.StorageChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeChangeSet(v)
if innerErr != nil {
return false, fmt.Errorf("decode fails block - %v err: %v", blockNum, innerErr)
}
if !cs.Equals(storageCS[blockNum]) {
spew.Dump(cs)
spew.Dump(storageCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
}
t.Run("less batch", func(t *testing.T) {
f(t, 50, 60)
})
t.Run("more batch", func(t *testing.T) {
f(t, 100, 60)
})
t.Run("two batches", func(t *testing.T) {
f(t, 100, 50)
})
}
func TestChangeSetMigrationThinHistorySuccess(t *testing.T) {
f := func(t *testing.T, n, batchSize int) {
db := ethdb.NewMemDatabase()
accCS, storageCS := generateChangeSets(t, n)
for i, v := range accCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
for i, v := range storageCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
migrator := NewMigrator()
migrator.Migrations = []Migration{splitChangeSetMigration(batchSize)}
err := migrator.Apply(db, false, false, false, false, true)
if err != nil {
t.Error(err)
}
err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
ts, bucket := dbutils.DecodeTimestamp(k)
return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket))
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeAccounts(v)
if innerErr != nil {
return false, innerErr
}
if !cs.Equals(accCS[blockNum]) {
spew.Dump(cs)
spew.Dump(accCS[blockNum])
return false, fmt.Errorf("not equal %v", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.StorageChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeStorage(v)
if innerErr != nil {
t.Error(innerErr, blockNum)
}
if !cs.Equals(storageCS[blockNum]) {
spew.Dump(cs)
spew.Dump(storageCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
}
t.Run("less batch", func(t *testing.T) {
f(t, 50, 60)
})
t.Run("more batch", func(t *testing.T) {
f(t, 100, 60)
})
t.Run("two batches", func(t *testing.T) {
f(t, 100, 50)
})
}
func TestChangeSetMigrationFail(t *testing.T) {
db := ethdb.NewMemDatabase()
accCS, storageCS := generateChangeSets(t, 50)
for i, v := range accCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
if i == 25 {
//incorrect value
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc[:5])
if err != nil {
t.Error(err)
}
} else {
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
}
for i, v := range storageCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
migrator := NewMigrator()
migrator.Migrations = []Migration{splitChangeSetMigration(20)}
err := migrator.Apply(db, false, false, false, false, true)
if err == nil {
t.Error("should fail")
}
_, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err == nil {
t.Error("should fail")
}
//fix incorrect changeset
enc, err := changeset.EncodeChangeSet(accCS[25])
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(25)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
err = migrator.Apply(db, false, false, false, false, true)
if err != nil {
t.Error(err)
}
err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
ts, bucket := dbutils.DecodeTimestamp(k)
return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket))
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeAccounts(v)
if innerErr != nil {
return false, innerErr
}
if !cs.Equals(accCS[blockNum]) {
spew.Dump(cs)
spew.Dump(accCS[blockNum])
return false, fmt.Errorf("not equal %v", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.StorageChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeStorage(v)
if innerErr != nil {
t.Error(innerErr, blockNum)
}
if !cs.Equals(storageCS[blockNum]) {
spew.Dump(cs)
spew.Dump(storageCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
}
func generateChangeSets(t *testing.T, n int) ([]*changeset.ChangeSet, []*changeset.ChangeSet) {
t.Helper()
accChangeset := make([]*changeset.ChangeSet, n)
storageChangeset := make([]*changeset.ChangeSet, n)
for i := 0; i < n; i++ {
csAcc := changeset.NewChangeSet()
hash := common.Hash{uint8(i)}
err := csAcc.Add(hash.Bytes(), hash.Bytes())
if err != nil {
t.Fatal(err)
}
accChangeset[i] = csAcc
csStorage := changeset.NewChangeSet()
err = csStorage.Add(
dbutils.GenerateCompositeStorageKey(
hash,
^uint64(1),
hash,
),
hash.Bytes(),
)
if err != nil {
t.Fatal(err)
}
storageChangeset[i] = csStorage
}
return accChangeset, storageChangeset
}

migrations/migrations.go

@ -0,0 +1,59 @@
package migrations
import (
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
type Migration struct {
Name string
Up func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error
}
func NewMigrator() *Migrator {
return &Migrator{
Migrations: migrations,
}
}
type Migrator struct {
Migrations []Migration
}
func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
if len(m.Migrations) == 0 {
return nil
}
lastApplied, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err != nil && err != ethdb.ErrKeyNotFound {
return err
}
i := len(m.Migrations) - 1
for ; i >= 0; i-- {
if m.Migrations[i].Name == string(lastApplied) {
break
}
}
m.Migrations = m.Migrations[i+1:]
for _, v := range m.Migrations {
log.Warn("Apply migration", "name", v.Name)
err := v.Up(db, history, receipts, txIndex, preImages, thinHistory)
if err != nil {
return err
}
err = db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(v.Name))
if err != nil {
return err
}
log.Warn("Applied migration", "name", v.Name)
}
return nil
}
var migrations = []Migration{
splitChangeSetMigration(splitChangesetBatchSize),
}
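Adding a future migration means appending to this slice. Apply persists the last applied name under dbutils.LastAppliedMigration only after Up returns nil, so a failed migration is retried on the next start and reruns skip everything up to and including the recorded name. A hypothetical registration sketch:

// A no-op example migration (name and body are illustrative).
var exampleMigration = Migration{
	Name: "example_noop",
	Up: func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
		// An idempotent data transformation goes here.
		return nil
	},
}

// Registration would extend the package-level slice:
// var migrations = []Migration{
//	splitChangeSetMigration(splitChangesetBatchSize),
//	exampleMigration,
// }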


@ -0,0 +1,76 @@
package migrations
import (
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"testing"
)
func TestApplyWithInit(t *testing.T) {
db := ethdb.NewMemDatabase()
migrations = []Migration{
{
"one",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
return nil
},
},
{
"two",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
return nil
},
},
}
migrator := NewMigrator()
migrator.Migrations = migrations
err := migrator.Apply(db, false, false, false, false, false)
if err != nil {
t.Fatal()
}
v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err != nil {
t.Fatal(err)
}
if string(v) != migrations[1].Name {
t.Fatal()
}
}
func TestApplyWithoutInit(t *testing.T) {
db := ethdb.NewMemDatabase()
migrations = []Migration{
{
"one",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
t.Fatal("shouldn't have been executed")
return nil
},
},
{
"two",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
return nil
},
},
}
err := db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(migrations[0].Name))
if err != nil {
t.Fatal()
}
migrator := NewMigrator()
migrator.Migrations = migrations
err = migrator.Apply(db, false, false, false, false, false)
if err != nil {
t.Fatal()
}
v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err != nil {
t.Fatal(err)
}
if string(v) != migrations[1].Name {
t.Fatal()
}
}