Merge pull request #356 from ledgerwatch/change_thin_history_index_encoding

Change thin history index encoding
b00ris 2020-02-26 11:15:35 +03:00 committed by GitHub
commit 5c2d4dba3a
23 changed files with 1320 additions and 592 deletions

View File

@ -1246,7 +1246,8 @@ func readAccount(chaindata string, account common.Address, block uint64, rewind
var printed bool
encodedTS := dbutils.EncodeTimestamp(timestamp)
changeSetKey := dbutils.CompositeChangeSetKey(encodedTS, dbutils.StorageHistoryBucket)
v, err = ethDb.Get(dbutils.ChangeSetBucket, changeSetKey)
v, err = ethDb.Get(dbutils.StorageChangeSetBucket, changeSetKey)
check(err)
if v != nil {
err = changeset.StorageChangeSetBytes(v).Walk(func(key, value []byte) error {
if bytes.HasPrefix(key, secKey) {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/ledgerwatch/turbo-geth/migrations"
"os"
"time"
@ -169,7 +170,7 @@ func newBucketWriter(db ethdb.Database, bucket []byte) *bucketWriter {
}
func copyDatabase(fromDB ethdb.Database, toDB ethdb.Database) error {
for _, bucket := range [][]byte{dbutils.AccountsBucket, dbutils.StorageBucket, dbutils.CodeBucket} {
for _, bucket := range [][]byte{dbutils.AccountsBucket, dbutils.StorageBucket, dbutils.CodeBucket, dbutils.DatabaseInfoBucket} {
fmt.Printf(" - copying bucket '%s'...\n", string(bucket))
writer := newBucketWriter(toDB, bucket)
@ -207,6 +208,8 @@ func loadSnapshot(db ethdb.Database, filename string, createDb CreateDbFunc) {
err = copyDatabase(diskDb, db)
check(err)
err = migrations.NewMigrator().Apply(diskDb, false, false, false, false, false)
check(err)
}
func loadCodes(db *bolt.DB, codeDb ethdb.Database) error {

View File

@ -95,6 +95,13 @@ func TestCopyDatabase(t *testing.T) {
string(dbutils.CodeBucket): generateData(string(dbutils.CodeBucket)),
})
doTestcase(t, map[string]testData{
string(dbutils.AccountsBucket): generateData(string(dbutils.AccountsBucket)),
string(dbutils.StorageBucket): generateData(string(dbutils.StorageBucket)),
string(dbutils.CodeBucket): generateData(string(dbutils.CodeBucket)),
string(dbutils.DatabaseInfoBucket): generateData(string(dbutils.DatabaseInfoBucket)),
})
}
func doTestcase(t *testing.T, testCase map[string]testData) {

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/common"
"reflect"
"sort"
)
@ -143,6 +144,10 @@ func (s *ChangeSet) ChangedKeys() map[string]struct{} {
return m
}
func (s *ChangeSet) Equals(s2 *ChangeSet) bool {
return reflect.DeepEqual(s.Changes, s2.Changes)
}
// Encoded Method
func Len(b []byte) int {

View File

@ -93,7 +93,6 @@ func EncodeStorage(s *ChangeSet) ([]byte, error) {
}
byt := buf.Bytes()
fmt.Println("enc storage", len(byt), len(s.Changes))
return byt, nil
}
@ -123,7 +122,7 @@ func DecodeStorage(b []byte) (*ChangeSet, error) {
//parse non-default incarnations
incarnationsLength := len(b[incarnationPosition:])
notDefaultIncarnation := make(map[uint32]uint64, 0)
notDefaultIncarnation := make(map[uint32]uint64)
var (
id uint32
inc uint64
@ -186,11 +185,10 @@ func (b StorageChangeSetBytes) Walk(f func(k, v []byte) error) error {
incarnationPosition := storageEnodingStartElem + numOfItems*(3*common.HashLength)
if uint32(len(b)) < incarnationPosition {
fmt.Println("WalkStorage", numOfItems)
return fmt.Errorf("decode: input too short (%d bytes, expected at least %d bytes)", len(b), incarnationPosition)
}
incarnationsLength := len(b[incarnationPosition:])
notDefaultIncarnation := make(map[uint32]uint64, 0)
notDefaultIncarnation := make(map[uint32]uint64)
var (
id uint32
inc uint64
@ -250,12 +248,11 @@ func (b StorageChangeSetBytes) FindLast(k []byte) ([]byte, error) {
incarnationPosition := storageEnodingStartElem + numOfItems*(3*common.HashLength)
if uint32(len(b)) < incarnationPosition {
fmt.Println("FindLast storage")
return nil, fmt.Errorf("decode: input too short (%d bytes, expected at least %d bytes)", len(b), incarnationPosition)
}
incarnationsLength := len(b[incarnationPosition:])
notDefaultIncarnation := make(map[uint32]uint64, 0)
notDefaultIncarnation := make(map[uint32]uint64)
var (
id uint32
inc uint64

View File

@ -36,16 +36,22 @@ var (
//value - code hash
ContractCodeBucket = []byte("contractCode")
// key - encoded timestamp(block number) + history bucket(hAT/hST)
// value - encoded AccountChangeSet{k - addrHash|compositeKey(for storage) v - account(encoded) | originalValue(common.Hash)}
ChangeSetBucket = []byte("AccountChangeSet")
//AccountChangeSetBucket keeps changesets of accounts
// key - encoded timestamp(block number)
// value - encoded ChangeSet{k - addrHash v - account(encoded)}.
AccountChangeSetBucket = []byte("ACS")
// StorageChangeSetBucket keeps changesets of storage
// key - encoded timestamp(block number)
// value - encoded ChangeSet{k - compositeKey(for storage) v - originalValue(common.Hash)}.
StorageChangeSetBucket = []byte("SCS")
// some_prefix_of(hash_of_address_of_account) => hash_of_subtrie
IntermediateTrieHashBucket = []byte("iTh")
// DatabaseInfoBucket is used to store information about data layout.
DatabaseInfoBucket = []byte("DBINFO")
// databaseVerisionKey tracks the current database version.
DatabaseVerisionKey = []byte("DatabaseVersion")
@ -85,4 +91,20 @@ var (
// last block that was pruned
// it is saved once every 5 minutes
LastPrunedBlockKey = []byte("LastPrunedBlock")
// LastAppliedMigration keeps the name of the last applied migration.
LastAppliedMigration = []byte("lastAppliedMigration")
//StorageModeHistory - whether the node saves history.
StorageModeHistory = []byte("smHistory")
//StorageModeReceipts - whether the node saves receipts.
StorageModeReceipts = []byte("smReceipts")
//StorageModeTxIndex - whether the node saves the transaction index.
StorageModeTxIndex = []byte("smTxIndex")
//StorageModePreImages - whether the node saves the hash-to-preimage mapping.
StorageModePreImages = []byte("smPreImages")
//StorageModeThinHistory - whether thin history mode is enabled.
StorageModeThinHistory = []byte("smThinHistory")
//StorageModeIntermediateTrieHash - whether the IntermediateTrieHash feature is enabled.
StorageModeIntermediateTrieHash = []byte("smIntermediateTrieHash")
)
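
The split above also changes the key scheme. Changesets used to share a single bucket under a composite key (encoded block number plus history bucket name, hAT/hST); account and storage changesets now live in separate buckets (ACS/SCS) keyed by the encoded block number alone. A minimal sketch of the two schemes, with an illustrative stand-in for dbutils.EncodeTimestamp (the real function uses a compact variable-length encoding):

package main

import "fmt"

// encodeTimestamp is an illustrative stand-in for dbutils.EncodeTimestamp;
// the real function emits a compact variable-length encoding of the block number.
func encodeTimestamp(blockNum uint64) []byte {
    return []byte(fmt.Sprintf("%08d", blockNum))
}

func main() {
    encTS := encodeTimestamp(42)

    // Old scheme: one shared bucket, key = encoded timestamp + history bucket name.
    oldKey := append(append([]byte{}, encTS...), []byte("hAT")...)
    fmt.Printf("old: %q in ChangeSetBucket\n", oldKey)

    // New scheme: the bucket itself (ACS or SCS) identifies the kind of changeset,
    // so the key is the encoded timestamp alone.
    fmt.Printf("new: %q in ACS / %q in SCS\n", encTS, encTS)
}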

View File

@ -0,0 +1,168 @@
package dbutils
import (
"encoding/binary"

"github.com/ledgerwatch/turbo-geth/common/math"
)
const (
LenBytes = 4
ItemLen = 8
)
func NewHistoryIndex() *HistoryIndexBytes {
b := make(HistoryIndexBytes, LenBytes*2, 16)
return &b
}
func WrapHistoryIndex(b []byte) *HistoryIndexBytes {
index := HistoryIndexBytes(b)
if len(index) == 0 {
index = make(HistoryIndexBytes, LenBytes*2, 16)
}
return &index
}
type HistoryIndexBytes []byte
func (hi *HistoryIndexBytes) Decode() ([]uint64, error) {
if hi == nil {
return []uint64{}, nil
}
if len(*hi) <= LenBytes*2 {
return []uint64{}, nil
}
numOfElements := binary.LittleEndian.Uint32((*hi)[0:LenBytes])
numOfUint32Elements := binary.LittleEndian.Uint32((*hi)[LenBytes : 2*LenBytes])
decoded := make([]uint64, numOfElements)
for i := uint32(0); i < numOfElements; i++ {
if i < numOfUint32Elements {
decoded[i] = uint64(binary.LittleEndian.Uint32((*hi)[LenBytes*2+i*4 : LenBytes*2+i*4+4]))
} else {
// uint64 elements are laid out after all uint32 elements
start := LenBytes*2 + numOfUint32Elements*4 + (i-numOfUint32Elements)*ItemLen
decoded[i] = binary.LittleEndian.Uint64((*hi)[start : start+ItemLen])
}
}
return decoded, nil
}
func (hi *HistoryIndexBytes) Append(v uint64) *HistoryIndexBytes {
numOfElements := binary.LittleEndian.Uint32((*hi)[0:LenBytes])
numOfUint32Elements := binary.LittleEndian.Uint32((*hi)[LenBytes : 2*LenBytes])
var b []byte
if v < math.MaxUint32 {
b = make([]byte, 4)
numOfUint32Elements++
binary.LittleEndian.PutUint32(b, uint32(v))
} else {
b = make([]byte, ItemLen)
binary.LittleEndian.PutUint64(b, v)
}
*hi = append(*hi, b...)
binary.LittleEndian.PutUint32((*hi)[0:LenBytes], numOfElements+1)
binary.LittleEndian.PutUint32((*hi)[LenBytes:2*LenBytes], numOfUint32Elements)
return hi
}
func (hi *HistoryIndexBytes) Len() uint32 {
return binary.LittleEndian.Uint32((*hi)[0:LenBytes])
}
// the most common operation is removing one element from the tail
func (hi *HistoryIndexBytes) Remove(v uint64) *HistoryIndexBytes {
numOfElements := binary.LittleEndian.Uint32((*hi)[0:LenBytes])
numOfUint32Elements := binary.LittleEndian.Uint32((*hi)[LenBytes : 2*LenBytes])
var currentElement uint64
var elemEnd uint32
var itemLen uint32
Loop:
for i := numOfElements; i > 0; i-- {
if i > numOfUint32Elements {
elemEnd = LenBytes*2 + numOfUint32Elements*4 + (i-numOfUint32Elements)*8
currentElement = binary.LittleEndian.Uint64((*hi)[elemEnd-8 : elemEnd])
itemLen = 8
} else {
elemEnd = LenBytes*2 + i*4
currentElement = uint64(binary.LittleEndian.Uint32((*hi)[elemEnd-4 : elemEnd]))
itemLen = 4
}
switch {
case currentElement == v:
*hi = append((*hi)[:elemEnd-itemLen], (*hi)[elemEnd:]...)
numOfElements--
if itemLen == 4 {
numOfUint32Elements--
}
case currentElement < v:
break Loop
default:
continue
}
}
binary.LittleEndian.PutUint32((*hi)[0:LenBytes], numOfElements)
binary.LittleEndian.PutUint32((*hi)[LenBytes:2*LenBytes], numOfUint32Elements)
return hi
}
func (hi *HistoryIndexBytes) Search(v uint64) (uint64, bool) {
if len(*hi) < 4 {
return 0, false
}
numOfElements := binary.LittleEndian.Uint32((*hi)[0:LenBytes])
numOfUint32Elements := binary.LittleEndian.Uint32((*hi)[LenBytes : 2*LenBytes])
var itemLen uint32
if numOfElements == 0 {
return 0, false
}
//check last element
var lastElement uint64
if numOfUint32Elements < numOfElements {
lastElement = binary.LittleEndian.Uint64((*hi)[LenBytes*2+numOfUint32Elements*4+ItemLen*(numOfElements-numOfUint32Elements)-ItemLen : LenBytes*2+numOfUint32Elements*4+ItemLen*(numOfElements-numOfUint32Elements)])
} else {
lastElement = uint64(binary.LittleEndian.Uint32((*hi)[LenBytes*2+numOfUint32Elements*4-4 : LenBytes*2+numOfUint32Elements*4]))
}
if lastElement < v {
return 0, false
}
var currentElement uint64
var elemEnd uint32
for i := numOfElements - 1; i > 0; i-- {
if i > numOfUint32Elements {
elemEnd = LenBytes*2 + numOfUint32Elements*4 + (i-numOfUint32Elements)*8
currentElement = binary.LittleEndian.Uint64((*hi)[elemEnd-8 : elemEnd])
itemLen = 8
} else {
elemEnd = LenBytes*2 + i*4
currentElement = uint64(binary.LittleEndian.Uint32((*hi)[elemEnd-4 : elemEnd]))
itemLen = 4
}
switch {
case currentElement == v:
return v, true
case currentElement < v:
if itemLen == 4 {
return uint64(binary.LittleEndian.Uint32((*hi)[elemEnd : elemEnd+itemLen])), true
}
return binary.LittleEndian.Uint64((*hi)[elemEnd : elemEnd+itemLen]), true
default:
continue
}
}
if numOfUint32Elements == 0 {
return binary.LittleEndian.Uint64((*hi)[LenBytes*2 : 2*LenBytes+ItemLen]), true
}
return uint64(binary.LittleEndian.Uint32((*hi)[2*LenBytes : 2*LenBytes+4])), true
}
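
The index layout is two little-endian uint32 counters (total element count, then how many elements fit in four bytes), followed by all 4-byte entries and then all 8-byte entries. A usage sketch against the API introduced above, assuming the package import path from this repository; Search returns the first indexed block greater than or equal to the requested one, i.e. the block whose changeset holds the value as of that point:

package main

import (
    "fmt"

    "github.com/ledgerwatch/turbo-geth/common/dbutils"
)

func main() {
    // Layout: [count uint32][uint32-count uint32][4-byte entries][8-byte entries],
    // all little-endian; block numbers are appended in increasing order.
    index := dbutils.NewHistoryIndex()
    index.Append(3).Append(5).Append(8)

    // Search finds the first indexed block >= v: that block's changeset
    // holds the value of the key as it stood at block v.
    if block, ok := index.Search(4); ok {
        fmt.Println("value as of block 4 is in the changeset of block", block) // 5
    }

    blocks, _ := index.Decode()
    fmt.Println("indexed blocks:", blocks) // [3 5 8]
}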

View File

@ -0,0 +1,112 @@
package dbutils
import (
"reflect"
"testing"
)
func TestHistoryIndex_Search1(t *testing.T) {
index := NewHistoryIndex()
index.Append(3).Append(5).Append(8)
v, _ := index.Search(1)
if v != 3 {
t.Fatal("must be 3 but", v)
}
v, _ = index.Search(3)
if v != 3 {
t.Fatal("must be 3")
}
v, _ = index.Search(4)
if v != 5 {
t.Fatal("must be 5")
}
v, _ = index.Search(5)
if v != 5 {
t.Fatal("must be 5")
}
v, _ = index.Search(7)
if v != 8 {
t.Fatal("must be 8")
}
v, _ = index.Search(8)
if v != 8 {
t.Fatal("must be 8")
}
_, b := index.Search(9)
if b {
t.Fatal("must be not found")
}
}
func TestHistoryIndex_Search_EmptyIndex(t *testing.T) {
index := &HistoryIndexBytes{}
_, b := index.Search(1)
if b {
t.FailNow()
}
}
func TestHistoryIndex_Append(t *testing.T) {
index := NewHistoryIndex()
for i := uint64(1); i < 10; i++ {
index.Append(i)
}
res, err := index.Decode()
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(res, []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}) {
t.Fatal("Not equal")
}
if index.Len() != 9 {
t.Fatal()
}
index.Remove(9)
res, err = index.Decode()
if err != nil {
t.Fatal(err)
}
if index.Len() != 8 {
t.Fatal()
}
if !reflect.DeepEqual(res, []uint64{1, 2, 3, 4, 5, 6, 7, 8}) {
t.Fatal("Not equal")
}
index.Remove(5)
res, err = index.Decode()
if err != nil {
t.Fatal(err)
}
if index.Len() != 7 {
t.Fatal()
}
if !reflect.DeepEqual(res, []uint64{1, 2, 3, 4, 6, 7, 8}) {
t.Fatal("Not equal")
}
index.Remove(1)
res, err = index.Decode()
if err != nil {
t.Fatal(err)
}
if index.Len() != 6 {
t.Fatal()
}
if !reflect.DeepEqual(res, []uint64{2, 3, 4, 6, 7, 8}) {
t.Fatal("Not equal")
}
}

View File

@ -20,7 +20,10 @@ package eth
import (
"errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/migrations"
"math/big"
"reflect"
"runtime"
"sync"
"sync/atomic"
@ -171,6 +174,32 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
}
}
err = setStorageModeIfNotExist(chainDb, config.StorageMode)
if err != nil {
return nil, err
}
sm, err := getStorageModeFromDB(chainDb)
if err != nil {
return nil, err
}
if !reflect.DeepEqual(sm, config.StorageMode) {
return nil, errors.New("storage mode " + config.StorageMode.ToString() + " does not match database storage mode " + sm.ToString())
}
err = migrations.NewMigrator().Apply(
chainDb,
config.StorageMode.History,
config.StorageMode.Receipts,
config.StorageMode.TxIndex,
config.StorageMode.Preimages,
config.StorageMode.ThinHistory,
)
if err != nil {
return nil, err
}
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
@ -592,3 +621,103 @@ func (s *Ethereum) Stop() error {
close(s.shutdownChan)
return nil
}
func setStorageModeIfNotExist(db ethdb.Database, sm StorageMode) error {
var (
err error
)
err = setModeOnEmpty(db, dbutils.StorageModeHistory, sm.History)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModePreImages, sm.Preimages)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeReceipts, sm.Receipts)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeTxIndex, sm.TxIndex)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeThinHistory, sm.ThinHistory)
if err != nil {
return err
}
err = setModeOnEmpty(db, dbutils.StorageModeIntermediateTrieHash, sm.IntermediateTrieHash)
if err != nil {
return err
}
return nil
}
func setModeOnEmpty(db ethdb.Database, key []byte, currentValue bool) error {
_, err := db.Get(dbutils.DatabaseInfoBucket, key)
if err != nil && err != ethdb.ErrKeyNotFound {
return err
}
if err == ethdb.ErrKeyNotFound {
val := []byte{}
if currentValue {
val = []byte{1}
}
if err = db.Put(dbutils.DatabaseInfoBucket, key, val); err != nil {
return err
}
}
return nil
}
func getStorageModeFromDB(db ethdb.Database) (StorageMode, error) {
var (
sm StorageMode
v []byte
err error
)
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeHistory)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.History = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModePreImages)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.Preimages = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeReceipts)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.Receipts = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeTxIndex)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.TxIndex = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeThinHistory)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.ThinHistory = len(v) > 0
v, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.StorageModeIntermediateTrieHash)
if err != nil && err != ethdb.ErrKeyNotFound {
return StorageMode{}, err
}
sm.IntermediateTrieHash = len(v) > 0
return sm, nil
}
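
The persistence convention behind getStorageModeFromDB is: each flag is a key in DatabaseInfoBucket whose value is []byte{1} for true and an empty slice for false, so reading back reduces to len(v) > 0, while a missing key means the mode was never initialized. A toy sketch of the convention with an in-memory map standing in for the bucket:

package main

import "fmt"

func main() {
    bucket := map[string][]byte{} // stand-in for DatabaseInfoBucket

    put := func(key string, enabled bool) {
        if enabled {
            bucket[key] = []byte{1}
        } else {
            bucket[key] = []byte{} // present but empty: reads back as false
        }
    }

    put("smHistory", true)
    put("smThinHistory", false)

    for _, key := range []string{"smHistory", "smThinHistory", "smReceipts"} {
        v, initialized := bucket[key]
        fmt.Printf("%s: initialized=%v enabled=%v\n", key, initialized, len(v) > 0)
    }
}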

eth/backend_test.go (new file, 50 lines)
View File

@ -0,0 +1,50 @@
package eth
import (
"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/turbo-geth/ethdb"
"reflect"
"testing"
)
func TestSetStorageModeIfNotExist(t *testing.T) {
db := ethdb.NewMemDatabase()
sm, err := getStorageModeFromDB(db)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(sm, StorageMode{}) {
t.Fatal()
}
err = setStorageModeIfNotExist(db, StorageMode{
true,
true,
true,
true,
true,
true,
})
if err != nil {
t.Fatal(err)
}
sm, err = getStorageModeFromDB(db)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(sm, StorageMode{
true,
true,
true,
true,
true,
true,
}) {
spew.Dump(sm)
t.Fatal("not equal")
}
}

View File

@ -18,6 +18,7 @@ package eth
import (
"fmt"
"github.com/ledgerwatch/turbo-geth/common/debug"
"math/big"
"os"
"os/user"
@ -92,9 +93,10 @@ type StorageMode struct {
TxIndex bool
Preimages bool
IntermediateTrieHash bool
ThinHistory bool
}
var DefaultStorageMode = StorageMode{History: true, Receipts: false, TxIndex: true, Preimages: true}
var DefaultStorageMode = StorageMode{History: true, Receipts: false, TxIndex: true, Preimages: true, ThinHistory: false}
func (m StorageMode) ToString() string {
modeString := ""
@ -113,6 +115,9 @@ func (m StorageMode) ToString() string {
if m.IntermediateTrieHash {
modeString += "i"
}
if m.ThinHistory {
modeString += "n"
}
return modeString
}
@ -128,12 +133,21 @@ func StorageModeFromString(flags string) (StorageMode, error) {
mode.TxIndex = true
case 'p':
mode.Preimages = true
case 'n':
mode.ThinHistory = true
case 'i':
mode.IntermediateTrieHash = true
default:
return mode, fmt.Errorf("unexpected flag found: %c", flag)
}
}
if mode.ThinHistory {
debug.ThinHistory = true
}
if debug.IsThinHistory() {
mode.ThinHistory = true
}
return mode, nil
}
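
For reference, the flag letters accepted above are h (history), r (receipts), t (tx index), p (preimages), i (intermediate trie hashes) and n (thin history); parsing also syncs the global debug.ThinHistory switch. A usage sketch, assuming the package import path from this repository:

package main

import (
    "fmt"

    "github.com/ledgerwatch/turbo-geth/eth"
)

func main() {
    // "htpn" enables history, tx index, preimages and thin history.
    mode, err := eth.StorageModeFromString("htpn")
    if err != nil {
        panic(err)
    }
    fmt.Println(mode.ToString()) // "htpn" (ToString emits flags in a fixed order)
}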

View File

@ -19,6 +19,7 @@ package ethdb
import (
"bytes"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"os"
"path"
@ -90,35 +91,22 @@ func (db *BoltDatabase) Put(bucket, key []byte, value []byte) error {
// hBucket (unless changeSetBucketOnly) and AccountChangeSet.
func (db *BoltDatabase) PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error {
composite, encodedTS := dbutils.CompositeKeySuffix(key, timestamp)
changeSetKey := dbutils.CompositeChangeSetKey(encodedTS, hBucket)
changeSetKey := encodedTS
err := db.db.Update(func(tx *bolt.Tx) error {
if !changeSetBucketOnly {
hb, err := tx.CreateBucketIfNotExists(hBucket, true)
if err != nil {
return err
}
switch {
case debug.IsThinHistory() && bytes.Equal(hBucket, dbutils.AccountsHistoryBucket):
if debug.IsThinHistory() {
b, _ := hb.Get(key)
b, err = AppendToIndex(b, timestamp)
if err != nil {
log.Error("PutS AppendChangedOnIndex err", "err", err)
index := dbutils.WrapHistoryIndex(b)
index.Append(timestamp)
if err = hb.Put(key, *index); err != nil {
return err
}
if err = hb.Put(key, b); err != nil {
return err
}
case debug.IsThinHistory() && bytes.Equal(hBucket, dbutils.StorageHistoryBucket):
b, _ := hb.Get(key[:common.HashLength+common.IncarnationLength])
b, err = AppendToStorageIndex(b, key[common.HashLength+common.IncarnationLength:common.HashLength+common.IncarnationLength+common.HashLength], timestamp)
if err != nil {
log.Error("PutS AppendChangedOnIndex err", "err", err)
return err
}
if err = hb.Put(key, b); err != nil {
return err
}
default:
} else {
if err = hb.Put(composite, value); err != nil {
return err
}
@ -131,7 +119,7 @@ func (db *BoltDatabase) PutS(hBucket, key, value []byte, timestamp uint64, chang
}
dat, _ := sb.Get(changeSetKey)
dat, err = addToChangeSet(dat, key, value)
dat, err = addToChangeSet(hBucket, dat, key, value)
if err != nil {
log.Error("PutS DecodeChangeSet changeSet err", "err", err)
return err
@ -215,7 +203,8 @@ func (db *BoltDatabase) Get(bucket, key []byte) ([]byte, error) {
// getChangeSetByBlockNoLock returns changeset by block and bucket
func (db *BoltDatabase) GetChangeSetByBlock(hBucket []byte, timestamp uint64) ([]byte, error) {
key := dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(timestamp), hBucket)
key := dbutils.EncodeTimestamp(timestamp)
var dat []byte
err := db.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(dbutils.ChangeSetByIndexBucket(hBucket))
@ -240,8 +229,7 @@ func (db *BoltDatabase) GetChangeSetByBlock(hBucket []byte, timestamp uint64) ([
func (db *BoltDatabase) GetAsOf(bucket, hBucket, key []byte, timestamp uint64) ([]byte, error) {
var dat []byte
err := db.db.View(func(tx *bolt.Tx) error {
switch {
case debug.IsThinHistory() && bytes.Equal(hBucket, dbutils.AccountsHistoryBucket):
if debug.IsThinHistory() {
v, err := BoltDBFindByHistory(tx, hBucket, key, timestamp)
if err != nil {
log.Debug("BoltDB BoltDBFindByHistory err", "err", err)
@ -250,16 +238,7 @@ func (db *BoltDatabase) GetAsOf(bucket, hBucket, key []byte, timestamp uint64) (
copy(dat, v)
return nil
}
case debug.IsThinHistory() && bytes.Equal(hBucket, dbutils.StorageHistoryBucket):
v, err := BoltDBFindStorageByHistory(tx, hBucket, key, timestamp)
if err != nil {
log.Debug("BoltDB BoltDBFindStorageByHistory err", "err", err)
} else {
dat = make([]byte, len(v))
copy(dat, v)
return nil
}
default:
} else {
composite, _ := dbutils.CompositeKeySuffix(key, timestamp)
hB := tx.Bucket(hBucket)
if hB == nil {
@ -637,10 +616,25 @@ func (db *BoltDatabase) DeleteTimestamp(timestamp uint64) error {
if hb == nil {
return nil
}
err := changeset.Walk(v, func(kk, _ []byte) error {
kk = append(kk, encodedTS...)
return hb.Delete(kk)
})
var err error
if debug.IsThinHistory() {
if bytes.Equal(changeSetBucket, dbutils.AccountChangeSetBucket) {
err = changeset.AccountChangeSetBytes(v).Walk(func(kk, _ []byte) error {
kk = append(kk, encodedTS...)
return hb.Delete(kk)
})
} else {
err = changeset.StorageChangeSetBytes(v).Walk(func(kk, _ []byte) error {
kk = append(kk, encodedTS...)
return hb.Delete(kk)
})
}
} else {
err = changeset.Walk(v, func(kk, _ []byte) error {
kk = append(kk, encodedTS...)
return hb.Delete(kk)
})
}
if err != nil {
return err
}
@ -736,3 +730,62 @@ func NewDatabaseWithFreezer(db Database, dir, suffix string) (Database, error) {
// FIXME: implement freezer in Turbo-Geth
return db, nil
}
func BoltDBFindByHistory(tx *bolt.Tx, hBucket []byte, key []byte, timestamp uint64) ([]byte, error) {
// check that the history bucket exists
hB := tx.Bucket(hBucket)
if hB == nil {
return nil, ErrKeyNotFound
}
v, _ := hB.Get(key)
index := dbutils.WrapHistoryIndex(v)
changeSetBlock, ok := index.Search(timestamp)
if !ok {
return nil, ErrKeyNotFound
}
csB := tx.Bucket(dbutils.ChangeSetByIndexBucket(hBucket))
if csB == nil {
return nil, ErrKeyNotFound
}
csKey := dbutils.EncodeTimestamp(changeSetBlock)
changeSetData, _ := csB.Get(csKey)
var (
data []byte
err error
)
switch {
case debug.IsThinHistory() && bytes.Equal(dbutils.AccountsHistoryBucket, hBucket):
data, err = changeset.AccountChangeSetBytes(changeSetData).FindLast(key)
case debug.IsThinHistory() && bytes.Equal(dbutils.StorageHistoryBucket, hBucket):
data, err = changeset.StorageChangeSetBytes(changeSetData).FindLast(key)
default:
data, err = changeset.FindLast(changeSetData, key)
}
if err != nil {
return nil, ErrKeyNotFound
}
//restore codehash
if bytes.Equal(dbutils.AccountsHistoryBucket, hBucket) {
var acc accounts.Account
if err := acc.DecodeForStorage(data); err != nil {
return nil, err
}
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
codeBucket := tx.Bucket(dbutils.ContractCodeBucket)
codeHash, _ := codeBucket.Get(dbutils.GenerateStoragePrefix(common.BytesToHash(key), acc.Incarnation))
if len(codeHash) > 0 {
acc.CodeHash = common.BytesToHash(codeHash)
}
data = make([]byte, acc.EncodingLengthForStorage())
acc.EncodeForStorage(data)
}
return data, nil
}
return data, nil
}
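
The lookup flow of BoltDBFindByHistory is: load the thin history index for the key, scan it for the first block at or after the requested timestamp, fetch that block's changeset, and run FindLast over it (restoring the code hash for accounts). A sketch of the same flow with the storage layer abstracted away; the type and helper names here are illustrative, not the real API:

package main

import (
    "errors"
    "fmt"
)

// lookup abstracts the two reads BoltDBFindByHistory performs; the field
// names and signatures here are illustrative, not the real API.
type lookup struct {
    historyIndex  func(key []byte) ([]uint64, bool)              // thin index: blocks where the key changed
    changesetLast func(block uint64, key []byte) ([]byte, error) // FindLast within one changeset
}

func (l lookup) findByHistory(key []byte, asOf uint64) ([]byte, error) {
    blocks, ok := l.historyIndex(key)
    if !ok {
        return nil, errors.New("key not found")
    }
    // pick the first indexed block >= asOf, as HistoryIndexBytes.Search does
    for _, b := range blocks {
        if b >= asOf {
            return l.changesetLast(b, key)
        }
    }
    return nil, errors.New("key not found")
}

func main() {
    l := lookup{
        historyIndex: func([]byte) ([]uint64, bool) { return []uint64{3, 5, 8}, true },
        changesetLast: func(block uint64, _ []byte) ([]byte, error) {
            return []byte(fmt.Sprintf("value@%d", block)), nil
        },
    }
    v, _ := l.findByHistory([]byte("acctHash"), 4)
    fmt.Println(string(v)) // value@5
}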

View File

@ -2,8 +2,8 @@ package ethdb
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
)
// Maximum length (in bytes) of an encoded timestamp
@ -86,57 +86,28 @@ func decode7to8(b []byte) []byte {
// In production settings, ChangeSet encodings are never modified.
// In production settings (mutation.PutS) we always populate the ChangeSet object first,
// then encode it once, and from then on work only with the encoding
func addToChangeSet(b []byte, key []byte, value []byte) ([]byte, error) {
var m int
var n int
func addToChangeSet(hb, b []byte, key []byte, value []byte) ([]byte, error) {
var (
cs *changeset.ChangeSet
err error
)
if len(b) == 0 {
m = len(key)
n = 0
if bytes.Equal(hb, dbutils.AccountsHistoryBucket) {
cs, err = changeset.DecodeAccounts(b)
} else {
n = int(binary.BigEndian.Uint32(b[0:4]))
m = int(binary.BigEndian.Uint32(b[4:8]))
if len(key) != m {
return nil, fmt.Errorf("wrong key size in AccountChangeSet: expected %d, actual %d", m, len(key))
}
cs, err = changeset.DecodeStorage(b)
}
pos := 4
var buffer bytes.Buffer
// Encode n
intArr := make([]byte, 4)
binary.BigEndian.PutUint32(intArr, uint32(n+1))
buffer.Write(intArr)
// KeySize should be the same
if n == 0 {
binary.BigEndian.PutUint32(intArr, uint32(len(key)))
buffer.Write(intArr)
} else {
buffer.Write(b[pos : pos+4])
if err != nil {
return nil, err
}
err = cs.Add(key, value)
if err != nil {
return nil, err
}
pos += 4
// append key
if n == 0 {
buffer.Write(key)
pos += len(key)
if bytes.Equal(hb, dbutils.AccountsHistoryBucket) {
return changeset.EncodeAccounts(cs)
} else {
buffer.Write(b[pos : pos+n*m])
buffer.Write(key)
pos += n * m
return changeset.EncodeStorage(cs)
}
// Append Index
if n == 0 {
binary.BigEndian.PutUint32(intArr, uint32(len(value)))
buffer.Write(intArr)
} else {
buffer.Write(b[pos : pos+4*n])
pos += 4 * n
prev := int(binary.BigEndian.Uint32(b[pos-4 : pos]))
binary.BigEndian.PutUint32(intArr, uint32(prev+len(value)))
buffer.Write(intArr)
buffer.Write(b[pos:])
}
// Append Value
buffer.Write(value)
return buffer.Bytes(), nil
}
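
addToChangeSet no longer splices bytes by hand: it decodes the stored encoding with the codec matching the bucket, adds the pair, and re-encodes. A round-trip sketch for the accounts case, assuming DecodeAccounts tolerates an empty input, as the call site above implies:

package main

import (
    "fmt"

    "github.com/ledgerwatch/turbo-geth/common"
    "github.com/ledgerwatch/turbo-geth/common/changeset"
)

func main() {
    // Decode the (possibly empty) stored encoding, add the pair, encode again.
    cs, err := changeset.DecodeAccounts(nil)
    if err != nil {
        panic(err)
    }
    key := common.Hash{1}.Bytes() // account changeset keys are 32-byte address hashes
    if err = cs.Add(key, []byte{0xca, 0xfe}); err != nil {
        panic(err)
    }
    enc, err := changeset.EncodeAccounts(cs)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%d change(s), %d encoded bytes\n", len(cs.Changes), len(enc))
}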

View File

@ -1,207 +0,0 @@
package ethdb
import (
"bytes"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/debug"
"sort"
"github.com/ledgerwatch/bolt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/rlp"
)
//
type HistoryIndex []uint64
func (hi *HistoryIndex) Encode() ([]byte, error) {
return rlp.EncodeToBytes(hi)
}
func (hi *HistoryIndex) Decode(s []byte) error {
if len(s) == 0 {
return nil
}
return rlp.DecodeBytes(s, &hi)
}
func (hi *HistoryIndex) Append(v uint64) *HistoryIndex {
*hi = append(*hi, v)
if !sort.SliceIsSorted(*hi, func(i, j int) bool {
return (*hi)[i] <= (*hi)[j]
}) {
sort.Slice(*hi, func(i, j int) bool {
return (*hi)[i] <= (*hi)[j]
})
}
return hi
}
//most common operation is remove one from the tail
func (hi *HistoryIndex) Remove(v uint64) *HistoryIndex {
for i := len(*hi) - 1; i >= 0; i-- {
if (*hi)[i] == v {
*hi = append((*hi)[:i], (*hi)[i+1:]...)
}
}
return hi
}
func (hi *HistoryIndex) Search(v uint64) (uint64, bool) {
ln := len(*hi)
if ln == 0 {
return 0, false
}
if (*hi)[ln-1] < v {
return 0, false
}
for i := ln - 1; i >= 0; i-- {
if v == (*hi)[i] {
return v, true
}
if (*hi)[i] < v {
return (*hi)[i+1], true
}
}
return (*hi)[0], true
}
func AppendToIndex(b []byte, timestamp uint64) ([]byte, error) {
v := new(HistoryIndex)
if err := v.Decode(b); err != nil {
return nil, err
}
v.Append(timestamp)
return v.Encode()
}
func RemoveFromIndex(b []byte, timestamp uint64) ([]byte, bool, error) {
v := new(HistoryIndex)
if err := v.Decode(b); err != nil {
return nil, false, err
}
v.Remove(timestamp)
res, err := v.Encode()
if len(*v) == 0 {
return res, true, err
}
return res, false, err
}
func BoltDBFindByHistory(tx *bolt.Tx, hBucket []byte, key []byte, timestamp uint64) ([]byte, error) {
//check
hB := tx.Bucket(hBucket)
if hB == nil {
return nil, ErrKeyNotFound
}
v, _ := hB.Get(key)
index := new(HistoryIndex)
err := index.Decode(v)
if err != nil {
return nil, err
}
changeSetBlock, ok := index.Search(timestamp)
if !ok {
return nil, ErrKeyNotFound
}
csB := tx.Bucket(dbutils.ChangeSetByIndexBucket(hBucket))
if csB == nil {
return nil, ErrKeyNotFound
}
changeSetData, _ := csB.Get(dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(changeSetBlock), hBucket))
var data []byte
switch {
case debug.IsThinHistory() && bytes.Equal(dbutils.AccountsHistoryBucket, hBucket):
data, err = changeset.AccountChangeSetBytes(changeSetData).FindLast(key)
case debug.IsThinHistory() && bytes.Equal(dbutils.StorageChangeSetBucket, hBucket):
data, err = changeset.StorageChangeSetBytes(changeSetData).FindLast(key)
default:
data, err = changeset.FindLast(changeSetData, key)
}
if err != nil {
return nil, ErrKeyNotFound
}
var acc accounts.Account
if err := acc.DecodeForStorage(data); err != nil {
return nil, err
}
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
codeBucket := tx.Bucket(dbutils.ContractCodeBucket)
codeHash, _ := codeBucket.Get(dbutils.GenerateStoragePrefix(common.BytesToHash(key), acc.Incarnation))
if len(codeHash) > 0 {
acc.CodeHash = common.BytesToHash(codeHash)
}
data = make([]byte, acc.EncodingLengthForStorage())
acc.EncodeForStorage(data)
}
return data, nil
}
func BoltDBFindStorageByHistory(tx *bolt.Tx, hBucket []byte, key []byte, timestamp uint64) ([]byte, error) {
var k common.Hash
copy(k[:], key[common.HashLength+common.IncarnationLength:])
//check
hB := tx.Bucket(hBucket)
if hB == nil {
return nil, ErrKeyNotFound
}
v, _ := hB.Get(key)
index := NewStorageIndex()
err := index.Decode(v)
if err != nil {
return nil, err
}
changeSetBlock, ok := index.Search(k, timestamp)
if !ok {
return nil, ErrKeyNotFound
}
csB := tx.Bucket(dbutils.ChangeSetByIndexBucket(hBucket))
if csB == nil {
return nil, ErrKeyNotFound
}
cs, _ := csB.Get(dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(changeSetBlock), hBucket))
if err != nil {
return nil, err
}
var data []byte
data, err = changeset.FindLast(cs, key)
if err != nil {
return nil, ErrKeyNotFound
}
var acc accounts.Account
if err := acc.DecodeForStorage(data); err != nil {
return nil, err
}
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
codeBucket := tx.Bucket(dbutils.ContractCodeBucket)
codeHash, _ := codeBucket.Get(dbutils.GenerateStoragePrefix(common.BytesToHash(key), acc.Incarnation))
if len(codeHash) > 0 {
acc.CodeHash = common.BytesToHash(codeHash)
}
data = make([]byte, acc.EncodingLengthForStorage())
acc.EncodeForStorage(data)
}
return data, nil
}

View File

@ -1,89 +0,0 @@
package ethdb
import (
"github.com/ledgerwatch/turbo-geth/common"
"reflect"
"testing"
)
func TestHistoryIndex_Search(t *testing.T) {
index := &HistoryIndex{3, 5, 8}
v, _ := index.Search(1)
if v != 3 {
t.Fatal("must be 3")
}
v, _ = index.Search(3)
if v != 3 {
t.Fatal("must be 3")
}
v, _ = index.Search(4)
if v != 5 {
t.Fatal("must be 5")
}
v, _ = index.Search(5)
if v != 5 {
t.Fatal("must be 5")
}
v, _ = index.Search(7)
if v != 8 {
t.Fatal("must be 8")
}
_, b := index.Search(9)
if b {
t.Fatal("must be not found")
}
}
func TestHistoryIndex_Search2(t *testing.T) {
index := &HistoryIndex{}
_, b := index.Search(1)
if b {
t.FailNow()
}
}
func TestStorageIndex_Append(t *testing.T) {
index := NewStorageIndex()
key1, key2 := common.Hash{1}, common.Hash{2}
index.Append(key1, 123)
index.Append(key2, 121)
index.Append(key2, 321)
index.Append(key2, 421)
if !reflect.DeepEqual(index, StorageIndex{
key1: &HistoryIndex{123},
key2: &HistoryIndex{121, 321, 421},
}) {
t.Fatal()
}
v, found := index.Search(key2, 121)
if !found || v != 121 {
t.Fatal(v, found)
}
index.Remove(key1, 123)
index.Remove(key2, 321)
if !reflect.DeepEqual(index, StorageIndex{
key2: &HistoryIndex{121, 421},
}) {
t.Fatal()
}
b, err := index.Encode()
if err != nil {
t.Fatal(err)
}
index2 := NewStorageIndex()
err = index2.Decode(b)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(index, index2) {
t.Fatal()
}
}

View File

@ -11,7 +11,6 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/debug"
"github.com/ledgerwatch/turbo-geth/log"
)
type puts map[string]putsBucket //map[bucket]putsBucket
@ -313,20 +312,14 @@ func (m *mutation) DeleteTimestamp(timestamp uint64) error {
if getErr != nil {
return nil
}
var (
v []byte
isEmpty bool
removeErr error
)
v, isEmpty, removeErr = RemoveFromIndex(indexBytes, timestamp)
if removeErr != nil {
return removeErr
}
if isEmpty {
index := dbutils.WrapHistoryIndex(indexBytes)
index.Remove(timestamp)
if index.Len() == 0 {
m.puts.DeleteStr(string(dbutils.AccountsHistoryBucket), kk)
} else {
m.puts.SetStr(string(dbutils.AccountsHistoryBucket), kk, v)
m.puts.SetStr(string(dbutils.AccountsHistoryBucket), kk, *index)
}
return nil
})
@ -342,20 +335,14 @@ func (m *mutation) DeleteTimestamp(timestamp uint64) error {
if getErr != nil {
return nil
}
var (
v []byte
isEmpty bool
removeErr error
)
v, isEmpty, removeErr = RemoveFromStorageIndex(indexBytes, timestamp)
if removeErr != nil {
return removeErr
}
if isEmpty {
index := dbutils.WrapHistoryIndex(indexBytes)
index.Remove(timestamp)
if index.Len() == 0 {
m.puts.DeleteStr(string(dbutils.StorageHistoryBucket), kk)
} else {
m.puts.SetStr(string(dbutils.StorageHistoryBucket), kk, v)
m.puts.SetStr(string(dbutils.StorageHistoryBucket), kk, *index)
}
return nil
})
@ -405,11 +392,7 @@ func (m *mutation) Commit() (uint64, error) {
if debug.IsThinHistory() {
changedKeys := changes.ChangedKeys()
for k := range changedKeys {
var (
v []byte
key []byte
err error
)
key := []byte(k)
value, err := m.getNoLock(dbutils.AccountsHistoryBucket, key)
if err == ErrKeyNotFound {
if m.db != nil {
@ -419,12 +402,9 @@ func (m *mutation) Commit() (uint64, error) {
}
}
}
v, err = AppendToIndex(value, timestamp)
if err != nil {
log.Error("mutation, append to index", "err", err, "timestamp", timestamp)
continue
}
m.puts.Set(dbutils.AccountsHistoryBucket, []byte(k), v)
index := dbutils.WrapHistoryIndex(value)
index.Append(timestamp)
m.puts.Set(dbutils.AccountsHistoryBucket, []byte(k), *index)
}
}
sort.Sort(changes)
@ -433,11 +413,6 @@ func (m *mutation) Commit() (uint64, error) {
err error
)
if debug.IsThinHistory() {
fmt.Println("AccountCommit")
for _, vv := range changes.Changes {
fmt.Println(common.Bytes2Hex(vv.Key), " - ", common.Bytes2Hex(vv.Value))
}
dat, err = changeset.EncodeAccounts(changes)
} else {
dat, err = changeset.EncodeChangeSet(changes)
@ -452,43 +427,30 @@ func (m *mutation) Commit() (uint64, error) {
}
if len(m.storageChangeSetByBlock) > 0 {
for timestamp, changes := range m.storageChangeSetByBlock {
if debug.IsThinHistory() {
changedKeys := changes.ChangedKeys()
for k := range changedKeys {
var (
v []byte
key []byte
err error
)
key = []byte(k)[:common.HashLength+common.IncarnationLength]
value, err := m.getNoLock(dbutils.StorageHistoryBucket, key)
if err == ErrKeyNotFound {
if m.db != nil {
value, err = m.db.Get(dbutils.StorageHistoryBucket, key)
if err != nil && err != ErrKeyNotFound {
return 0, fmt.Errorf("db.Get failed: %w", err)
}
}
}
v, err = AppendToStorageIndex(value, []byte(k)[common.HashLength+common.IncarnationLength:common.HashLength+common.IncarnationLength+common.HashLength], timestamp)
if err != nil {
log.Error("mutation, append to storage index", "err", err, "timestamp", timestamp)
continue
}
m.puts.Set(dbutils.StorageHistoryBucket, key, v)
}
}
sort.Sort(changes)
var (
dat []byte
err error
)
sort.Sort(changes)
if debug.IsThinHistory() {
fmt.Println("StorageCommit")
for _, vv := range changes.Changes {
fmt.Println(common.Bytes2Hex(vv.Key), " - ", common.Bytes2Hex(vv.Value))
changedKeys := changes.ChangedKeys()
for k := range changedKeys {
key := []byte(k)
value, innerErr := m.getNoLock(dbutils.StorageHistoryBucket, key)
if innerErr == ErrKeyNotFound {
if m.db != nil {
value, innerErr = m.db.Get(dbutils.StorageHistoryBucket, key)
if innerErr != nil && innerErr != ErrKeyNotFound {
return 0, fmt.Errorf("db.Get failed: %w", innerErr)
}
}
}
index := dbutils.WrapHistoryIndex(value)
index.Append(timestamp)
m.puts.Set(dbutils.StorageHistoryBucket, key, *index)
}
dat, err = changeset.EncodeStorage(changes)
if err != nil {
return 0, err
@ -621,7 +583,6 @@ type DBCounterStats struct {
func (d *RWCounterDecorator) Put(bucket, key, value []byte) error {
atomic.AddUint64(&d.DBCounterStats.Put, 1)
fmt.Println("PUT", string(bucket), key)
return d.Database.Put(bucket, key, value)
}

View File

@ -2,7 +2,6 @@ package ethdb
import (
"bytes"
"fmt"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"math/big"
"math/rand"
@ -26,7 +25,6 @@ func TestMutation_DeleteTimestamp(t *testing.T) {
acc := make([]*accounts.Account, 10)
addrHashes := make([]common.Hash, 10)
for i := range acc {
fmt.Println(i)
acc[i], addrHashes[i] = randomAccount(t)
b := make([]byte, acc[i].EncodingLengthForStorage())
acc[i].EncodeForStorage(b)
@ -54,12 +52,12 @@ func TestMutation_DeleteTimestamp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
index := new(HistoryIndex)
err = index.Decode(csData)
if err != nil {
t.Fatal(err)
index := dbutils.WrapHistoryIndex(csData)
parsed, innerErr := index.Decode()
if innerErr != nil {
t.Fatal(innerErr)
}
if (*index)[0] != 1 {
if parsed[0] != 1 {
t.Fatal("incorrect block num")
}
@ -185,7 +183,7 @@ func TestMutationCommit(t *testing.T) {
t.Fatal(err)
}
expectedChangeSet := changeset.NewChangeSet()
expectedChangeSet := changeset.NewAccountChangeSet()
for i := range addrHashes {
b := make([]byte, accHistory[i].EncodingLengthForStorage())
accHistory[i].EncodeForStorage(b)
@ -213,7 +211,7 @@ func TestMutationCommit(t *testing.T) {
t.FailNow()
}
expectedChangeSet = changeset.NewChangeSet()
expectedChangeSet = changeset.NewStorageChangeSet()
for i, addrHash := range addrHashes {
for j := 0; j < numOfStateKeys; j++ {
key := common.Hash{uint8(i*100 + j)}
@ -278,13 +276,13 @@ func TestMutationCommitThinHistory(t *testing.T) {
if err != nil {
t.Fatal("error on get account", i, err)
}
index := new(HistoryIndex)
err = index.Decode(b)
index := dbutils.WrapHistoryIndex(b)
parsedIndex, err := index.Decode()
if err != nil {
t.Fatal("error on get account", i, err)
}
if (*index)[0] != 1 && len(*index) != 1 {
if parsedIndex[0] != 1 && index.Len() != 1 {
t.Fatal("incorrect history index")
}
@ -300,29 +298,19 @@ func TestMutationCommitThinHistory(t *testing.T) {
if !reflect.DeepEqual(resAccStorage, accStateStorage[i]) {
spew.Dump("res", resAccStorage)
spew.Dump("expected", accStateStorage[i])
t.Log("incorrect storage", i)
t.Fatal("incorrect storage", i)
}
v, err := db.Get(dbutils.StorageHistoryBucket, dbutils.GenerateStoragePrefix(addrHash, acc.Incarnation))
if err != nil {
t.Fatal(err)
}
for k, v := range accHistoryStateStorage[i] {
res, err := db.GetAsOf(dbutils.StorageBucket, dbutils.StorageHistoryBucket, dbutils.GenerateCompositeStorageKey(addrHash, acc.Incarnation, k), 1)
if err != nil {
t.Fatal(err)
}
storageIndex := NewStorageIndex()
err = storageIndex.Decode(v)
if err != nil {
t.Fatal(err)
}
expectedIndex := NewStorageIndex()
for j := range accHistoryStateStorage[i] {
expectedIndex.Append(j, 1)
}
if !reflect.DeepEqual(expectedIndex, storageIndex) {
spew.Dump("res", storageIndex)
spew.Dump("expected", expectedIndex)
spew.Dump("orig", accHistoryStateStorage[i])
t.Fatal("incorrect history storage", i)
resultHash := common.BytesToHash(res)
if resultHash != v {
t.Fatal("incorrect storage history for ", addrHash.String(), v, resultHash)
}
}
}
@ -331,7 +319,7 @@ func TestMutationCommitThinHistory(t *testing.T) {
t.Fatal(err)
}
expectedChangeSet := changeset.NewChangeSet()
expectedChangeSet := changeset.NewAccountChangeSet()
for i := range addrHashes {
b := make([]byte, accHistory[i].EncodingLengthForStorage())
accHistory[i].EncodeForStorage(b)
@ -357,7 +345,7 @@ func TestMutationCommitThinHistory(t *testing.T) {
t.FailNow()
}
expectedChangeSet = changeset.NewChangeSet()
expectedChangeSet = changeset.NewStorageChangeSet()
for i, addrHash := range addrHashes {
for j := 0; j < numOfStateKeys; j++ {
key := common.Hash{uint8(i*100 + j)}

View File

@ -1,88 +0,0 @@
package ethdb
import (
"bytes"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ugorji/go/codec"
)
func NewStorageIndex() StorageIndex {
return make(StorageIndex)
}
type StorageIndex map[common.Hash]*HistoryIndex
func (si StorageIndex) Encode() ([]byte, error) {
var w bytes.Buffer
var handle codec.CborHandle
//handle.WriterBufferSize = 1024
encoder := codec.NewEncoder(&w, &handle)
err := encoder.Encode(si)
if err != nil {
return nil, err
}
return w.Bytes(), nil
}
func (si StorageIndex) Decode(s []byte) error {
if len(s) == 0 {
return nil
}
var handle codec.CborHandle
decoder := codec.NewDecoder(bytes.NewBuffer(s), &handle)
return decoder.Decode(si)
}
func (si StorageIndex) Append(key common.Hash, val uint64) {
if _, ok := si[key]; !ok {
si[key] = new(HistoryIndex)
}
si[key] = si[key].Append(val)
}
//most common operation is remove one from the tail
func (si StorageIndex) Remove(key common.Hash, val uint64) {
if v, ok := si[key]; ok && v != nil {
v = v.Remove(val)
if len(*v) == 0 {
delete(si, key)
} else {
si[key] = v
}
}
}
func (si StorageIndex) Search(key common.Hash, val uint64) (uint64, bool) {
if v, ok := si[key]; ok && v != nil {
return v.Search(val)
}
return 0, false
}
func AppendToStorageIndex(b []byte, key []byte, timestamp uint64) ([]byte, error) {
v := NewStorageIndex()
if err := v.Decode(b); err != nil {
return nil, err
}
v.Append(common.BytesToHash(key), timestamp)
return v.Encode()
}
func RemoveFromStorageIndex(b []byte, timestamp uint64) ([]byte, bool, error) {
v := NewStorageIndex()
if err := v.Decode(b); err != nil {
return nil, false, err
}
for key := range v {
v.Remove(key, timestamp)
}
res, err := v.Encode()
if len(v) == 0 {
return res, true, err
}
return res, false, err
}

View File

@ -49,10 +49,9 @@ func RewindData(db Getter, timestampSrc, timestampDst uint64, df func(bucket, ke
}
var innerErr error
fmt.Println("AccountChangeSetBytes walk")
if debug.IsThinHistory() {
innerErr = changeset.AccountChangeSetBytes(v).Walk(func(kk, vv []byte) error {
fmt.Println(common.Bytes2Hex(kk), " - ", common.Bytes2Hex(vv))
if _, ok = t[string(kk)]; !ok {
t[string(kk)] = vv
}
@ -90,11 +89,9 @@ func RewindData(db Getter, timestampSrc, timestampDst uint64, df func(bucket, ke
}
var innerErr error
fmt.Println("StorageChangeSetBytes walk")
v = common.CopyBytes(v) // Making copy because otherwise it will be invalid after the transaction
if debug.IsThinHistory() {
innerErr = changeset.StorageChangeSetBytes(v).Walk(func(kk, vv []byte) error {
fmt.Println(common.Bytes2Hex(kk), " - ", common.Bytes2Hex(vv))
if _, ok = t[string(kk)]; !ok {
t[string(kk)] = vv
}

View File

@ -0,0 +1,178 @@
package migrations
import (
"bytes"
"errors"
"github.com/ledgerwatch/bolt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"time"
)
var (
//ChangeSetBucket - key - encoded timestamp(block number) + history bucket(hAT/hST)
// value - encoded ChangeSet{k - addrHash|compositeKey(for storage) v - account(encoded) | originalValue(common.Hash)}
ChangeSetBucket = []byte("ChangeSet")
//LastBatchKey - last inserted key
LastBatchKey = []byte("lastBatchKeyForSplitChangesetMigration")
)
const splitChangesetBatchSize = 5000
func splitChangeSetMigration(batchSize int) Migration {
return Migration{
Name: "split_changeset",
Up: func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
boltDB, ok := db.(*ethdb.BoltDatabase)
if !ok {
return errors.New("only boltdb migration")
}
var rowNum int
changesetsToRemove := make([][]byte, 0)
accChangesets := make([][]byte, 0)
storageChangesets := make([][]byte, 0)
var (
currentKey, currentValue []byte
done bool
)
currentKey, err := db.Get(dbutils.DatabaseInfoBucket, LastBatchKey)
if err != nil && err != ethdb.ErrKeyNotFound {
return err
}
startTime := time.Now()
for !done {
err := boltDB.DB().Update(func(tx *bolt.Tx) error {
changesetBucket := tx.Bucket(ChangeSetBucket)
dbInfoBucket, err := tx.CreateBucketIfNotExists(dbutils.DatabaseInfoBucket, false)
if err != nil {
return err
}
if changesetBucket == nil {
done = true
return nil
}
changesetCursor := changesetBucket.Cursor()
if currentKey == nil {
currentKey, currentValue = changesetCursor.First()
} else {
currentKey, currentValue = changesetCursor.Seek(currentKey)
}
for currentKey != nil {
changesetsToRemove = append(changesetsToRemove, currentKey)
ts, bucket := dbutils.DecodeTimestamp(currentKey)
encTS := dbutils.EncodeTimestamp(ts)
switch {
case bytes.Equal(dbutils.AccountsHistoryBucket, bucket):
if thinHistory {
cs, innerErr := changeset.DecodeChangeSet(currentValue)
if innerErr != nil {
return innerErr
}
v, innerErr := changeset.EncodeAccounts(cs)
if innerErr != nil {
return innerErr
}
accChangesets = append(accChangesets, encTS, v)
} else {
accChangesets = append(accChangesets, encTS, currentValue)
}
case bytes.Equal(dbutils.StorageHistoryBucket, bucket):
if thinHistory {
cs, innerErr := changeset.DecodeChangeSet(currentValue)
if innerErr != nil {
return innerErr
}
v, innerErr := changeset.EncodeStorage(cs)
if innerErr != nil {
log.Error("Error on encode storage changeset", "err", innerErr)
return innerErr
}
storageChangesets = append(storageChangesets, encTS, v)
} else {
storageChangesets = append(storageChangesets, encTS, currentValue)
}
}
currentKey, currentValue = changesetCursor.Next()
if rowNum >= batchSize || currentKey == nil {
commTime := time.Now()
if len(storageChangesets) > 0 {
storageCSBucket, innerErr := tx.CreateBucketIfNotExists(dbutils.StorageChangeSetBucket, false)
if innerErr != nil {
return innerErr
}
innerErr = storageCSBucket.MultiPut(storageChangesets...)
if innerErr != nil {
return innerErr
}
}
if len(accChangesets) > 0 {
accCSBucket, innerErr := tx.CreateBucketIfNotExists(dbutils.AccountChangeSetBucket, false)
if innerErr != nil {
return innerErr
}
innerErr = accCSBucket.MultiPut(accChangesets...)
if innerErr != nil {
return innerErr
}
}
if len(changesetsToRemove) > 0 {
for _, v := range changesetsToRemove {
innerErr := changesetBucket.Delete(v)
if innerErr != nil {
return innerErr
}
}
}
log.Warn("Commit", "block", ts, "commit time", time.Since(commTime), "migration time", time.Since(startTime))
accChangesets = make([][]byte, 0)
storageChangesets = make([][]byte, 0)
changesetsToRemove = make([][]byte, 0)
rowNum = 0
break
} else {
rowNum++
}
}
if currentKey == nil {
done = true
err = dbInfoBucket.Delete(LastBatchKey)
if err != nil {
return err
}
} else {
currentKey = common.CopyBytes(currentKey)
err = dbInfoBucket.Put(LastBatchKey, currentKey)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
},
}
}
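
The migration walks the legacy ChangeSet bucket in batches of batchSize rows, committing after each batch and remembering its position under LastBatchKey so an interrupted run resumes where it stopped. Each legacy key is the encoded block number with the history bucket name appended; a sketch of how a key is split and re-keyed, using the dbutils helpers seen above:

package main

import (
    "fmt"

    "github.com/ledgerwatch/turbo-geth/common/dbutils"
)

func main() {
    // Legacy key: encodedTimestamp || historyBucket ("hAT" or "hST").
    legacyKey := dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(42), dbutils.AccountsHistoryBucket)

    // DecodeTimestamp recovers both parts; the bucket picks the destination
    // (AccountChangeSetBucket vs StorageChangeSetBucket), and the new key is
    // the encoded block number alone.
    blockNum, bucket := dbutils.DecodeTimestamp(legacyKey)
    newKey := dbutils.EncodeTimestamp(blockNum)
    fmt.Printf("block %d, bucket %q, new key %x\n", blockNum, bucket, newKey)
}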

View File

@ -0,0 +1,321 @@
package migrations
import (
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"testing"
)
func TestChangeSetMigrationSuccess(t *testing.T) {
f := func(t *testing.T, n, batchSize int) {
db := ethdb.NewMemDatabase()
accCS, storageCS := generateChangeSets(t, n)
for i, v := range accCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
for i, v := range storageCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
migrator := NewMigrator()
migrator.Migrations = []Migration{splitChangeSetMigration(batchSize)}
err := migrator.Apply(db, false, false, false, false, false)
if err != nil {
t.Error(err)
}
err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
ts, bucket := dbutils.DecodeTimestamp(k)
return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket))
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeChangeSet(v)
if innerErr != nil {
return false, fmt.Errorf("decode fails block - %v err: %v", blockNum, innerErr)
}
if !cs.Equals(accCS[blockNum]) {
spew.Dump(cs)
spew.Dump(accCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.StorageChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeChangeSet(v)
if innerErr != nil {
return false, fmt.Errorf("decode fails block - %v err: %v", blockNum, innerErr)
}
if !cs.Equals(storageCS[blockNum]) {
spew.Dump(cs)
spew.Dump(storageCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
}
t.Run("less batch", func(t *testing.T) {
f(t, 50, 60)
})
t.Run("more batch", func(t *testing.T) {
f(t, 100, 60)
})
t.Run("two batches", func(t *testing.T) {
f(t, 100, 50)
})
}
func TestChangeSetMigrationThinHistorySuccess(t *testing.T) {
f := func(t *testing.T, n, batchSize int) {
db := ethdb.NewMemDatabase()
accCS, storageCS := generateChangeSets(t, n)
for i, v := range accCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
for i, v := range storageCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
migrator := NewMigrator()
migrator.Migrations = []Migration{splitChangeSetMigration(batchSize)}
err := migrator.Apply(db, false, false, false, false, true)
if err != nil {
t.Error(err)
}
err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
ts, bucket := dbutils.DecodeTimestamp(k)
return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket))
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeAccounts(v)
if innerErr != nil {
return false, innerErr
}
if !cs.Equals(accCS[blockNum]) {
spew.Dump(cs)
spew.Dump(accCS[blockNum])
return false, fmt.Errorf("not equal %v", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.StorageChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeStorage(v)
if innerErr != nil {
t.Error(innerErr, blockNum)
}
if !cs.Equals(storageCS[blockNum]) {
spew.Dump(cs)
spew.Dump(storageCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
}
t.Run("less batch", func(t *testing.T) {
f(t, 50, 60)
})
t.Run("more batch", func(t *testing.T) {
f(t, 100, 60)
})
t.Run("two batches", func(t *testing.T) {
f(t, 100, 50)
})
}
func TestChangeSetMigrationFail(t *testing.T) {
db := ethdb.NewMemDatabase()
accCS, storageCS := generateChangeSets(t, 50)
for i, v := range accCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
if i == 25 {
//write a truncated value so the migration fails to decode it
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc[:5])
if err != nil {
t.Error(err)
}
} else {
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
}
for i, v := range storageCS {
enc, err := changeset.EncodeChangeSet(v)
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(i)), dbutils.StorageHistoryBucket), enc)
if err != nil {
t.Error(err)
}
}
migrator := NewMigrator()
migrator.Migrations = []Migration{splitChangeSetMigration(20)}
err := migrator.Apply(db, false, false, false, false, true)
if err == nil {
t.Error("should fail")
}
_, err = db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err == nil {
t.Error("should fail")
}
//fix incorrect changeset
enc, err := changeset.EncodeChangeSet(accCS[25])
if err != nil {
t.Error(err)
}
err = db.Put(ChangeSetBucket, dbutils.CompositeChangeSetKey(dbutils.EncodeTimestamp(uint64(25)), dbutils.AccountsHistoryBucket), enc)
if err != nil {
t.Error(err)
}
err = migrator.Apply(db, false, false, false, false, true)
if err != nil {
t.Error(err)
}
err = db.Walk(ChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
ts, bucket := dbutils.DecodeTimestamp(k)
return false, fmt.Errorf("changeset bucket is not empty block %v bucket %v", ts, string(bucket))
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.AccountChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeAccounts(v)
if innerErr != nil {
return false, innerErr
}
if !cs.Equals(accCS[blockNum]) {
spew.Dump(cs)
spew.Dump(accCS[blockNum])
return false, fmt.Errorf("not equal %v", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
err = db.Walk(dbutils.StorageChangeSetBucket, []byte{}, 0, func(k, v []byte) (b bool, e error) {
blockNum, _ := dbutils.DecodeTimestamp(k)
cs, innerErr := changeset.DecodeStorage(v)
if innerErr != nil {
t.Error(innerErr, blockNum)
}
if !cs.Equals(storageCS[blockNum]) {
spew.Dump(cs)
spew.Dump(storageCS[blockNum])
return false, fmt.Errorf("not equal (%v)", blockNum)
}
return true, nil
})
if err != nil {
t.Error(err)
}
}
func generateChangeSets(t *testing.T, n int) ([]*changeset.ChangeSet, []*changeset.ChangeSet) {
t.Helper()
accChangeset := make([]*changeset.ChangeSet, n)
storageChangeset := make([]*changeset.ChangeSet, n)
for i := 0; i < n; i++ {
csAcc := changeset.NewChangeSet()
hash := common.Hash{uint8(i)}
err := csAcc.Add(hash.Bytes(), hash.Bytes())
if err != nil {
t.Fatal(err)
}
accChangeset[i] = csAcc
csStorage := changeset.NewChangeSet()
err = csStorage.Add(
dbutils.GenerateCompositeStorageKey(
hash,
^uint64(1),
hash,
),
hash.Bytes(),
)
if err != nil {
t.Fatal(err)
}
storageChangeset[i] = csStorage
}
return accChangeset, storageChangeset
}

migrations/migrations.go (new file, 59 lines)
View File

@ -0,0 +1,59 @@
package migrations
import (
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
type Migration struct {
Name string
Up func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error
}
func NewMigrator() *Migrator {
return &Migrator{
Migrations: migrations,
}
}
type Migrator struct {
Migrations []Migration
}
func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
if len(m.Migrations) == 0 {
return nil
}
lastApplied, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err != nil && err != ethdb.ErrKeyNotFound {
return err
}
i := len(m.Migrations) - 1
for ; i >= 0; i-- {
if m.Migrations[i].Name == string(lastApplied) {
break
}
}
m.Migrations = m.Migrations[i+1:]
for _, v := range m.Migrations {
log.Warn("Applying migration", "name", v.Name)
err := v.Up(db, history, receipts, txIndex, preImages, thinHistory)
if err != nil {
return err
}
err = db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(v.Name))
if err != nil {
return err
}
log.Warn("Applied migration", "name", v.Name)
}
return nil
}
var migrations = []Migration{
splitChangeSetMigration(splitChangesetBatchSize),
}
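
Apply replays every migration listed after the name recorded under LastAppliedMigration, storing each name as it completes, so registering a new migration is just appending to this slice. A minimal sketch with a hypothetical no-op migration added after the built-in ones:

package main

import (
    "fmt"

    "github.com/ledgerwatch/turbo-geth/ethdb"
    "github.com/ledgerwatch/turbo-geth/migrations"
)

func main() {
    db := ethdb.NewMemDatabase()
    defer db.Close()

    m := migrations.NewMigrator()
    // A hypothetical extra migration appended after the built-in ones; its Name
    // is recorded under LastAppliedMigration once Up returns without error.
    m.Migrations = append(m.Migrations, migrations.Migration{
        Name: "example_noop",
        Up: func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
            return nil // real migrations rewrite buckets here
        },
    })

    if err := m.Apply(db, false, false, false, false, false); err != nil {
        panic(err)
    }
    fmt.Println("all migrations applied")
}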

View File

@ -0,0 +1,76 @@
package migrations
import (
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"testing"
)
func TestApplyWithInit(t *testing.T) {
db := ethdb.NewMemDatabase()
migrations = []Migration{
{
"one",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
return nil
},
},
{
"two",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
return nil
},
},
}
migrator := NewMigrator()
migrator.Migrations = migrations
err := migrator.Apply(db, false, false, false, false, false)
if err != nil {
t.Fatal()
}
v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err != nil {
t.Fatal(err)
}
if string(v) != migrations[1].Name {
t.Fatal()
}
}
func TestApplyWithoutInit(t *testing.T) {
db := ethdb.NewMemDatabase()
migrations = []Migration{
{
"one",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
t.Fatal("should not have been executed")
return nil
},
},
{
"two",
func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error {
return nil
},
},
}
err := db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(migrations[0].Name))
if err != nil {
t.Fatal()
}
migrator := NewMigrator()
migrator.Migrations = migrations
err = migrator.Apply(db, false, false, false, false, false)
if err != nil {
t.Fatal()
}
v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
if err != nil {
t.Fatal(err)
}
if string(v) != migrations[1].Name {
t.Fatal()
}
}