diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 2df77dd37..019388cfd 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -41,11 +41,11 @@ import ( "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" - "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/commitment" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/compress" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" "github.com/ledgerwatch/log/v3" @@ -124,6 +124,21 @@ func (ft FileType) String() string { } } +func (ft FileType) Table() string { + switch ft { + case Account: + return kv.StateAccounts + case Storage: + return kv.StateStorage + case Code: + return kv.StateCode + case Commitment: + return kv.StateCommitment + default: + panic(fmt.Sprintf("unknown file type: %d", ft)) + } +} + func ParseFileType(s string) (FileType, bool) { switch s { case "account": @@ -173,7 +188,6 @@ type Aggregator struct { historyChannel chan struct{} historyError chan error historyWg sync.WaitGroup - trees [NumberOfStateTypes]*btree.BTree fileHits, fileMisses uint64 // Counters for state file hit ratio arches [NumberOfStateTypes][]uint32 // Over-arching hash tables containing the block number of last aggregation archHasher murmur3.Hash128 @@ -191,7 +205,6 @@ type ChangeFile struct { wTx *bufio.Writer r *bufio.Reader rTx *bufio.Reader - sizeCounter uint64 txNum uint64 // Currently read transaction number txRemaining uint64 // Remaining number of bytes to read in the current transaction words []byte // Words pending for the next block record, in the same slice @@ -247,6 +260,12 @@ func (cf *ChangeFile) openFile(blockNum uint64, write bool) error { if cf.fileTx, err = os.OpenFile(cf.pathTx, os.O_RDWR|os.O_CREATE, 0755); err != nil { return err } + if _, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil { + return err + } + if _, err = cf.fileTx.Seek(0, 2 /* relative to the end of the file */); err != nil { + return err + } } else { if cf.file, err = os.Open(cf.path); err != nil { return err @@ -287,6 +306,7 @@ func (cf *ChangeFile) finish(txNum uint64) error { var numBuf [10]byte // Write out words lastOffset := 0 + var size uint64 for _, offset := range cf.wordOffsets { word := cf.words[lastOffset:offset] n := binary.PutUvarint(numBuf[:], uint64(len(word))) @@ -298,7 +318,7 @@ func (cf *ChangeFile) finish(txNum uint64) error { return err } } - cf.sizeCounter += uint64(n + len(word)) + size += uint64(n + len(word)) lastOffset = offset } cf.words = cf.words[:0] @@ -307,11 +327,10 @@ func (cf *ChangeFile) finish(txNum uint64) error { if _, err := cf.wTx.Write(numBuf[:n]); err != nil { return err } - n = binary.PutUvarint(numBuf[:], cf.sizeCounter) + n = binary.PutUvarint(numBuf[:], size) if _, err := cf.wTx.Write(numBuf[:n]); err != nil { return err } - cf.sizeCounter = 0 return nil } @@ -597,7 +616,7 @@ func buildIndex(d *compress.Decompressor, idxPath, tmpDir string, count int) (*r // aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database // This function is time-critical because it needs to be run in the same go-routine (thread) as the general // execution (due to read-write tx). After that, we can optimistically execute the rest in the background -func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *btree.BTree, commitments bool) (*btree.BTree, error) { +func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx, table string, commitments bool) (*btree.BTree, error) { if err := c.openFiles(blockTo, false /* write */); err != nil { return nil, fmt.Errorf("open files: %w", err) } @@ -608,34 +627,42 @@ func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *bt } // Clean up the DB table var e error - var search AggregateItem bt.Ascend(func(i btree.Item) bool { item := i.(*AggregateItem) if item.count == 0 { return true } - search.k = item.k - var prevV *AggregateItem - if prevVI := dbTree.Get(&search); prevVI != nil { - prevV = prevVI.(*AggregateItem) + dbPrefix := item.k + prevV, err := tx.GetOne(table, dbPrefix) + if err != nil { + e = err + return false } if prevV == nil { - e = fmt.Errorf("record not found in db tree for key %x", item.k) + e = fmt.Errorf("record not found in db for %s key %x", table, dbPrefix) return false } - if prevV.count < item.count { - e = fmt.Errorf("record count too low for key [%x] count %d, subtracting %d", item.k, prevV.count, item.count) + prevNum := binary.BigEndian.Uint32(prevV[:4]) + if prevNum < item.count { + e = fmt.Errorf("record count too low for %s key %s count %d, subtracting %d", table, dbPrefix, prevNum, item.count) return false } - if prevV.count == item.count { - dbTree.Delete(prevV) + if prevNum == item.count { + if e = tx.Delete(table, dbPrefix, nil); e != nil { + return false + } } else { - prevV.count -= item.count + v := make([]byte, len(prevV)) + binary.BigEndian.PutUint32(v[:4], prevNum-item.count) + copy(v[4:], prevV[4:]) + if e = tx.Put(table, dbPrefix, v); e != nil { + return false + } } return true }) if e != nil { - return nil, fmt.Errorf("clean up after aggregation: %w", e) + return nil, fmt.Errorf("clean up table %s after aggregation: %w", table, e) } return bt, nil } @@ -829,8 +856,9 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments b } //fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal) item.v = mergedVal + } else { + item.v = common.Copy(after) } - item.v = common.Copy(after) item.count++ } } @@ -956,7 +984,7 @@ func (a *Aggregator) scanStateFiles(files []fs.DirEntry) { } } -func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64) (*Aggregator, error) { +func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64, tx kv.RwTx) (*Aggregator, error) { a := &Aggregator{ diffDir: diffDir, unwindLimit: unwindLimit, @@ -977,9 +1005,6 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c for fType := FirstType; fType < NumberOfTypes; fType++ { a.files[fType] = btree.New(32) } - for fType := FirstType; fType < NumberOfStateTypes; fType++ { - a.trees[fType] = btree.New(32) - } var closeStateFiles = true // It will be set to false in case of success at the end of the function defer func() { // Clean up all decompressor and indices upon error @@ -1074,7 +1099,7 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c return nil, err } } - if err = a.rebuildRecentState(); err != nil { + if err = a.rebuildRecentState(tx); err != nil { return nil, fmt.Errorf("rebuilding recent state from change files: %w", err) } closeStateFiles = false @@ -1090,18 +1115,28 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c } // rebuildRecentState reads change files and reconstructs the recent state -func (a *Aggregator) rebuildRecentState() error { +func (a *Aggregator) rebuildRecentState(tx kv.RwTx) error { t := time.Now() var err error + trees := map[FileType]*btree.BTree{} a.changesBtree.Ascend(func(i btree.Item) bool { item := i.(*ChangesItem) for fType := FirstType; fType < NumberOfStateTypes; fType++ { + tree, ok := trees[fType] + if !ok { + tree = btree.New(32) + trees[fType] = tree + } var changes Changes changes.Init(fType.String(), a.aggregationStep, a.diffDir, false /* beforeOn */) if err = changes.openFiles(item.startBlock, false /* write */); err != nil { return false } - if err = changes.aggregateToBtree(a.trees[fType], 0, fType == Commitment); err != nil { + var prefixLen int + if fType == Storage { + prefixLen = length.Addr + } + if err = changes.aggregateToBtree(tree, prefixLen, fType == Commitment); err != nil { return false } if err = changes.closeFiles(); err != nil { @@ -1113,6 +1148,31 @@ func (a *Aggregator) rebuildRecentState() error { if err != nil { return err } + for fType, tree := range trees { + table := fType.Table() + tree.Ascend(func(i btree.Item) bool { + item := i.(*AggregateItem) + if len(item.v) == 0 { + return true + } + var v []byte + if v, err = tx.GetOne(table, item.k); err != nil { + return false + } + if item.count != binary.BigEndian.Uint32(v[:4]) { + err = fmt.Errorf("mismatched count for %x: change file %d, db: %d", item.k, item.count, binary.BigEndian.Uint32(v[:4])) + return false + } + if !bytes.Equal(item.v, v[4:]) { + err = fmt.Errorf("mismatched v for %x: change file [%x], db: [%x]", item.k, item.v, v[4:]) + return false + } + return true + }) + } + if err != nil { + return err + } log.Info("reconstructed recent state", "in", time.Since(t)) return nil } @@ -1787,7 +1847,7 @@ func (a *Aggregator) closeFiles(fType FileType) { func (a *Aggregator) Close() { close(a.aggChannel) - a.aggWg.Wait() // Need to wait for the background aggregation to finish because itsends to merge channels + a.aggWg.Wait() // Need to wait for the background aggregation to finish because it sends to merge channels // Drain channel before closing select { case <-a.mergeChannel: @@ -1952,77 +2012,90 @@ func (a *Aggregator) readByOffset(fType FileType, fileI int, offset uint64) ([]b return key, val } -func (a *Aggregator) MakeStateReader(blockNum uint64) *Reader { +func (a *Aggregator) MakeStateReader(blockNum uint64, tx kv.Tx) *Reader { r := &Reader{ a: a, blockNum: blockNum, + tx: tx, } return r } type Reader struct { a *Aggregator - search AggregateItem + tx kv.Getter blockNum uint64 } -func (r *Reader) ReadAccountData(addr []byte, trace bool) []byte { - // Look in the summary table first - r.search.k = addr - if vi := r.a.trees[Account].Get(&r.search); vi != nil { - return vi.(*AggregateItem).v +func (r *Reader) ReadAccountData(addr []byte, trace bool) ([]byte, error) { + v, err := r.tx.GetOne(kv.StateAccounts, addr) + if err != nil { + return nil, err } - val, _ := r.a.readFromFiles(Account, true /* lock */, r.blockNum, addr, trace) - return val + if v != nil { + return v[4:], nil + } + v, _ = r.a.readFromFiles(Account, true /* lock */, r.blockNum, addr, trace) + return v, nil } -func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) *uint256.Int { +func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) ([]byte, error) { // Look in the summary table first dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) - r.search.k = dbkey - var v []byte - if vi := r.a.trees[Storage].Get(&r.search); vi != nil { - v = vi.(*AggregateItem).v - } else { - v, _ = r.a.readFromFiles(Storage, true /* lock */, r.blockNum, dbkey, trace) + v, err := r.tx.GetOne(kv.StateStorage, dbkey) + if err != nil { + return nil, err } if v != nil { - return new(uint256.Int).SetBytes(v) + if len(v) == 4 { + return nil, nil + } + return v[4:], nil } - return nil + v, _ = r.a.readFromFiles(Storage, true /* lock */, r.blockNum, dbkey, trace) + return v, nil } -func (r *Reader) ReadAccountCode(addr []byte, trace bool) []byte { +func (r *Reader) ReadAccountCode(addr []byte, trace bool) ([]byte, error) { // Look in the summary table first - r.search.k = addr - if vi := r.a.trees[Code].Get(&r.search); vi != nil { - return vi.(*AggregateItem).v + v, err := r.tx.GetOne(kv.StateCode, addr) + if err != nil { + return nil, err + } + if v != nil { + if len(v) == 4 { + return nil, nil + } + return v[4:], nil } // Look in the files - val, _ := r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) - return val + v, _ = r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) + return v, nil } -func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) int { +func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) (int, error) { // Look in the summary table first - r.search.k = addr - if vi := r.a.trees[Code].Get(&r.search); vi != nil { - return len(vi.(*AggregateItem).v) + v, err := r.tx.GetOne(kv.StateCode, addr) + if err != nil { + return 0, err + } + if v != nil { + return len(v) - 4, nil } // Look in the files. TODO - use specialised function to only lookup size - val, _ := r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) - return len(val) + v, _ = r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) + return len(v), nil } type Writer struct { a *Aggregator - search AggregateItem // Aggregate item used to search in trees blockNum uint64 changeFileNum uint64 // Block number associated with the current change files. It is the last block number whose changes will go into that file changes [NumberOfStateTypes]Changes commTree *btree.BTree // BTree used for gathering commitment data + tx kv.RwTx } func (a *Aggregator) MakeStateWriter(beforeOn bool) *Writer { @@ -2046,7 +2119,8 @@ func (w *Writer) Close() { } } -func (w *Writer) Reset(blockNum uint64) error { +func (w *Writer) Reset(blockNum uint64, tx kv.RwTx) error { + w.tx = tx w.blockNum = blockNum typesLimit := Commitment if w.a.commitments { @@ -2083,16 +2157,18 @@ func (i *CommitmentItem) Less(than btree.Item) bool { return bytes.Compare(i.hashedKey, than.(*CommitmentItem).hashedKey) < 0 } -func (w *Writer) branchFn(prefix []byte) []byte { +func (w *Writer) branchFn(prefix []byte) ([]byte, error) { for lockFType := FirstType; lockFType < NumberOfStateTypes; lockFType++ { w.a.fileLocks[lockFType].RLock() defer w.a.fileLocks[lockFType].RUnlock() } - var mergedVal []byte // Look in the summary table first - w.search.k = prefix - if vi := w.a.trees[Commitment].Get(&w.search); vi != nil { - mergedVal = vi.(*AggregateItem).v + mergedVal, err := w.tx.GetOne(kv.StateCommitment, prefix) + if err != nil { + return nil, err + } + if mergedVal != nil { + mergedVal = mergedVal[4:] } // Look in the files and merge, while it becomes complete var startBlock = w.blockNum + 1 @@ -2104,7 +2180,7 @@ func (w *Writer) branchFn(prefix []byte) []byte { val, startBlock = w.a.readFromFiles(Commitment, false /* lock */, startBlock-1, prefix, false /* trace */) if val == nil { if mergedVal == nil { - return nil + return nil, nil } panic(fmt.Sprintf("Incomplete branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock)) } @@ -2113,15 +2189,15 @@ func (w *Writer) branchFn(prefix []byte) []byte { if mergedVal == nil { mergedVal = val } else if mergedVal, err = commitment.MergeBranches(val, mergedVal, nil); err != nil { - panic(err) + return nil, err } //fmt.Printf("Post-merge prefix [%x] [%x], startBlock %d\n", commitment.CompactToHex(prefix), mergedVal, startBlock) } if mergedVal == nil { - return nil + return nil, nil } //fmt.Printf("Returning branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock) - return mergedVal[2:] // Skip touchMap but keep afterMap + return mergedVal[2:], nil // Skip touchMap but keep afterMap } func bytesToUint64(buf []byte) (x uint64) { @@ -2134,12 +2210,14 @@ func bytesToUint64(buf []byte) (x uint64) { return } -func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte { - var enc []byte +func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) error { // Look in the summary table first - w.search.k = plainKey - if encI := w.a.trees[Account].Get(&w.search); encI != nil { - enc = encI.(*AggregateItem).v + enc, err := w.tx.GetOne(kv.StateAccounts, plainKey) + if err != nil { + return err + } + if enc != nil { + enc = enc[4:] } else { // Look in the files enc, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, plainKey, false /* trace */) @@ -2162,9 +2240,12 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte { cell.Balance.SetBytes(enc[pos : pos+balanceBytes]) } } - w.search.k = plainKey - if encI := w.a.trees[Code].Get(&w.search); encI != nil { - enc = encI.(*AggregateItem).v + enc, err = w.tx.GetOne(kv.StateCode, plainKey) + if err != nil { + return err + } + if enc != nil { + enc = enc[4:] } else { // Look in the files enc, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, plainKey, false /* trace */) @@ -2174,22 +2255,24 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte { w.a.keccak.Write(enc) w.a.keccak.(io.Reader).Read(cell.CodeHash[:]) } - return plainKey + return nil } -func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) []byte { - var enc []byte +func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) error { // Look in the summary table first - w.search.k = plainKey - if encI := w.a.trees[Storage].Get(&w.search); encI != nil { - enc = encI.(*AggregateItem).v + enc, err := w.tx.GetOne(kv.StateStorage, plainKey) + if err != nil { + return err + } + if enc != nil { + enc = enc[4:] } else { // Look in the files enc, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, plainKey, false /* trace */) } cell.StorageLen = len(enc) copy(cell.Storage[:], enc) - return plainKey + return nil } func (w *Writer) captureCommitmentType(fType FileType, trace bool, f func(commTree *btree.BTree, h hash.Hash, key, val []byte)) { @@ -2340,17 +2423,20 @@ func (w *Writer) computeCommitment(trace bool) ([]byte, error) { continue } prefix := []byte(prefixStr) - w.search.k = prefix - var prevV *AggregateItem - if prevVI := w.a.trees[Commitment].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) + var prevV []byte + var prevNum uint32 + if prevV, err = w.tx.GetOne(kv.StateCommitment, prefix); err != nil { + return nil, err + } + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Commitment, true /* lock */, w.blockNum, prefix, false) } else { - original = prevV.v + original = prevV[4:] } if original != nil { var mergedVal []byte @@ -2362,11 +2448,11 @@ func (w *Writer) computeCommitment(trace bool) ([]byte, error) { } } //fmt.Printf("computeCommitment set [%x] [%x]\n", commitment.CompactToHex(prefix), branchNodeUpdate) - if prevV == nil { - w.a.trees[Commitment].ReplaceOrInsert(&AggregateItem{k: prefix, v: branchNodeUpdate, count: 1}) - } else { - prevV.v = branchNodeUpdate - prevV.count++ + v := make([]byte, 4+len(branchNodeUpdate)) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + copy(v[4:], branchNodeUpdate) + if err = w.tx.Put(kv.StateCommitment, prefix, v); err != nil { + return nil, err } if len(branchNodeUpdate) == 0 { w.changes[Commitment].delete(prefix, original) @@ -2430,27 +2516,30 @@ func (w *Writer) Aggregate(trace bool) error { return nil } -func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) { - var prevV *AggregateItem - w.search.k = addr - if prevVI := w.a.trees[Account].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) +func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) error { + var prevNum uint32 + prevV, err := w.tx.GetOne(kv.StateAccounts, addr) + if err != nil { + return err + } + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace) } else { - original = prevV.v + original = prevV[4:] } if bytes.Equal(account, original) { // No change - return + return nil } - if prevV == nil { - w.a.trees[Account].ReplaceOrInsert(&AggregateItem{k: addr, v: account, count: 1}) - } else { - prevV.v = account - prevV.count++ + v := make([]byte, 4+len(account)) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + copy(v[4:], account) + if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil { + return err } if prevV == nil && len(original) == 0 { w.changes[Account].insert(addr, account) @@ -2461,25 +2550,29 @@ func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } + return nil } -func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) { - var prevV *AggregateItem - w.search.k = addr - if prevVI := w.a.trees[Code].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) +func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) error { + var prevNum uint32 + prevV, err := w.tx.GetOne(kv.StateCode, addr) + if err != nil { + return err + } + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace) } else { - original = prevV.v + original = prevV[4:] } - if prevV == nil { - w.a.trees[Code].ReplaceOrInsert(&AggregateItem{k: addr, v: code, count: 1}) - } else { - prevV.v = code - prevV.count++ + v := make([]byte, 4+len(code)) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + copy(v[4:], code) + if err = w.tx.Put(kv.StateCode, addr, v); err != nil { + return err } if prevV == nil && len(original) == 0 { w.changes[Code].insert(addr, code) @@ -2490,12 +2583,14 @@ func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } + return nil } type CursorType uint8 const ( FILE_CURSOR CursorType = iota + DB_CURSOR TREE_CURSOR ) @@ -2507,6 +2602,7 @@ type CursorItem struct { key, val []byte dg *compress.Getter tree *btree.BTree + c kv.Cursor } type CursorHeap []*CursorItem @@ -2540,59 +2636,68 @@ func (ch *CursorHeap) Pop() interface{} { return x } -func (w *Writer) deleteAccount(addr []byte, trace bool) bool { - var prevV *AggregateItem - w.search.k = addr - if prevVI := w.a.trees[Account].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) +func (w *Writer) deleteAccount(addr []byte, trace bool) (bool, error) { + prevV, err := w.tx.GetOne(kv.StateAccounts, addr) + if err != nil { + return false, err + } + var prevNum uint32 + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace) if original == nil { - return false + return false, nil } } else { - original = prevV.v + original = prevV[4:] } - if prevV == nil { - w.a.trees[Account].ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1}) - } else { - prevV.v = nil - prevV.count++ + v := make([]byte, 4) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil { + return false, err } w.changes[Account].delete(addr, original) - return true + return true, nil } -func (w *Writer) deleteCode(addr []byte, trace bool) { - var prevV *AggregateItem - w.search.k = addr - if prevVI := w.a.trees[Code].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) +func (w *Writer) deleteCode(addr []byte, trace bool) error { + prevV, err := w.tx.GetOne(kv.StateCode, addr) + if err != nil { + return err + } + var prevNum uint32 + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace) if original == nil { // Nothing to do - return + return nil } } else { - original = prevV.v + original = prevV[4:] } - if prevV == nil { - w.a.trees[Code].ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1}) - } else { - prevV.v = nil - prevV.count++ + v := make([]byte, 4) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + if err = w.tx.Put(kv.StateCode, addr, v); err != nil { + return err } w.changes[Code].delete(addr, original) + return nil } -func (w *Writer) DeleteAccount(addr []byte, trace bool) { - if deleted := w.deleteAccount(addr, trace); !deleted { - return +func (w *Writer) DeleteAccount(addr []byte, trace bool) error { + deleted, err := w.deleteAccount(addr, trace) + if err != nil { + return err + } + if !deleted { + return nil } w.a.fileLocks[Storage].RLock() defer w.a.fileLocks[Storage].RUnlock() @@ -2600,20 +2705,16 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) { // Find all storage items for this address var cp CursorHeap heap.Init(&cp) - w.search.k = addr - foundInTree := false + var c kv.Cursor + if c, err = w.tx.Cursor(kv.StateStorage); err != nil { + return err + } var k, v []byte - w.a.trees[Storage].AscendGreaterOrEqual(&w.search, func(i btree.Item) bool { - item := i.(*AggregateItem) - if bytes.HasPrefix(item.k, addr) { - foundInTree = true - k = item.k - v = item.v - } - return false - }) - if foundInTree { - heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: common.Copy(k), val: common.Copy(v), tree: w.a.trees[Storage], endBlock: w.blockNum}) + if k, v, err = c.Seek(addr); err != nil { + return err + } + if k != nil && bytes.HasPrefix(k, addr) { + heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: c, endBlock: w.blockNum}) } w.a.files[Storage].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) @@ -2672,6 +2773,18 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) { } else { heap.Pop(&cp) } + case DB_CURSOR: + k, v, err = ci1.c.Next() + if err != nil { + return err + } + if k != nil && bytes.HasPrefix(k, addr) { + ci1.key = common.Copy(k) + ci1.val = common.Copy(v) + heap.Fix(&cp, 0) + } else { + heap.Pop(&cp) + } case TREE_CURSOR: skip := true var aitem *AggregateItem @@ -2692,16 +2805,19 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) { } } } - var prevV *AggregateItem - w.search.k = lastKey - if prevVI := w.a.trees[Storage].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) + var prevV []byte + prevV, err = w.tx.GetOne(kv.StateStorage, lastKey) + if err != nil { + return err } - if prevV == nil { - w.a.trees[Storage].ReplaceOrInsert(&AggregateItem{k: lastKey, v: nil, count: 1}) - } else { - prevV.v = nil - prevV.count++ + var prevNum uint32 + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) + } + v = make([]byte, 4) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + if err = w.tx.Put(kv.StateStorage, lastKey, v); err != nil { + return err } w.changes[Storage].delete(lastKey, lastVal) } @@ -2709,45 +2825,47 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } + return nil } -func (w *Writer) WriteAccountStorage(addr, loc []byte, value *uint256.Int, trace bool) { +func (w *Writer) WriteAccountStorage(addr, loc []byte, value []byte, trace bool) error { dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) - w.search.k = dbkey - var prevV *AggregateItem - if prevVI := w.a.trees[Storage].Get(&w.search); prevVI != nil { - prevV = prevVI.(*AggregateItem) + prevV, err := w.tx.GetOne(kv.StateStorage, dbkey) + if err != nil { + return err + } + var prevNum uint32 + if prevV != nil { + prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, dbkey, trace) } else { - original = prevV.v + original = prevV[4:] } - vLen := value.ByteLen() - v := make([]byte, vLen) - value.WriteToSlice(v) - if bytes.Equal(v, original) { + if bytes.Equal(value, original) { // No change - return + return nil } - if prevV == nil { - w.a.trees[Storage].ReplaceOrInsert(&AggregateItem{k: dbkey, v: v, count: 1}) - } else { - prevV.v = v - prevV.count++ + v := make([]byte, 4+len(value)) + binary.BigEndian.PutUint32(v[:4], prevNum+1) + copy(v[4:], value) + if err = w.tx.Put(kv.StateStorage, dbkey, v); err != nil { + return err } if prevV == nil && len(original) == 0 { - w.changes[Storage].insert(dbkey, v) + w.changes[Storage].insert(dbkey, value) } else { - w.changes[Storage].update(dbkey, original, v) + w.changes[Storage].update(dbkey, original, value) } if trace { w.a.trace = true w.a.tracedKeys[string(dbkey)] = struct{}{} } + return nil } // findLargestMerge looks through the state files of the speficied type and determines the largest merge that can be undertaken @@ -2890,7 +3008,7 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error { if fType == Storage { prefixLen = length.Addr } - if aggTask.bt[fType], err = aggTask.changes[fType].aggregate(blockFrom, blockTo, prefixLen, w.a.trees[fType], fType == Commitment); err != nil { + if aggTask.bt[fType], err = aggTask.changes[fType].aggregate(blockFrom, blockTo, prefixLen, w.tx, fType.Table(), fType == Commitment); err != nil { return fmt.Errorf("aggregate %sChanges: %w", fType.String(), err) } } diff --git a/aggregator/aggregator_test.go b/aggregator/aggregator_test.go index 6d5abe9e5..78237f769 100644 --- a/aggregator/aggregator_test.go +++ b/aggregator/aggregator_test.go @@ -18,10 +18,13 @@ package aggregator import ( "bytes" + "context" "encoding/binary" "testing" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/memdb" ) func int160(i uint64) []byte { @@ -67,14 +70,22 @@ func accountWithBalance(i uint64) []byte { } func TestSimpleAggregator(t *testing.T) { + db := memdb.New() + defer db.Close() + var rwTx kv.RwTx + var err error + if rwTx, err = db.BeginRw(context.Background()); err != nil { + t.Fatal(err) + } + defer rwTx.Rollback() tmpDir := t.TempDir() - a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000) + a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx) if err != nil { t.Fatal(err) } - + defer a.Close() w := a.MakeStateWriter(true /* beforeOn */) - if err = w.Reset(0); err != nil { + if err = w.Reset(0, rwTx); err != nil { t.Fatal(err) } defer w.Close() @@ -86,17 +97,30 @@ func TestSimpleAggregator(t *testing.T) { if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - r := a.MakeStateReader(2) - acc := r.ReadAccountData(int160(1), false /* trace */) + r := a.MakeStateReader(2, rwTx) + acc, err := r.ReadAccountData(int160(1), false /* trace */) + if err != nil { + t.Fatal(err) + } if !bytes.Equal(acc, account1) { t.Errorf("read account %x, expected account %x", acc, account1) } - a.Close() + if err = rwTx.Commit(); err != nil { + t.Fatal(err) + } } func TestLoopAggregator(t *testing.T) { + db := memdb.New() + defer db.Close() + var rwTx kv.RwTx + var err error + if rwTx, err = db.BeginRw(context.Background()); err != nil { + t.Fatal(err) + } + defer rwTx.Rollback() tmpDir := t.TempDir() - a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000) + a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx) if err != nil { t.Fatal(err) } @@ -107,7 +131,7 @@ func TestLoopAggregator(t *testing.T) { for blockNum := uint64(0); blockNum < 1000; blockNum++ { accountKey := int160(blockNum/10 + 1) //fmt.Printf("blockNum = %d\n", blockNum) - if err = w.Reset(blockNum); err != nil { + if err = w.Reset(blockNum, rwTx); err != nil { t.Fatal(err) } w.UpdateAccountData(accountKey, account1, false /* trace */) @@ -117,18 +141,32 @@ func TestLoopAggregator(t *testing.T) { if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - r := a.MakeStateReader(blockNum + 1) - acc := r.ReadAccountData(accountKey, false /* trace */) + r := a.MakeStateReader(blockNum+1, rwTx) + acc, err := r.ReadAccountData(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if !bytes.Equal(acc, account1) { t.Errorf("read account %x, expected account %x for block %d", acc, account1, blockNum) } account1 = accountWithBalance(blockNum + 2) } + if err = rwTx.Commit(); err != nil { + t.Fatal(err) + } } func TestRecreateAccountWithStorage(t *testing.T) { + db := memdb.New() + defer db.Close() + var rwTx kv.RwTx + var err error + if rwTx, err = db.BeginRw(context.Background()); err != nil { + t.Fatal(err) + } + defer rwTx.Rollback() tmpDir := t.TempDir() - a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000) + a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx) if err != nil { t.Fatal(err) } @@ -139,21 +177,21 @@ func TestRecreateAccountWithStorage(t *testing.T) { w := a.MakeStateWriter(true /* beforeOn */) defer w.Close() for blockNum := uint64(0); blockNum < 100; blockNum++ { - if err = w.Reset(blockNum); err != nil { + if err = w.Reset(blockNum, rwTx); err != nil { t.Fatal(err) } switch blockNum { case 1: w.UpdateAccountData(accountKey, account1, false /* trace */) for s := uint64(0); s < 100; s++ { - w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1), false /* trace */) + w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1).Bytes(), false /* trace */) } case 22: w.DeleteAccount(accountKey, false /* trace */) case 45: w.UpdateAccountData(accountKey, account2, false /* trace */) for s := uint64(50); s < 150; s++ { - w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1), false /* trace */) + w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1).Bytes(), false /* trace */) } } if err = w.FinishTx(blockNum, false /* trace */); err != nil { @@ -162,52 +200,81 @@ func TestRecreateAccountWithStorage(t *testing.T) { if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - r := a.MakeStateReader(blockNum + 1) + r := a.MakeStateReader(blockNum+1, rwTx) switch blockNum { case 1: - acc := r.ReadAccountData(accountKey, false /* trace */) + acc, err := r.ReadAccountData(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if !bytes.Equal(account1, acc) { t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc) } for s := uint64(0); s < 100; s++ { - v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) - if !uint256.NewInt(s + 1).Eq(v) { - t.Errorf("wrong storage value after block %d, expected %d, got %s", blockNum, s+1, v) + v, err := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) + if err != nil { + t.Fatal(err) + } + if !uint256.NewInt(s + 1).Eq(uint256.NewInt(0).SetBytes(v)) { + t.Errorf("wrong storage value after block %d, expected %d, got %d", blockNum, s+1, uint256.NewInt(0).SetBytes(v)) } } case 22, 44: - acc := r.ReadAccountData(accountKey, false /* trace */) + acc, err := r.ReadAccountData(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if len(acc) > 0 { t.Errorf("wrong account after block %d, expected nil, got %x", blockNum, acc) } for s := uint64(0); s < 100; s++ { - v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) + v, err := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) + if err != nil { + t.Fatal(err) + } if v != nil { - t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v) + t.Errorf("wrong storage value after block %d, expected nil, got %d", blockNum, uint256.NewInt(0).SetBytes(v)) } } case 66: - acc := r.ReadAccountData(accountKey, false /* trace */) + acc, err := r.ReadAccountData(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if !bytes.Equal(account2, acc) { t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc) } for s := uint64(0); s < 150; s++ { - v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) + v, err := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) + if err != nil { + t.Fatal(err) + } if s < 50 { if v != nil { - t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v) + t.Errorf("wrong storage value after block %d, expected nil, got %d", blockNum, uint256.NewInt(0).SetBytes(v)) } - } else if v == nil || !uint256.NewInt(2*s+1).Eq(v) { - t.Errorf("wrong storage value after block %d, expected %d, got %s", blockNum, 2*s+1, v) + } else if v == nil || !uint256.NewInt(2*s+1).Eq(uint256.NewInt(0).SetBytes(v)) { + t.Errorf("wrong storage value after block %d, expected %d, got %d", blockNum, 2*s+1, uint256.NewInt(0).SetBytes(v)) } } } } + if err = rwTx.Commit(); err != nil { + t.Fatal(err) + } } func TestChangeCode(t *testing.T) { + db := memdb.New() + defer db.Close() + var rwTx kv.RwTx + var err error + if rwTx, err = db.BeginRw(context.Background()); err != nil { + t.Fatal(err) + } + defer rwTx.Rollback() tmpDir := t.TempDir() - a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000) + a, err := NewAggregator(tmpDir, 16, 4, true, true, 1000, rwTx) if err != nil { t.Fatal(err) } @@ -218,7 +285,7 @@ func TestChangeCode(t *testing.T) { w := a.MakeStateWriter(true /* beforeOn */) defer w.Close() for blockNum := uint64(0); blockNum < 100; blockNum++ { - if err = w.Reset(blockNum); err != nil { + if err = w.Reset(blockNum, rwTx); err != nil { t.Fatal(err) } switch blockNum { @@ -234,22 +301,34 @@ func TestChangeCode(t *testing.T) { if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - r := a.MakeStateReader(blockNum + 1) + r := a.MakeStateReader(blockNum+1, rwTx) switch blockNum { case 22: - acc := r.ReadAccountData(accountKey, false /* trace */) + acc, err := r.ReadAccountData(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if !bytes.Equal(account1, acc) { t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc) } - code := r.ReadAccountCode(accountKey, false /* trace */) + code, err := r.ReadAccountCode(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if !bytes.Equal(code1, code) { t.Errorf("wrong code after block %d, expected %x, got %x", blockNum, code1, code) } case 47: - code := r.ReadAccountCode(accountKey, false /* trace */) + code, err := r.ReadAccountCode(accountKey, false /* trace */) + if err != nil { + t.Fatal(err) + } if code != nil { t.Errorf("wrong code after block %d, expected nil, got %x", blockNum, code) } } } + if err = rwTx.Commit(); err != nil { + t.Fatal(err) + } } diff --git a/commitment/hex_patricia_hashed.go b/commitment/hex_patricia_hashed.go index 99f3285b4..3b7dde8fa 100644 --- a/commitment/hex_patricia_hashed.go +++ b/commitment/hex_patricia_hashed.go @@ -78,11 +78,11 @@ type HexPatriciaHashed struct { // Function used to load branch node and fill up the cells // For each cell, it sets the cell type, clears the modified flag, fills the hash, // and for the extension, account, and leaf type, the `l` and `k` - branchFn func(prefix []byte) []byte + branchFn func(prefix []byte) ([]byte, error) // Function used to fetch account with given plain key - accountFn func(plainKey []byte, cell *Cell) []byte + accountFn func(plainKey []byte, cell *Cell) error // Function used to fetch account with given plain key - storageFn func(plainKey []byte, cell *Cell) []byte + storageFn func(plainKey []byte, cell *Cell) error keccak keccakState keccak2 keccakState accountKeyLen int @@ -92,9 +92,9 @@ type HexPatriciaHashed struct { } func NewHexPatriciaHashed(accountKeyLen int, - branchFn func(prefix []byte) []byte, - accountFn func(plainKey []byte, cell *Cell) []byte, - storageFn func(plainKey []byte, cell *Cell) []byte, + branchFn func(prefix []byte) ([]byte, error), + accountFn func(plainKey []byte, cell *Cell) error, + storageFn func(plainKey []byte, cell *Cell) error, ) *HexPatriciaHashed { return &HexPatriciaHashed{ keccak: sha3.NewLegacyKeccak256().(keccakState), @@ -826,9 +826,9 @@ func (hph *HexPatriciaHashed) Reset() { } func (hph *HexPatriciaHashed) ResetFns( - branchFn func(prefix []byte) []byte, - accountFn func(plainKey []byte, cell *Cell) []byte, - storageFn func(plainKey []byte, cell *Cell) []byte, + branchFn func(prefix []byte) ([]byte, error), + accountFn func(plainKey []byte, cell *Cell) error, + storageFn func(plainKey []byte, cell *Cell) error, ) { hph.branchFn = branchFn hph.accountFn = accountFn @@ -883,7 +883,10 @@ func (hph *HexPatriciaHashed) needUnfolding(hashedKey []byte) int { } func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) error { - branchData := hph.branchFn(hexToCompact(hph.currentKey[:hph.currentKeyLen])) + branchData, err := hph.branchFn(hexToCompact(hph.currentKey[:hph.currentKeyLen])) + if err != nil { + return err + } if !hph.rootChecked && hph.currentKeyLen == 0 && len(branchData) == 0 { // Special case - empty or deleted root hph.rootChecked = true @@ -916,17 +919,13 @@ func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) fmt.Printf("cell (%d, %x) depth=%d, hash=[%x], a=[%x], s=[%x], ex=[%x]\n", row, nibble, depth, cell.h[:cell.hl], cell.apk[:cell.apl], cell.spk[:cell.spl], cell.extension[:cell.extLen]) } if cell.apl > 0 { - k := hph.accountFn(cell.apk[:cell.apl], cell) - cell.apl = len(k) - copy(cell.apk[:], k) + hph.accountFn(cell.apk[:cell.apl], cell) if hph.trace { fmt.Printf("accountFn[%x] return balance=%d, nonce=%d\n", cell.apk[:cell.apl], &cell.Balance, cell.Nonce) } } if cell.spl > 0 { - k := hph.storageFn(cell.spk[:cell.spl], cell) - cell.spl = len(k) - copy(cell.spk[:], k) + hph.storageFn(cell.spk[:cell.spl], cell) } if err = cell.deriveHashedKeys(depth, hph.keccak, hph.accountKeyLen); err != nil { return err diff --git a/commitment/hex_patricia_hashed_test.go b/commitment/hex_patricia_hashed_test.go index cb50efcf0..781ac2315 100644 --- a/commitment/hex_patricia_hashed_test.go +++ b/commitment/hex_patricia_hashed_test.go @@ -46,14 +46,14 @@ func NewMockState(t *testing.T) *MockState { } } -func (ms MockState) branchFn(prefix []byte) []byte { +func (ms MockState) branchFn(prefix []byte) ([]byte, error) { if exBytes, ok := ms.cm[string(prefix)]; ok { - return exBytes[2:] // Skip touchMap, but keep afterMap + return exBytes[2:], nil // Skip touchMap, but keep afterMap } - return nil + return nil, nil } -func (ms MockState) accountFn(plainKey []byte, cell *Cell) []byte { +func (ms MockState) accountFn(plainKey []byte, cell *Cell) error { exBytes, ok := ms.sm[string(plainKey)] if !ok { ms.t.Fatalf("accountFn not found key [%x]", plainKey) @@ -91,10 +91,10 @@ func (ms MockState) accountFn(plainKey []byte, cell *Cell) []byte { } else { cell.CodeHash = [32]byte{} } - return plainKey + return nil } -func (ms MockState) storageFn(plainKey []byte, cell *Cell) []byte { +func (ms MockState) storageFn(plainKey []byte, cell *Cell) error { exBytes, ok := ms.sm[string(plainKey)] if !ok { ms.t.Fatalf("storageFn not found key [%x]", plainKey) @@ -131,7 +131,7 @@ func (ms MockState) storageFn(plainKey []byte, cell *Cell) []byte { } else { cell.Storage = [32]byte{} } - return plainKey + return nil } func (ms *MockState) applyPlainUpdates(plainKeys [][]byte, updates []Update) error { diff --git a/kv/mdbx/kv_mdbx.go b/kv/mdbx/kv_mdbx.go index 5f2f22d9d..a6b71a8c2 100644 --- a/kv/mdbx/kv_mdbx.go +++ b/kv/mdbx/kv_mdbx.go @@ -141,6 +141,11 @@ func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts { return opts } +func (opts MdbxOpts) WriteMap() MdbxOpts { + opts.flags |= mdbx.WriteMap + return opts +} + func (opts MdbxOpts) WithTablessCfg(f TableCfgFunc) MdbxOpts { opts.bucketsCfg = f return opts