[erigon2] Switch back to MDBX (#388)

* Switch back to MDBX

* Fix test

* No restore

* Fix overwrite

* Fix change file opening

* Verify state vs change files

* Add WriteMap

* Print

* Not to go to the end of the file when reading

* Fix rebuild

* prefixLen

* Print

* Print

* Print

* Fix for rebuild

* Remove prints

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
ledgerwatch 2022-03-23 14:35:13 +00:00 committed by GitHub
parent d57ac60832
commit 4e8d577d1d
5 changed files with 443 additions and 242 deletions

View File

@ -41,11 +41,11 @@ import (
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/commitment"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
"github.com/ledgerwatch/log/v3"
@ -124,6 +124,21 @@ func (ft FileType) String() string {
}
}
func (ft FileType) Table() string {
switch ft {
case Account:
return kv.StateAccounts
case Storage:
return kv.StateStorage
case Code:
return kv.StateCode
case Commitment:
return kv.StateCommitment
default:
panic(fmt.Sprintf("unknown file type: %d", ft))
}
}
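
The new Table() method is what replaces the per-type in-memory btrees: each state FileType is routed to its own MDBX table. A minimal sketch of a lookup through that routing, assuming the [4-byte big-endian count][payload] value layout used throughout this commit (readStateValue is a hypothetical helper, not part of the change):

// Hypothetical helper illustrating the routing; state-table values are a
// 4-byte big-endian update count followed by the payload, and a payload of
// zero length marks a deleted entry.
func readStateValue(tx kv.Getter, fType FileType, key []byte) ([]byte, bool, error) {
	v, err := tx.GetOne(fType.Table(), key)
	if err != nil {
		return nil, false, err
	}
	if v == nil {
		return nil, false, nil // not in the table: caller falls back to state files
	}
	return v[4:], true, nil // strip the count prefix; empty slice == deleted
}
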
func ParseFileType(s string) (FileType, bool) {
switch s {
case "account":
@ -173,7 +188,6 @@ type Aggregator struct {
historyChannel chan struct{}
historyError chan error
historyWg sync.WaitGroup
trees [NumberOfStateTypes]*btree.BTree
fileHits, fileMisses uint64 // Counters for state file hit ratio
arches [NumberOfStateTypes][]uint32 // Over-arching hash tables containing the block number of last aggregation
archHasher murmur3.Hash128
@ -191,7 +205,6 @@ type ChangeFile struct {
wTx *bufio.Writer
r *bufio.Reader
rTx *bufio.Reader
sizeCounter uint64
txNum uint64 // Currently read transaction number
txRemaining uint64 // Remaining number of bytes to read in the current transaction
words []byte // Words pending for the next block record, in the same slice
@ -247,6 +260,12 @@ func (cf *ChangeFile) openFile(blockNum uint64, write bool) error {
if cf.fileTx, err = os.OpenFile(cf.pathTx, os.O_RDWR|os.O_CREATE, 0755); err != nil {
return err
}
if _, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil {
return err
}
if _, err = cf.fileTx.Seek(0, 2 /* relative to the end of the file */); err != nil {
return err
}
} else {
if cf.file, err = os.Open(cf.path); err != nil {
return err
@ -287,6 +306,7 @@ func (cf *ChangeFile) finish(txNum uint64) error {
var numBuf [10]byte
// Write out words
lastOffset := 0
var size uint64
for _, offset := range cf.wordOffsets {
word := cf.words[lastOffset:offset]
n := binary.PutUvarint(numBuf[:], uint64(len(word)))
@ -298,7 +318,7 @@ func (cf *ChangeFile) finish(txNum uint64) error {
return err
}
}
cf.sizeCounter += uint64(n + len(word))
size += uint64(n + len(word))
lastOffset = offset
}
cf.words = cf.words[:0]
@ -307,11 +327,10 @@ func (cf *ChangeFile) finish(txNum uint64) error {
if _, err := cf.wTx.Write(numBuf[:n]); err != nil {
return err
}
n = binary.PutUvarint(numBuf[:], cf.sizeCounter)
n = binary.PutUvarint(numBuf[:], size)
if _, err := cf.wTx.Write(numBuf[:n]); err != nil {
return err
}
cf.sizeCounter = 0
return nil
}
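
With the sizeCounter field removed, finish() now tracks each transaction's byte size in a local variable and writes a (txNum, size) uvarint pair into the companion .tx index file, after the length-prefixed words go into the main change file. A rough sketch of the matching decode step on the read side (readTxIndexEntry is illustrative, not a function from this commit):

// Sketch: decoding one (txNum, size) entry from the .tx index stream,
// the counterpart of the uvarint pair written by finish() above.
func readTxIndexEntry(rTx *bufio.Reader) (txNum, size uint64, err error) {
	if txNum, err = binary.ReadUvarint(rTx); err != nil {
		return 0, 0, err
	}
	if size, err = binary.ReadUvarint(rTx); err != nil {
		return 0, 0, err
	}
	return txNum, size, nil
}
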
@ -597,7 +616,7 @@ func buildIndex(d *compress.Decompressor, idxPath, tmpDir string, count int) (*r
// aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database
// This function is time-critical because it needs to be run in the same go-routine (thread) as the general
// execution (due to read-write tx). After that, we can optimistically execute the rest in the background
func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *btree.BTree, commitments bool) (*btree.BTree, error) {
func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx, table string, commitments bool) (*btree.BTree, error) {
if err := c.openFiles(blockTo, false /* write */); err != nil {
return nil, fmt.Errorf("open files: %w", err)
}
@ -608,34 +627,42 @@ func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *bt
}
// Clean up the DB table
var e error
var search AggregateItem
bt.Ascend(func(i btree.Item) bool {
item := i.(*AggregateItem)
if item.count == 0 {
return true
}
search.k = item.k
var prevV *AggregateItem
if prevVI := dbTree.Get(&search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
dbPrefix := item.k
prevV, err := tx.GetOne(table, dbPrefix)
if err != nil {
e = err
return false
}
if prevV == nil {
e = fmt.Errorf("record not found in db tree for key %x", item.k)
e = fmt.Errorf("record not found in db for %s key %x", table, dbPrefix)
return false
}
if prevV.count < item.count {
e = fmt.Errorf("record count too low for key [%x] count %d, subtracting %d", item.k, prevV.count, item.count)
prevNum := binary.BigEndian.Uint32(prevV[:4])
if prevNum < item.count {
e = fmt.Errorf("record count too low for %s key %s count %d, subtracting %d", table, dbPrefix, prevNum, item.count)
return false
}
if prevV.count == item.count {
dbTree.Delete(prevV)
if prevNum == item.count {
if e = tx.Delete(table, dbPrefix, nil); e != nil {
return false
}
} else {
prevV.count -= item.count
v := make([]byte, len(prevV))
binary.BigEndian.PutUint32(v[:4], prevNum-item.count)
copy(v[4:], prevV[4:])
if e = tx.Put(table, dbPrefix, v); e != nil {
return false
}
}
return true
})
if e != nil {
return nil, fmt.Errorf("clean up after aggregation: %w", e)
return nil, fmt.Errorf("clean up table %s after aggregation: %w", table, e)
}
return bt, nil
}
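
The cleanup loop above is the core of the MDBX switch: instead of mutating AggregateItem counters in a shared btree, it decrements the 4-byte big-endian count stored in the table and deletes the record once the count reaches zero. The same decrement-or-delete step in isolation, as a hedged sketch:

// Sketch of the per-key cleanup performed after aggregation, assuming the
// [4-byte big-endian count][payload] value layout.
func decrementOrDelete(tx kv.RwTx, table string, key []byte, by uint32) error {
	prev, err := tx.GetOne(table, key)
	if err != nil {
		return err
	}
	if prev == nil {
		return fmt.Errorf("record not found in %s for key %x", table, key)
	}
	count := binary.BigEndian.Uint32(prev[:4])
	if count < by {
		return fmt.Errorf("count too low for key %x: have %d, subtracting %d", key, count, by)
	}
	if count == by {
		return tx.Delete(table, key, nil) // fully moved into state files
	}
	v := make([]byte, len(prev))
	binary.BigEndian.PutUint32(v[:4], count-by)
	copy(v[4:], prev[4:])
	return tx.Put(table, key, v)
}
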
@ -829,8 +856,9 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments b
}
//fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal)
item.v = mergedVal
} else {
item.v = common.Copy(after)
}
item.v = common.Copy(after)
item.count++
}
}
@ -956,7 +984,7 @@ func (a *Aggregator) scanStateFiles(files []fs.DirEntry) {
}
}
func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64) (*Aggregator, error) {
func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64, tx kv.RwTx) (*Aggregator, error) {
a := &Aggregator{
diffDir: diffDir,
unwindLimit: unwindLimit,
@ -977,9 +1005,6 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c
for fType := FirstType; fType < NumberOfTypes; fType++ {
a.files[fType] = btree.New(32)
}
for fType := FirstType; fType < NumberOfStateTypes; fType++ {
a.trees[fType] = btree.New(32)
}
var closeStateFiles = true // It will be set to false in case of success at the end of the function
defer func() {
// Clean up all decompressor and indices upon error
@ -1074,7 +1099,7 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c
return nil, err
}
}
if err = a.rebuildRecentState(); err != nil {
if err = a.rebuildRecentState(tx); err != nil {
return nil, fmt.Errorf("rebuilding recent state from change files: %w", err)
}
closeStateFiles = false
@ -1090,18 +1115,28 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c
}
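
Because the writer now persists into MDBX tables and rebuildRecentState verifies change files against them, NewAggregator takes a read-write transaction. A minimal caller sketch, mirroring the updated tests further down (diffDir and the numeric parameters are placeholders):

// Sketch of the new call pattern (mirrors the updated tests in this commit).
db := memdb.New()
defer db.Close()
rwTx, err := db.BeginRw(context.Background())
if err != nil {
	return err
}
defer rwTx.Rollback()
a, err := NewAggregator(diffDir, 16 /* unwindLimit */, 4 /* aggregationStep */, true /* changesets */, true /* commitments */, 1000 /* minArch */, rwTx)
if err != nil {
	return err
}
defer a.Close()
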
// rebuildRecentState reads change files and reconstructs the recent state
func (a *Aggregator) rebuildRecentState() error {
func (a *Aggregator) rebuildRecentState(tx kv.RwTx) error {
t := time.Now()
var err error
trees := map[FileType]*btree.BTree{}
a.changesBtree.Ascend(func(i btree.Item) bool {
item := i.(*ChangesItem)
for fType := FirstType; fType < NumberOfStateTypes; fType++ {
tree, ok := trees[fType]
if !ok {
tree = btree.New(32)
trees[fType] = tree
}
var changes Changes
changes.Init(fType.String(), a.aggregationStep, a.diffDir, false /* beforeOn */)
if err = changes.openFiles(item.startBlock, false /* write */); err != nil {
return false
}
if err = changes.aggregateToBtree(a.trees[fType], 0, fType == Commitment); err != nil {
var prefixLen int
if fType == Storage {
prefixLen = length.Addr
}
if err = changes.aggregateToBtree(tree, prefixLen, fType == Commitment); err != nil {
return false
}
if err = changes.closeFiles(); err != nil {
@ -1113,6 +1148,31 @@ func (a *Aggregator) rebuildRecentState() error {
if err != nil {
return err
}
for fType, tree := range trees {
table := fType.Table()
tree.Ascend(func(i btree.Item) bool {
item := i.(*AggregateItem)
if len(item.v) == 0 {
return true
}
var v []byte
if v, err = tx.GetOne(table, item.k); err != nil {
return false
}
if item.count != binary.BigEndian.Uint32(v[:4]) {
err = fmt.Errorf("mismatched count for %x: change file %d, db: %d", item.k, item.count, binary.BigEndian.Uint32(v[:4]))
return false
}
if !bytes.Equal(item.v, v[4:]) {
err = fmt.Errorf("mismatched v for %x: change file [%x], db: [%x]", item.k, item.v, v[4:])
return false
}
return true
})
}
if err != nil {
return err
}
log.Info("reconstructed recent state", "in", time.Since(t))
return nil
}
@ -1787,7 +1847,7 @@ func (a *Aggregator) closeFiles(fType FileType) {
func (a *Aggregator) Close() {
close(a.aggChannel)
a.aggWg.Wait() // Need to wait for the background aggregation to finish because itsends to merge channels
a.aggWg.Wait() // Need to wait for the background aggregation to finish because it sends to merge channels
// Drain channel before closing
select {
case <-a.mergeChannel:
@ -1952,77 +2012,90 @@ func (a *Aggregator) readByOffset(fType FileType, fileI int, offset uint64) ([]b
return key, val
}
func (a *Aggregator) MakeStateReader(blockNum uint64) *Reader {
func (a *Aggregator) MakeStateReader(blockNum uint64, tx kv.Tx) *Reader {
r := &Reader{
a: a,
blockNum: blockNum,
tx: tx,
}
return r
}
type Reader struct {
a *Aggregator
search AggregateItem
tx kv.Getter
blockNum uint64
}
func (r *Reader) ReadAccountData(addr []byte, trace bool) []byte {
// Look in the summary table first
r.search.k = addr
if vi := r.a.trees[Account].Get(&r.search); vi != nil {
return vi.(*AggregateItem).v
func (r *Reader) ReadAccountData(addr []byte, trace bool) ([]byte, error) {
v, err := r.tx.GetOne(kv.StateAccounts, addr)
if err != nil {
return nil, err
}
val, _ := r.a.readFromFiles(Account, true /* lock */, r.blockNum, addr, trace)
return val
if v != nil {
return v[4:], nil
}
v, _ = r.a.readFromFiles(Account, true /* lock */, r.blockNum, addr, trace)
return v, nil
}
func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) *uint256.Int {
func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) ([]byte, error) {
// Look in the summary table first
dbkey := make([]byte, len(addr)+len(loc))
copy(dbkey[0:], addr)
copy(dbkey[len(addr):], loc)
r.search.k = dbkey
var v []byte
if vi := r.a.trees[Storage].Get(&r.search); vi != nil {
v = vi.(*AggregateItem).v
} else {
v, _ = r.a.readFromFiles(Storage, true /* lock */, r.blockNum, dbkey, trace)
v, err := r.tx.GetOne(kv.StateStorage, dbkey)
if err != nil {
return nil, err
}
if v != nil {
return new(uint256.Int).SetBytes(v)
if len(v) == 4 {
return nil, nil
}
return v[4:], nil
}
return nil
v, _ = r.a.readFromFiles(Storage, true /* lock */, r.blockNum, dbkey, trace)
return v, nil
}
func (r *Reader) ReadAccountCode(addr []byte, trace bool) []byte {
func (r *Reader) ReadAccountCode(addr []byte, trace bool) ([]byte, error) {
// Look in the summary table first
r.search.k = addr
if vi := r.a.trees[Code].Get(&r.search); vi != nil {
return vi.(*AggregateItem).v
v, err := r.tx.GetOne(kv.StateCode, addr)
if err != nil {
return nil, err
}
if v != nil {
if len(v) == 4 {
return nil, nil
}
return v[4:], nil
}
// Look in the files
val, _ := r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace)
return val
v, _ = r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace)
return v, nil
}
func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) int {
func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) (int, error) {
// Look in the summary table first
r.search.k = addr
if vi := r.a.trees[Code].Get(&r.search); vi != nil {
return len(vi.(*AggregateItem).v)
v, err := r.tx.GetOne(kv.StateCode, addr)
if err != nil {
return 0, err
}
if v != nil {
return len(v) - 4, nil
}
// Look in the files. TODO - use specialised function to only lookup size
val, _ := r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace)
return len(val)
v, _ = r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace)
return len(v), nil
}
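
All Reader methods now consult the per-type MDBX table first and fall back to the aggregated state files, and each gained an error return. A short caller sketch (readAccount is illustrative only):

// Sketch: reading through the new error-returning Reader API.
func readAccount(a *Aggregator, tx kv.Tx, blockNum uint64, addr []byte) ([]byte, error) {
	r := a.MakeStateReader(blockNum, tx)
	return r.ReadAccountData(addr, false /* trace */)
}
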
type Writer struct {
a *Aggregator
search AggregateItem // Aggregate item used to search in trees
blockNum uint64
changeFileNum uint64 // Block number associated with the current change files. It is the last block number whose changes will go into that file
changes [NumberOfStateTypes]Changes
commTree *btree.BTree // BTree used for gathering commitment data
tx kv.RwTx
}
func (a *Aggregator) MakeStateWriter(beforeOn bool) *Writer {
@ -2046,7 +2119,8 @@ func (w *Writer) Close() {
}
}
func (w *Writer) Reset(blockNum uint64) error {
func (w *Writer) Reset(blockNum uint64, tx kv.RwTx) error {
w.tx = tx
w.blockNum = blockNum
typesLimit := Commitment
if w.a.commitments {
@ -2083,16 +2157,18 @@ func (i *CommitmentItem) Less(than btree.Item) bool {
return bytes.Compare(i.hashedKey, than.(*CommitmentItem).hashedKey) < 0
}
func (w *Writer) branchFn(prefix []byte) []byte {
func (w *Writer) branchFn(prefix []byte) ([]byte, error) {
for lockFType := FirstType; lockFType < NumberOfStateTypes; lockFType++ {
w.a.fileLocks[lockFType].RLock()
defer w.a.fileLocks[lockFType].RUnlock()
}
var mergedVal []byte
// Look in the summary table first
w.search.k = prefix
if vi := w.a.trees[Commitment].Get(&w.search); vi != nil {
mergedVal = vi.(*AggregateItem).v
mergedVal, err := w.tx.GetOne(kv.StateCommitment, prefix)
if err != nil {
return nil, err
}
if mergedVal != nil {
mergedVal = mergedVal[4:]
}
// Look in the files and merge, while it becomes complete
var startBlock = w.blockNum + 1
@ -2104,7 +2180,7 @@ func (w *Writer) branchFn(prefix []byte) []byte {
val, startBlock = w.a.readFromFiles(Commitment, false /* lock */, startBlock-1, prefix, false /* trace */)
if val == nil {
if mergedVal == nil {
return nil
return nil, nil
}
panic(fmt.Sprintf("Incomplete branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock))
}
@ -2113,15 +2189,15 @@ func (w *Writer) branchFn(prefix []byte) []byte {
if mergedVal == nil {
mergedVal = val
} else if mergedVal, err = commitment.MergeBranches(val, mergedVal, nil); err != nil {
panic(err)
return nil, err
}
//fmt.Printf("Post-merge prefix [%x] [%x], startBlock %d\n", commitment.CompactToHex(prefix), mergedVal, startBlock)
}
if mergedVal == nil {
return nil
return nil, nil
}
//fmt.Printf("Returning branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock)
return mergedVal[2:] // Skip touchMap but keep afterMap
return mergedVal[2:], nil // Skip touchMap but keep afterMap
}
func bytesToUint64(buf []byte) (x uint64) {
@ -2134,12 +2210,14 @@ func bytesToUint64(buf []byte) (x uint64) {
return
}
func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte {
var enc []byte
func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) error {
// Look in the summary table first
w.search.k = plainKey
if encI := w.a.trees[Account].Get(&w.search); encI != nil {
enc = encI.(*AggregateItem).v
enc, err := w.tx.GetOne(kv.StateAccounts, plainKey)
if err != nil {
return err
}
if enc != nil {
enc = enc[4:]
} else {
// Look in the files
enc, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, plainKey, false /* trace */)
@ -2162,9 +2240,12 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte {
cell.Balance.SetBytes(enc[pos : pos+balanceBytes])
}
}
w.search.k = plainKey
if encI := w.a.trees[Code].Get(&w.search); encI != nil {
enc = encI.(*AggregateItem).v
enc, err = w.tx.GetOne(kv.StateCode, plainKey)
if err != nil {
return err
}
if enc != nil {
enc = enc[4:]
} else {
// Look in the files
enc, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, plainKey, false /* trace */)
@ -2174,22 +2255,24 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte {
w.a.keccak.Write(enc)
w.a.keccak.(io.Reader).Read(cell.CodeHash[:])
}
return plainKey
return nil
}
func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) []byte {
var enc []byte
func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) error {
// Look in the summary table first
w.search.k = plainKey
if encI := w.a.trees[Storage].Get(&w.search); encI != nil {
enc = encI.(*AggregateItem).v
enc, err := w.tx.GetOne(kv.StateStorage, plainKey)
if err != nil {
return err
}
if enc != nil {
enc = enc[4:]
} else {
// Look in the files
enc, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, plainKey, false /* trace */)
}
cell.StorageLen = len(enc)
copy(cell.Storage[:], enc)
return plainKey
return nil
}
func (w *Writer) captureCommitmentType(fType FileType, trace bool, f func(commTree *btree.BTree, h hash.Hash, key, val []byte)) {
@ -2340,17 +2423,20 @@ func (w *Writer) computeCommitment(trace bool) ([]byte, error) {
continue
}
prefix := []byte(prefixStr)
w.search.k = prefix
var prevV *AggregateItem
if prevVI := w.a.trees[Commitment].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
var prevV []byte
var prevNum uint32
if prevV, err = w.tx.GetOne(kv.StateCommitment, prefix); err != nil {
return nil, err
}
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
var original []byte
if prevV == nil {
original, _ = w.a.readFromFiles(Commitment, true /* lock */, w.blockNum, prefix, false)
} else {
original = prevV.v
original = prevV[4:]
}
if original != nil {
var mergedVal []byte
@ -2362,11 +2448,11 @@ func (w *Writer) computeCommitment(trace bool) ([]byte, error) {
}
}
//fmt.Printf("computeCommitment set [%x] [%x]\n", commitment.CompactToHex(prefix), branchNodeUpdate)
if prevV == nil {
w.a.trees[Commitment].ReplaceOrInsert(&AggregateItem{k: prefix, v: branchNodeUpdate, count: 1})
} else {
prevV.v = branchNodeUpdate
prevV.count++
v := make([]byte, 4+len(branchNodeUpdate))
binary.BigEndian.PutUint32(v[:4], prevNum+1)
copy(v[4:], branchNodeUpdate)
if err = w.tx.Put(kv.StateCommitment, prefix, v); err != nil {
return nil, err
}
if len(branchNodeUpdate) == 0 {
w.changes[Commitment].delete(prefix, original)
@ -2430,27 +2516,30 @@ func (w *Writer) Aggregate(trace bool) error {
return nil
}
func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) {
var prevV *AggregateItem
w.search.k = addr
if prevVI := w.a.trees[Account].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) error {
var prevNum uint32
prevV, err := w.tx.GetOne(kv.StateAccounts, addr)
if err != nil {
return err
}
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
var original []byte
if prevV == nil {
original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace)
} else {
original = prevV.v
original = prevV[4:]
}
if bytes.Equal(account, original) {
// No change
return
return nil
}
if prevV == nil {
w.a.trees[Account].ReplaceOrInsert(&AggregateItem{k: addr, v: account, count: 1})
} else {
prevV.v = account
prevV.count++
v := make([]byte, 4+len(account))
binary.BigEndian.PutUint32(v[:4], prevNum+1)
copy(v[4:], account)
if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil {
return err
}
if prevV == nil && len(original) == 0 {
w.changes[Account].insert(addr, account)
@ -2461,25 +2550,29 @@ func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) {
w.a.trace = true
w.a.tracedKeys[string(addr)] = struct{}{}
}
return nil
}
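
UpdateAccountData shows the write-side convention: the previous record's count (its first 4 bytes) is carried forward and incremented, and the new payload follows it. The same encode step as a small hedged helper (putWithCount is not part of the commit):

// Sketch of the write-side encoding used by the Update*/delete* methods:
// carry the previous update count forward, increment it, append the payload.
// An empty payload acts as the deletion marker.
func putWithCount(tx kv.RwTx, table string, key, payload []byte) error {
	prev, err := tx.GetOne(table, key)
	if err != nil {
		return err
	}
	var prevNum uint32
	if prev != nil {
		prevNum = binary.BigEndian.Uint32(prev[:4])
	}
	v := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(v[:4], prevNum+1)
	copy(v[4:], payload)
	return tx.Put(table, key, v)
}
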
func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) {
var prevV *AggregateItem
w.search.k = addr
if prevVI := w.a.trees[Code].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) error {
var prevNum uint32
prevV, err := w.tx.GetOne(kv.StateCode, addr)
if err != nil {
return err
}
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
var original []byte
if prevV == nil {
original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace)
} else {
original = prevV.v
original = prevV[4:]
}
if prevV == nil {
w.a.trees[Code].ReplaceOrInsert(&AggregateItem{k: addr, v: code, count: 1})
} else {
prevV.v = code
prevV.count++
v := make([]byte, 4+len(code))
binary.BigEndian.PutUint32(v[:4], prevNum+1)
copy(v[4:], code)
if err = w.tx.Put(kv.StateCode, addr, v); err != nil {
return err
}
if prevV == nil && len(original) == 0 {
w.changes[Code].insert(addr, code)
@ -2490,12 +2583,14 @@ func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) {
w.a.trace = true
w.a.tracedKeys[string(addr)] = struct{}{}
}
return nil
}
type CursorType uint8
const (
FILE_CURSOR CursorType = iota
DB_CURSOR
TREE_CURSOR
)
@ -2507,6 +2602,7 @@ type CursorItem struct {
key, val []byte
dg *compress.Getter
tree *btree.BTree
c kv.Cursor
}
type CursorHeap []*CursorItem
@ -2540,59 +2636,68 @@ func (ch *CursorHeap) Pop() interface{} {
return x
}
func (w *Writer) deleteAccount(addr []byte, trace bool) bool {
var prevV *AggregateItem
w.search.k = addr
if prevVI := w.a.trees[Account].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
func (w *Writer) deleteAccount(addr []byte, trace bool) (bool, error) {
prevV, err := w.tx.GetOne(kv.StateAccounts, addr)
if err != nil {
return false, err
}
var prevNum uint32
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
var original []byte
if prevV == nil {
original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace)
if original == nil {
return false
return false, nil
}
} else {
original = prevV.v
original = prevV[4:]
}
if prevV == nil {
w.a.trees[Account].ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1})
} else {
prevV.v = nil
prevV.count++
v := make([]byte, 4)
binary.BigEndian.PutUint32(v[:4], prevNum+1)
if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil {
return false, err
}
w.changes[Account].delete(addr, original)
return true
return true, nil
}
func (w *Writer) deleteCode(addr []byte, trace bool) {
var prevV *AggregateItem
w.search.k = addr
if prevVI := w.a.trees[Code].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
func (w *Writer) deleteCode(addr []byte, trace bool) error {
prevV, err := w.tx.GetOne(kv.StateCode, addr)
if err != nil {
return err
}
var prevNum uint32
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
var original []byte
if prevV == nil {
original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace)
if original == nil {
// Nothing to do
return
return nil
}
} else {
original = prevV.v
original = prevV[4:]
}
if prevV == nil {
w.a.trees[Code].ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1})
} else {
prevV.v = nil
prevV.count++
v := make([]byte, 4)
binary.BigEndian.PutUint32(v[:4], prevNum+1)
if err = w.tx.Put(kv.StateCode, addr, v); err != nil {
return err
}
w.changes[Code].delete(addr, original)
return nil
}
func (w *Writer) DeleteAccount(addr []byte, trace bool) {
if deleted := w.deleteAccount(addr, trace); !deleted {
return
func (w *Writer) DeleteAccount(addr []byte, trace bool) error {
deleted, err := w.deleteAccount(addr, trace)
if err != nil {
return err
}
if !deleted {
return nil
}
w.a.fileLocks[Storage].RLock()
defer w.a.fileLocks[Storage].RUnlock()
@ -2600,20 +2705,16 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) {
// Find all storage items for this address
var cp CursorHeap
heap.Init(&cp)
w.search.k = addr
foundInTree := false
var c kv.Cursor
if c, err = w.tx.Cursor(kv.StateStorage); err != nil {
return err
}
var k, v []byte
w.a.trees[Storage].AscendGreaterOrEqual(&w.search, func(i btree.Item) bool {
item := i.(*AggregateItem)
if bytes.HasPrefix(item.k, addr) {
foundInTree = true
k = item.k
v = item.v
}
return false
})
if foundInTree {
heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: common.Copy(k), val: common.Copy(v), tree: w.a.trees[Storage], endBlock: w.blockNum})
if k, v, err = c.Seek(addr); err != nil {
return err
}
if k != nil && bytes.HasPrefix(k, addr) {
heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: c, endBlock: w.blockNum})
}
w.a.files[Storage].Ascend(func(i btree.Item) bool {
item := i.(*byEndBlockItem)
@ -2672,6 +2773,18 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) {
} else {
heap.Pop(&cp)
}
case DB_CURSOR:
k, v, err = ci1.c.Next()
if err != nil {
return err
}
if k != nil && bytes.HasPrefix(k, addr) {
ci1.key = common.Copy(k)
ci1.val = common.Copy(v)
heap.Fix(&cp, 0)
} else {
heap.Pop(&cp)
}
case TREE_CURSOR:
skip := true
var aitem *AggregateItem
@ -2692,16 +2805,19 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) {
}
}
}
var prevV *AggregateItem
w.search.k = lastKey
if prevVI := w.a.trees[Storage].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
var prevV []byte
prevV, err = w.tx.GetOne(kv.StateStorage, lastKey)
if err != nil {
return err
}
if prevV == nil {
w.a.trees[Storage].ReplaceOrInsert(&AggregateItem{k: lastKey, v: nil, count: 1})
} else {
prevV.v = nil
prevV.count++
var prevNum uint32
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
v = make([]byte, 4)
binary.BigEndian.PutUint32(v[:4], prevNum+1)
if err = w.tx.Put(kv.StateStorage, lastKey, v); err != nil {
return err
}
w.changes[Storage].delete(lastKey, lastVal)
}
@ -2709,45 +2825,47 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) {
w.a.trace = true
w.a.tracedKeys[string(addr)] = struct{}{}
}
return nil
}
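
DeleteAccount used to seed the merge heap from the in-memory storage btree (TREE_CURSOR); it now seeds it from an MDBX cursor positioned at the address prefix, and the new DB_CURSOR case advances that cursor as entries are consumed. The bare prefix-scan pattern behind it, as a sketch (forEachStorageOfAccount is illustrative only):

// Sketch of the prefix scan feeding DB_CURSOR entries: Seek to the address,
// then keep calling Next() while the key still carries that prefix.
func forEachStorageOfAccount(tx kv.Tx, addr []byte, f func(k, v []byte) error) error {
	c, err := tx.Cursor(kv.StateStorage)
	if err != nil {
		return err
	}
	defer c.Close()
	k, v, err := c.Seek(addr)
	for ; err == nil && k != nil && bytes.HasPrefix(k, addr); k, v, err = c.Next() {
		if fErr := f(k, v); fErr != nil {
			return fErr
		}
	}
	return err
}
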
func (w *Writer) WriteAccountStorage(addr, loc []byte, value *uint256.Int, trace bool) {
func (w *Writer) WriteAccountStorage(addr, loc []byte, value []byte, trace bool) error {
dbkey := make([]byte, len(addr)+len(loc))
copy(dbkey[0:], addr)
copy(dbkey[len(addr):], loc)
w.search.k = dbkey
var prevV *AggregateItem
if prevVI := w.a.trees[Storage].Get(&w.search); prevVI != nil {
prevV = prevVI.(*AggregateItem)
prevV, err := w.tx.GetOne(kv.StateStorage, dbkey)
if err != nil {
return err
}
var prevNum uint32
if prevV != nil {
prevNum = binary.BigEndian.Uint32(prevV[:4])
}
var original []byte
if prevV == nil {
original, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, dbkey, trace)
} else {
original = prevV.v
original = prevV[4:]
}
vLen := value.ByteLen()
v := make([]byte, vLen)
value.WriteToSlice(v)
if bytes.Equal(v, original) {
if bytes.Equal(value, original) {
// No change
return
return nil
}
if prevV == nil {
w.a.trees[Storage].ReplaceOrInsert(&AggregateItem{k: dbkey, v: v, count: 1})
} else {
prevV.v = v
prevV.count++
v := make([]byte, 4+len(value))
binary.BigEndian.PutUint32(v[:4], prevNum+1)
copy(v[4:], value)
if err = w.tx.Put(kv.StateStorage, dbkey, v); err != nil {
return err
}
if prevV == nil && len(original) == 0 {
w.changes[Storage].insert(dbkey, v)
w.changes[Storage].insert(dbkey, value)
} else {
w.changes[Storage].update(dbkey, original, v)
w.changes[Storage].update(dbkey, original, value)
}
if trace {
w.a.trace = true
w.a.tracedKeys[string(dbkey)] = struct{}{}
}
return nil
}
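
WriteAccountStorage now takes the raw value bytes rather than a *uint256.Int, so the conversion moves to the caller, as in the updated tests:

// Conversion happens at the call site now; value is a *uint256.Int held by the caller.
if err := w.WriteAccountStorage(addr, loc, value.Bytes(), false /* trace */); err != nil {
	return err
}
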
// findLargestMerge looks through the state files of the specified type and determines the largest merge that can be undertaken
@ -2890,7 +3008,7 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error {
if fType == Storage {
prefixLen = length.Addr
}
if aggTask.bt[fType], err = aggTask.changes[fType].aggregate(blockFrom, blockTo, prefixLen, w.a.trees[fType], fType == Commitment); err != nil {
if aggTask.bt[fType], err = aggTask.changes[fType].aggregate(blockFrom, blockTo, prefixLen, w.tx, fType.Table(), fType == Commitment); err != nil {
return fmt.Errorf("aggregate %sChanges: %w", fType.String(), err)
}
}

View File

@ -18,10 +18,13 @@ package aggregator
import (
"bytes"
"context"
"encoding/binary"
"testing"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
)
func int160(i uint64) []byte {
@ -67,14 +70,22 @@ func accountWithBalance(i uint64) []byte {
}
func TestSimpleAggregator(t *testing.T) {
db := memdb.New()
defer db.Close()
var rwTx kv.RwTx
var err error
if rwTx, err = db.BeginRw(context.Background()); err != nil {
t.Fatal(err)
}
defer rwTx.Rollback()
tmpDir := t.TempDir()
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000)
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx)
if err != nil {
t.Fatal(err)
}
defer a.Close()
w := a.MakeStateWriter(true /* beforeOn */)
if err = w.Reset(0); err != nil {
if err = w.Reset(0, rwTx); err != nil {
t.Fatal(err)
}
defer w.Close()
@ -86,17 +97,30 @@ func TestSimpleAggregator(t *testing.T) {
if err = w.Aggregate(false /* trace */); err != nil {
t.Fatal(err)
}
r := a.MakeStateReader(2)
acc := r.ReadAccountData(int160(1), false /* trace */)
r := a.MakeStateReader(2, rwTx)
acc, err := r.ReadAccountData(int160(1), false /* trace */)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(acc, account1) {
t.Errorf("read account %x, expected account %x", acc, account1)
}
a.Close()
if err = rwTx.Commit(); err != nil {
t.Fatal(err)
}
}
func TestLoopAggregator(t *testing.T) {
db := memdb.New()
defer db.Close()
var rwTx kv.RwTx
var err error
if rwTx, err = db.BeginRw(context.Background()); err != nil {
t.Fatal(err)
}
defer rwTx.Rollback()
tmpDir := t.TempDir()
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000)
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx)
if err != nil {
t.Fatal(err)
}
@ -107,7 +131,7 @@ func TestLoopAggregator(t *testing.T) {
for blockNum := uint64(0); blockNum < 1000; blockNum++ {
accountKey := int160(blockNum/10 + 1)
//fmt.Printf("blockNum = %d\n", blockNum)
if err = w.Reset(blockNum); err != nil {
if err = w.Reset(blockNum, rwTx); err != nil {
t.Fatal(err)
}
w.UpdateAccountData(accountKey, account1, false /* trace */)
@ -117,18 +141,32 @@ func TestLoopAggregator(t *testing.T) {
if err = w.Aggregate(false /* trace */); err != nil {
t.Fatal(err)
}
r := a.MakeStateReader(blockNum + 1)
acc := r.ReadAccountData(accountKey, false /* trace */)
r := a.MakeStateReader(blockNum+1, rwTx)
acc, err := r.ReadAccountData(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(acc, account1) {
t.Errorf("read account %x, expected account %x for block %d", acc, account1, blockNum)
}
account1 = accountWithBalance(blockNum + 2)
}
if err = rwTx.Commit(); err != nil {
t.Fatal(err)
}
}
func TestRecreateAccountWithStorage(t *testing.T) {
db := memdb.New()
defer db.Close()
var rwTx kv.RwTx
var err error
if rwTx, err = db.BeginRw(context.Background()); err != nil {
t.Fatal(err)
}
defer rwTx.Rollback()
tmpDir := t.TempDir()
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000)
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx)
if err != nil {
t.Fatal(err)
}
@ -139,21 +177,21 @@ func TestRecreateAccountWithStorage(t *testing.T) {
w := a.MakeStateWriter(true /* beforeOn */)
defer w.Close()
for blockNum := uint64(0); blockNum < 100; blockNum++ {
if err = w.Reset(blockNum); err != nil {
if err = w.Reset(blockNum, rwTx); err != nil {
t.Fatal(err)
}
switch blockNum {
case 1:
w.UpdateAccountData(accountKey, account1, false /* trace */)
for s := uint64(0); s < 100; s++ {
w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1), false /* trace */)
w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1).Bytes(), false /* trace */)
}
case 22:
w.DeleteAccount(accountKey, false /* trace */)
case 45:
w.UpdateAccountData(accountKey, account2, false /* trace */)
for s := uint64(50); s < 150; s++ {
w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1), false /* trace */)
w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1).Bytes(), false /* trace */)
}
}
if err = w.FinishTx(blockNum, false /* trace */); err != nil {
@ -162,52 +200,81 @@ func TestRecreateAccountWithStorage(t *testing.T) {
if err = w.Aggregate(false /* trace */); err != nil {
t.Fatal(err)
}
r := a.MakeStateReader(blockNum + 1)
r := a.MakeStateReader(blockNum+1, rwTx)
switch blockNum {
case 1:
acc := r.ReadAccountData(accountKey, false /* trace */)
acc, err := r.ReadAccountData(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(account1, acc) {
t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc)
}
for s := uint64(0); s < 100; s++ {
v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
if !uint256.NewInt(s + 1).Eq(v) {
t.Errorf("wrong storage value after block %d, expected %d, got %s", blockNum, s+1, v)
v, err := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
if err != nil {
t.Fatal(err)
}
if !uint256.NewInt(s + 1).Eq(uint256.NewInt(0).SetBytes(v)) {
t.Errorf("wrong storage value after block %d, expected %d, got %d", blockNum, s+1, uint256.NewInt(0).SetBytes(v))
}
}
case 22, 44:
acc := r.ReadAccountData(accountKey, false /* trace */)
acc, err := r.ReadAccountData(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if len(acc) > 0 {
t.Errorf("wrong account after block %d, expected nil, got %x", blockNum, acc)
}
for s := uint64(0); s < 100; s++ {
v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
v, err := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
if err != nil {
t.Fatal(err)
}
if v != nil {
t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v)
t.Errorf("wrong storage value after block %d, expected nil, got %d", blockNum, uint256.NewInt(0).SetBytes(v))
}
}
case 66:
acc := r.ReadAccountData(accountKey, false /* trace */)
acc, err := r.ReadAccountData(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(account2, acc) {
t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc)
}
for s := uint64(0); s < 150; s++ {
v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
v, err := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
if err != nil {
t.Fatal(err)
}
if s < 50 {
if v != nil {
t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v)
t.Errorf("wrong storage value after block %d, expected nil, got %d", blockNum, uint256.NewInt(0).SetBytes(v))
}
} else if v == nil || !uint256.NewInt(2*s+1).Eq(v) {
t.Errorf("wrong storage value after block %d, expected %d, got %s", blockNum, 2*s+1, v)
} else if v == nil || !uint256.NewInt(2*s+1).Eq(uint256.NewInt(0).SetBytes(v)) {
t.Errorf("wrong storage value after block %d, expected %d, got %d", blockNum, 2*s+1, uint256.NewInt(0).SetBytes(v))
}
}
}
}
if err = rwTx.Commit(); err != nil {
t.Fatal(err)
}
}
func TestChangeCode(t *testing.T) {
db := memdb.New()
defer db.Close()
var rwTx kv.RwTx
var err error
if rwTx, err = db.BeginRw(context.Background()); err != nil {
t.Fatal(err)
}
defer rwTx.Rollback()
tmpDir := t.TempDir()
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000)
a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx)
if err != nil {
t.Fatal(err)
}
@ -218,7 +285,7 @@ func TestChangeCode(t *testing.T) {
w := a.MakeStateWriter(true /* beforeOn */)
defer w.Close()
for blockNum := uint64(0); blockNum < 100; blockNum++ {
if err = w.Reset(blockNum); err != nil {
if err = w.Reset(blockNum, rwTx); err != nil {
t.Fatal(err)
}
switch blockNum {
@ -234,22 +301,34 @@ func TestChangeCode(t *testing.T) {
if err = w.Aggregate(false /* trace */); err != nil {
t.Fatal(err)
}
r := a.MakeStateReader(blockNum + 1)
r := a.MakeStateReader(blockNum+1, rwTx)
switch blockNum {
case 22:
acc := r.ReadAccountData(accountKey, false /* trace */)
acc, err := r.ReadAccountData(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(account1, acc) {
t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc)
}
code := r.ReadAccountCode(accountKey, false /* trace */)
code, err := r.ReadAccountCode(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(code1, code) {
t.Errorf("wrong code after block %d, expected %x, got %x", blockNum, code1, code)
}
case 47:
code := r.ReadAccountCode(accountKey, false /* trace */)
code, err := r.ReadAccountCode(accountKey, false /* trace */)
if err != nil {
t.Fatal(err)
}
if code != nil {
t.Errorf("wrong code after block %d, expected nil, got %x", blockNum, code)
}
}
}
if err = rwTx.Commit(); err != nil {
t.Fatal(err)
}
}

View File

@ -78,11 +78,11 @@ type HexPatriciaHashed struct {
// Function used to load branch node and fill up the cells
// For each cell, it sets the cell type, clears the modified flag, fills the hash,
// and for the extension, account, and leaf type, the `l` and `k`
branchFn func(prefix []byte) []byte
branchFn func(prefix []byte) ([]byte, error)
// Function used to fetch account with given plain key
accountFn func(plainKey []byte, cell *Cell) []byte
accountFn func(plainKey []byte, cell *Cell) error
// Function used to fetch account with given plain key
storageFn func(plainKey []byte, cell *Cell) []byte
storageFn func(plainKey []byte, cell *Cell) error
keccak keccakState
keccak2 keccakState
accountKeyLen int
@ -92,9 +92,9 @@ type HexPatriciaHashed struct {
}
func NewHexPatriciaHashed(accountKeyLen int,
branchFn func(prefix []byte) []byte,
accountFn func(plainKey []byte, cell *Cell) []byte,
storageFn func(plainKey []byte, cell *Cell) []byte,
branchFn func(prefix []byte) ([]byte, error),
accountFn func(plainKey []byte, cell *Cell) error,
storageFn func(plainKey []byte, cell *Cell) error,
) *HexPatriciaHashed {
return &HexPatriciaHashed{
keccak: sha3.NewLegacyKeccak256().(keccakState),
@ -826,9 +826,9 @@ func (hph *HexPatriciaHashed) Reset() {
}
func (hph *HexPatriciaHashed) ResetFns(
branchFn func(prefix []byte) []byte,
accountFn func(plainKey []byte, cell *Cell) []byte,
storageFn func(plainKey []byte, cell *Cell) []byte,
branchFn func(prefix []byte) ([]byte, error),
accountFn func(plainKey []byte, cell *Cell) error,
storageFn func(plainKey []byte, cell *Cell) error,
) {
hph.branchFn = branchFn
hph.accountFn = accountFn
@ -883,7 +883,10 @@ func (hph *HexPatriciaHashed) needUnfolding(hashedKey []byte) int {
}
func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) error {
branchData := hph.branchFn(hexToCompact(hph.currentKey[:hph.currentKeyLen]))
branchData, err := hph.branchFn(hexToCompact(hph.currentKey[:hph.currentKeyLen]))
if err != nil {
return err
}
if !hph.rootChecked && hph.currentKeyLen == 0 && len(branchData) == 0 {
// Special case - empty or deleted root
hph.rootChecked = true
@ -916,17 +919,13 @@ func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int)
fmt.Printf("cell (%d, %x) depth=%d, hash=[%x], a=[%x], s=[%x], ex=[%x]\n", row, nibble, depth, cell.h[:cell.hl], cell.apk[:cell.apl], cell.spk[:cell.spl], cell.extension[:cell.extLen])
}
if cell.apl > 0 {
k := hph.accountFn(cell.apk[:cell.apl], cell)
cell.apl = len(k)
copy(cell.apk[:], k)
hph.accountFn(cell.apk[:cell.apl], cell)
if hph.trace {
fmt.Printf("accountFn[%x] return balance=%d, nonce=%d\n", cell.apk[:cell.apl], &cell.Balance, cell.Nonce)
}
}
if cell.spl > 0 {
k := hph.storageFn(cell.spk[:cell.spl], cell)
cell.spl = len(k)
copy(cell.spk[:], k)
hph.storageFn(cell.spk[:cell.spl], cell)
}
if err = cell.deriveHashedKeys(depth, hph.keccak, hph.accountKeyLen); err != nil {
return err

View File

@ -46,14 +46,14 @@ func NewMockState(t *testing.T) *MockState {
}
}
func (ms MockState) branchFn(prefix []byte) []byte {
func (ms MockState) branchFn(prefix []byte) ([]byte, error) {
if exBytes, ok := ms.cm[string(prefix)]; ok {
return exBytes[2:] // Skip touchMap, but keep afterMap
return exBytes[2:], nil // Skip touchMap, but keep afterMap
}
return nil
return nil, nil
}
func (ms MockState) accountFn(plainKey []byte, cell *Cell) []byte {
func (ms MockState) accountFn(plainKey []byte, cell *Cell) error {
exBytes, ok := ms.sm[string(plainKey)]
if !ok {
ms.t.Fatalf("accountFn not found key [%x]", plainKey)
@ -91,10 +91,10 @@ func (ms MockState) accountFn(plainKey []byte, cell *Cell) []byte {
} else {
cell.CodeHash = [32]byte{}
}
return plainKey
return nil
}
func (ms MockState) storageFn(plainKey []byte, cell *Cell) []byte {
func (ms MockState) storageFn(plainKey []byte, cell *Cell) error {
exBytes, ok := ms.sm[string(plainKey)]
if !ok {
ms.t.Fatalf("storageFn not found key [%x]", plainKey)
@ -131,7 +131,7 @@ func (ms MockState) storageFn(plainKey []byte, cell *Cell) []byte {
} else {
cell.Storage = [32]byte{}
}
return plainKey
return nil
}
func (ms *MockState) applyPlainUpdates(plainKeys [][]byte, updates []Update) error {

View File

@ -141,6 +141,11 @@ func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts {
return opts
}
func (opts MdbxOpts) WriteMap() MdbxOpts {
opts.flags |= mdbx.WriteMap
return opts
}
func (opts MdbxOpts) WithTablessCfg(f TableCfgFunc) MdbxOpts {
opts.bucketsCfg = f
return opts
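
The new WriteMap() option simply ORs mdbx.WriteMap into the environment flags before the database is opened. A hedged usage sketch on the options builder, assuming this package's usual NewMDBX(logger).Path(...).Open() chain (logger and dirPath are placeholders):

// Sketch: enabling MDBX's writable memory map via the options builder.
db, err := mdbx.NewMDBX(logger).
	Path(dirPath).
	WriteMap().
	Open()
if err != nil {
	return err
}
defer db.Close()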