mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
THIN_HISTORY - implement WalkAsOf and fix corresponding tests (#441)
* Implement WalkAsOf for THIN_HISTORY * Fix compile * Fix linter * Fix linter * Fix linter * Fix linter * Fix linter * Fix broken tests * Fix comment
This commit is contained in:
parent
535d73be5f
commit
2d7832c62e
@ -344,7 +344,7 @@ func (b StorageChangeSetBytes) Walk(f func(k, v []byte) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b StorageChangeSetBytes) Find(k []byte) ([]byte, error) {
|
||||
func (b StorageChangeSetBytes) Find(addrHash []byte, keyHash []byte) ([]byte, error) {
|
||||
if len(b) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@ -365,7 +365,7 @@ func (b StorageChangeSetBytes) Find(k []byte) ([]byte, error) {
|
||||
//todo[boris] here should be binary search
|
||||
for i := uint32(0); i < uint32(numOfUniqueItems); i++ {
|
||||
elemStart = storageEnodingLengthOfNumOfElements + storageEnodingLengthOfDict + i*common.HashLength
|
||||
if bytes.Equal(k[0:common.HashLength], b[elemStart:elemStart+common.HashLength]) {
|
||||
if bytes.Equal(addrHash, b[elemStart:elemStart+common.HashLength]) {
|
||||
found = true
|
||||
addHashID = i
|
||||
break
|
||||
@ -404,7 +404,7 @@ func (b StorageChangeSetBytes) Find(k []byte) ([]byte, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !bytes.Equal(k[common.HashLength+common.IncarnationLength:2*common.HashLength+common.IncarnationLength], b[elemStart+elemLength:elemStart+elemLength+common.HashLength]) {
|
||||
if !bytes.Equal(keyHash, b[elemStart+elemLength:elemStart+elemLength+common.HashLength]) {
|
||||
continue
|
||||
}
|
||||
return findVal(b[lenOfValsPos:valuesPos], b[valuesPos:], i, numOfUint8, numOfUint16, numOfUint32), nil
|
||||
|
@ -197,7 +197,7 @@ func TestEncodingStorageWithoutNotDefaultIncarnationFind(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, v := range ch.Changes {
|
||||
val, err := StorageChangeSetBytes(b).Find(v.Key)
|
||||
val, err := StorageChangeSetBytes(b).Find(v.Key[:common.HashLength], v.Key[common.HashLength+common.IncarnationLength:])
|
||||
if err != nil {
|
||||
t.Error(err, i)
|
||||
}
|
||||
@ -223,4 +223,3 @@ func TestEncodingStorageWithoutNotDefaultIncarnationFind(t *testing.T) {
|
||||
f(t, 10000)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
||||
"github.com/ledgerwatch/turbo-geth/common/debug"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/log"
|
||||
@ -69,8 +70,12 @@ func (dbs *DbState) ForEachStorage(addr common.Address, start []byte, cb func(ke
|
||||
st := llrb.New()
|
||||
var s [common.HashLength + common.IncarnationLength + common.HashLength]byte
|
||||
copy(s[:], addrHash[:])
|
||||
// TODO: [Issue 99] support incarnations
|
||||
binary.BigEndian.PutUint64(s[common.HashLength:], ^uint64(FirstContractIncarnation))
|
||||
accData, _ := dbs.db.GetAsOf(dbutils.AccountsBucket, dbutils.AccountsHistoryBucket, addrHash[:], dbs.blockNr+1)
|
||||
var acc accounts.Account
|
||||
if err = acc.DecodeForStorage(accData); err != nil {
|
||||
log.Error("Error decoding account", "error", err)
|
||||
}
|
||||
binary.BigEndian.PutUint64(s[common.HashLength:], ^acc.Incarnation)
|
||||
copy(s[common.HashLength+common.IncarnationLength:], start)
|
||||
var lastSecKey common.Hash
|
||||
overrideCounter := 0
|
||||
@ -89,7 +94,7 @@ func (dbs *DbState) ForEachStorage(addr common.Address, start []byte, cb func(ke
|
||||
})
|
||||
}
|
||||
numDeletes := st.Len() - overrideCounter
|
||||
err = dbs.db.WalkAsOf(dbutils.StorageBucket, dbutils.StorageHistoryBucket, s[:], 0, dbs.blockNr+1, func(ks, vs []byte) (bool, error) {
|
||||
err = dbs.db.WalkAsOf(dbutils.StorageBucket, dbutils.StorageHistoryBucket, s[:], 8*(common.HashLength+common.IncarnationLength), dbs.blockNr+1, func(ks, vs []byte) (bool, error) {
|
||||
if !bytes.HasPrefix(ks, addrHash[:]) {
|
||||
return false, nil
|
||||
}
|
||||
@ -97,7 +102,12 @@ func (dbs *DbState) ForEachStorage(addr common.Address, start []byte, cb func(ke
|
||||
// Skip deleted entries
|
||||
return true, nil
|
||||
}
|
||||
seckey := ks[common.HashLength+common.IncarnationLength:]
|
||||
var seckey []byte
|
||||
if debug.IsThinHistory() {
|
||||
seckey = ks[common.HashLength:]
|
||||
} else {
|
||||
seckey = ks[common.HashLength+common.IncarnationLength:]
|
||||
}
|
||||
//fmt.Printf("seckey: %x\n", seckey)
|
||||
si := storageItem{}
|
||||
copy(si.seckey[:], seckey)
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/common/debug"
|
||||
"github.com/ledgerwatch/turbo-geth/core/state"
|
||||
"github.com/ledgerwatch/turbo-geth/crypto"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
@ -68,9 +67,6 @@ func (h resultHash) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||||
func (h resultHash) Less(i, j int) bool { return bytes.Compare(h[i].Bytes(), h[j].Bytes()) < 0 }
|
||||
|
||||
func TestAccountRange(t *testing.T) {
|
||||
if debug.IsThinHistory() {
|
||||
t.Skip()
|
||||
}
|
||||
var (
|
||||
db = ethdb.NewMemDatabase()
|
||||
tds = state.NewTrieDbState(common.Hash{}, db, 0)
|
||||
@ -164,9 +160,6 @@ func TestAccountRange(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEmptyAccountRange(t *testing.T) {
|
||||
if debug.IsThinHistory() {
|
||||
t.Skip()
|
||||
}
|
||||
var (
|
||||
statedb = state.NewDbState(ethdb.NewMemDatabase(), 0)
|
||||
)
|
||||
@ -184,9 +177,6 @@ func TestEmptyAccountRange(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStorageRangeAt(t *testing.T) {
|
||||
if debug.IsThinHistory() {
|
||||
t.Skip()
|
||||
}
|
||||
// Create a state where account 0x010000... has a few storage entries.
|
||||
var (
|
||||
db = ethdb.NewMemDatabase()
|
||||
|
256
ethdb/bolt_db.go
256
ethdb/bolt_db.go
@ -20,6 +20,7 @@ package ethdb
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
@ -334,13 +335,260 @@ func (db *BoltDatabase) MultiWalk(bucket []byte, startkeys [][]byte, fixedbits [
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *BoltDatabase) walkAsOfThin(bucket, hBucket, startkey []byte, fixedbits uint, timestamp uint64, walker func(k []byte, v []byte) (bool, error)) error {
|
||||
panic("")
|
||||
func (db *BoltDatabase) walkAsOfThinAccounts(startkey []byte, fixedbits uint, timestamp uint64, walker func(k []byte, v []byte) (bool, error)) error {
|
||||
fixedbytes, mask := Bytesmask(fixedbits)
|
||||
err := db.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(dbutils.AccountsBucket)
|
||||
if b == nil {
|
||||
return fmt.Errorf("accountsBucket not found")
|
||||
}
|
||||
hB := tx.Bucket(dbutils.AccountsHistoryBucket)
|
||||
if hB == nil {
|
||||
return fmt.Errorf("accountsHistoryBucket not found")
|
||||
}
|
||||
csB := tx.Bucket(dbutils.AccountChangeSetBucket)
|
||||
if csB == nil {
|
||||
return fmt.Errorf("accountChangeBucket not found")
|
||||
}
|
||||
//for state
|
||||
mainCursor := b.Cursor()
|
||||
//for historic data
|
||||
historyCursor := hB.Cursor()
|
||||
k, v := mainCursor.Seek(startkey)
|
||||
hK, hV := historyCursor.Seek(startkey)
|
||||
goOn := true
|
||||
var err error
|
||||
for goOn {
|
||||
//exit or next conditions
|
||||
if k != nil && fixedbits > 0 && !bytes.Equal(k[:fixedbytes-1], startkey[:fixedbytes-1]) {
|
||||
k = nil
|
||||
}
|
||||
if k != nil && fixedbits > 0 && (k[fixedbytes-1]&mask) != (startkey[fixedbytes-1]&mask) {
|
||||
k = nil
|
||||
}
|
||||
if hK != nil && fixedbits > 0 && !bytes.Equal(hK[:fixedbytes-1], startkey[:fixedbytes-1]) {
|
||||
hK = nil
|
||||
}
|
||||
if hK != nil && fixedbits > 0 && (hK[fixedbytes-1]&mask) != (startkey[fixedbytes-1]&mask) {
|
||||
hK = nil
|
||||
}
|
||||
var cmp int
|
||||
if k == nil {
|
||||
if hK == nil {
|
||||
break
|
||||
} else {
|
||||
cmp = 1
|
||||
}
|
||||
} else if hK == nil {
|
||||
cmp = -1
|
||||
} else {
|
||||
cmp = bytes.Compare(k, hK)
|
||||
}
|
||||
if cmp < 0 {
|
||||
goOn, err = walker(k, v)
|
||||
} else {
|
||||
index := dbutils.WrapHistoryIndex(hV)
|
||||
if changeSetBlock, ok := index.Search(timestamp); ok {
|
||||
// Extract value from the changeSet
|
||||
csKey := dbutils.EncodeTimestamp(changeSetBlock)
|
||||
changeSetData, _ := csB.Get(csKey)
|
||||
if changeSetData != nil {
|
||||
return fmt.Errorf("could not find ChangeSet record for index entry %d (query timestamp %d)", changeSetBlock, timestamp)
|
||||
}
|
||||
data, err1 := changeset.AccountChangeSetBytes(changeSetData).FindLast(hK)
|
||||
if err1 != nil {
|
||||
return fmt.Errorf("could not find key %x in the ChangeSet record for index entry %d (query timestamp %d)",
|
||||
hK,
|
||||
changeSetBlock,
|
||||
timestamp,
|
||||
)
|
||||
}
|
||||
var acc accounts.Account
|
||||
if err2 := acc.DecodeForStorage(data); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
|
||||
codeBucket := tx.Bucket(dbutils.ContractCodeBucket)
|
||||
codeHash, _ := codeBucket.Get(dbutils.GenerateStoragePrefix(common.BytesToHash(hK), acc.Incarnation))
|
||||
if len(codeHash) > 0 {
|
||||
acc.CodeHash = common.BytesToHash(codeHash)
|
||||
}
|
||||
data = make([]byte, acc.EncodingLengthForStorage())
|
||||
acc.EncodeForStorage(data)
|
||||
}
|
||||
goOn, err = walker(hK, data)
|
||||
} else if cmp == 0 {
|
||||
goOn, err = walker(k, v)
|
||||
}
|
||||
}
|
||||
if goOn {
|
||||
if cmp <= 0 {
|
||||
k, v = mainCursor.Next()
|
||||
}
|
||||
if cmp >= 0 {
|
||||
hK, hV = historyCursor.Next()
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// splitCursor implements cursor with two keys
|
||||
// it is used to ignore incarnations in the middle
|
||||
// of composite storage key, but without
|
||||
// reconstructing the key
|
||||
// Instead, the key is split into two parts and
|
||||
// functions `Seek` and `Next` deliver both
|
||||
// parts as well as the corresponding value
|
||||
type splitCursor struct {
|
||||
c *bolt.Cursor // Unlerlying bolt cursor
|
||||
startkey []byte // Starting key (also contains bits that need to be preserved)
|
||||
matchBytes int
|
||||
mask uint8
|
||||
part1end int // Position in the key where the first part ends
|
||||
part2start int // Position in the key where the second part starts
|
||||
}
|
||||
|
||||
func newSplitCursor(b *bolt.Bucket, startkey []byte, matchBits uint, part1end, part2start int) *splitCursor {
|
||||
var sc splitCursor
|
||||
sc.c = b.Cursor()
|
||||
sc.startkey = startkey
|
||||
sc.part1end = part1end
|
||||
sc.part2start = part2start
|
||||
sc.matchBytes, sc.mask = Bytesmask(matchBits)
|
||||
return &sc
|
||||
}
|
||||
|
||||
func (sc *splitCursor) matchKey(k []byte) bool {
|
||||
if k == nil {
|
||||
return false
|
||||
}
|
||||
if sc.matchBytes == 0 {
|
||||
return true
|
||||
}
|
||||
if len(k) < sc.matchBytes {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(k[:sc.matchBytes-1], sc.startkey[:sc.matchBytes-1]) {
|
||||
return false
|
||||
}
|
||||
return (k[sc.matchBytes-1] & sc.mask) == (sc.startkey[sc.matchBytes-1] & sc.mask)
|
||||
}
|
||||
|
||||
func (sc *splitCursor) Seek() (key1, key2, val []byte) {
|
||||
k, v := sc.c.Seek(sc.startkey)
|
||||
if !sc.matchKey(k) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
return k[:sc.part1end], k[sc.part2start:], v
|
||||
}
|
||||
|
||||
func (sc *splitCursor) Next() (key1, key2, val []byte) {
|
||||
k, v := sc.c.Next()
|
||||
if !sc.matchKey(k) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
return k[:sc.part1end], k[sc.part2start:], v
|
||||
}
|
||||
|
||||
func (db *BoltDatabase) walkAsOfThinStorage(startkey []byte, fixedbits uint, timestamp uint64, walker func(k1, k2, v []byte) (bool, error)) error {
|
||||
err := db.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(dbutils.StorageBucket)
|
||||
if b == nil {
|
||||
return fmt.Errorf("storageBucket not found")
|
||||
}
|
||||
hB := tx.Bucket(dbutils.StorageHistoryBucket)
|
||||
if hB == nil {
|
||||
return fmt.Errorf("storageHistoryBucket not found")
|
||||
}
|
||||
csB := tx.Bucket(dbutils.StorageChangeSetBucket)
|
||||
if csB == nil {
|
||||
return fmt.Errorf("storageChangeBucket not found")
|
||||
}
|
||||
//for storage
|
||||
mainCursor := newSplitCursor(
|
||||
b,
|
||||
startkey,
|
||||
fixedbits,
|
||||
common.HashLength, /* part1end */
|
||||
common.HashLength+common.IncarnationLength, /* part2start */
|
||||
)
|
||||
//for historic data
|
||||
historyCursor := newSplitCursor(
|
||||
hB,
|
||||
startkey,
|
||||
fixedbits,
|
||||
common.HashLength, /* part1end */
|
||||
common.HashLength+common.IncarnationLength, /* part2start */
|
||||
)
|
||||
addrHash, keyHash, v := mainCursor.Seek()
|
||||
hAddrHash, hKeyHash, hV := historyCursor.Seek()
|
||||
goOn := true
|
||||
var err error
|
||||
for goOn {
|
||||
var cmp int
|
||||
if keyHash == nil {
|
||||
if hKeyHash == nil {
|
||||
break
|
||||
} else {
|
||||
cmp = 1
|
||||
}
|
||||
} else if hKeyHash == nil {
|
||||
cmp = -1
|
||||
} else {
|
||||
cmp = bytes.Compare(keyHash, hKeyHash)
|
||||
}
|
||||
if cmp < 0 {
|
||||
goOn, err = walker(addrHash, keyHash, v)
|
||||
} else {
|
||||
index := dbutils.WrapHistoryIndex(hV)
|
||||
if changeSetBlock, ok := index.Search(timestamp); ok {
|
||||
// Extract value from the changeSet
|
||||
csKey := dbutils.EncodeTimestamp(changeSetBlock)
|
||||
changeSetData, _ := csB.Get(csKey)
|
||||
if changeSetData != nil {
|
||||
return fmt.Errorf("could not find ChangeSet record for index entry %d (query timestamp %d)", changeSetBlock, timestamp)
|
||||
}
|
||||
data, err1 := changeset.StorageChangeSetBytes(changeSetData).Find(hAddrHash, hKeyHash)
|
||||
if err1 != nil {
|
||||
return fmt.Errorf("could not find key %x%x in the ChangeSet record for index entry %d (query timestamp %d)",
|
||||
hAddrHash, hKeyHash,
|
||||
changeSetBlock,
|
||||
timestamp,
|
||||
)
|
||||
}
|
||||
goOn, err = walker(hAddrHash, hKeyHash, data)
|
||||
} else if cmp == 0 {
|
||||
goOn, err = walker(addrHash, keyHash, v)
|
||||
}
|
||||
}
|
||||
if goOn {
|
||||
if cmp <= 0 {
|
||||
addrHash, keyHash, v = mainCursor.Next()
|
||||
}
|
||||
if cmp >= 0 {
|
||||
hAddrHash, hKeyHash, hV = historyCursor.Next()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *BoltDatabase) WalkAsOf(bucket, hBucket, startkey []byte, fixedbits uint, timestamp uint64, walker func(k []byte, v []byte) (bool, error)) error {
|
||||
if debug.IsThinHistory() {
|
||||
return db.walkAsOfThin(bucket, hBucket, startkey, fixedbits, timestamp, walker)
|
||||
if bytes.Equal(bucket, dbutils.AccountsBucket) && bytes.Equal(hBucket, dbutils.AccountsHistoryBucket) {
|
||||
return db.walkAsOfThinAccounts(startkey, fixedbits, timestamp, walker)
|
||||
} else if bytes.Equal(bucket, dbutils.StorageBucket) && bytes.Equal(hBucket, dbutils.StorageHistoryBucket) {
|
||||
return db.walkAsOfThinStorage(startkey, fixedbits, timestamp, func(k1, k2, v []byte) (bool, error) {
|
||||
return walker(append(common.CopyBytes(k1), k2...), v)
|
||||
})
|
||||
}
|
||||
panic("Not implemented for arbitrary buckets")
|
||||
}
|
||||
|
||||
fixedbytes, mask := Bytesmask(fixedbits)
|
||||
@ -688,7 +936,7 @@ func BoltDBFindByHistory(tx *bolt.Tx, hBucket []byte, key []byte, timestamp uint
|
||||
case debug.IsThinHistory() && bytes.Equal(dbutils.AccountsHistoryBucket, hBucket):
|
||||
data, err = changeset.AccountChangeSetBytes(changeSetData).FindLast(key)
|
||||
case debug.IsThinHistory() && bytes.Equal(dbutils.StorageHistoryBucket, hBucket):
|
||||
data, err = changeset.StorageChangeSetBytes(changeSetData).Find(key)
|
||||
data, err = changeset.StorageChangeSetBytes(changeSetData).Find(key[:common.HashLength], key[common.HashLength+common.IncarnationLength:])
|
||||
default:
|
||||
data, err = changeset.FindLast(changeSetData, key)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user