diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index e432815cb..b3daee1a2 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -39,7 +39,6 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/compress" - "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/log/v3" "golang.org/x/crypto/sha3" @@ -92,6 +91,10 @@ type Aggregator struct { mergeChannel chan struct{} mergeError chan error mergeWg sync.WaitGroup + accountsTree *btree.BTree + codeTree *btree.BTree + storageTree *btree.BTree + commTree *btree.BTree } type ChangeFile struct { @@ -476,8 +479,9 @@ func buildIndex(datPath, idxPath, tmpDir string, count int) (*compress.Decompres } word := make([]byte, 0, 256) var pos uint64 + g := d.MakeGetter() for { - g := d.MakeGetter() + g.Reset(0) for g.HasNext() { word, _ = g.Next(word[:0]) if err = rs.AddKey(word, pos); err != nil { @@ -506,52 +510,45 @@ func buildIndex(datPath, idxPath, tmpDir string, count int) (*compress.Decompres // aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database // This function is time-critical because it needs to be run in the same go-routine (thread) as the general // execution (due to read-write tx). After that, we can optimistically execute the rest in the background -func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx, table string) (*btree.BTree, error) { +func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *btree.BTree) (*btree.BTree, error) { if err := c.openFiles(blockTo, false /* write */); err != nil { return nil, fmt.Errorf("open files: %w", err) } bt := btree.New(32) - err := c.aggregateToBtree(bt, prefixLen) + err := c.aggregateToBtree(bt, prefixLen, true) if err != nil { return nil, fmt.Errorf("aggregateToBtree: %w", err) } // Clean up the DB table var e error + var search AggregateItem bt.Ascend(func(i btree.Item) bool { item := i.(*AggregateItem) if item.count == 0 { return true } - dbPrefix := item.k - prevV, err := tx.GetOne(table, dbPrefix) - if err != nil { - e = err - return false + search.k = item.k + var prevV *AggregateItem + if prevVI := dbTree.Get(&search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } if prevV == nil { - e = fmt.Errorf("record not found in db for %s key %x", table, dbPrefix) + e = fmt.Errorf("record not found in db tree for key %x", item.k) return false } - prevNum := binary.BigEndian.Uint32(prevV[:4]) - if prevNum < item.count { - e = fmt.Errorf("record count too low for %s key %s count %d, subtracting %d", table, dbPrefix, prevNum, item.count) + if prevV.count < item.count { + e = fmt.Errorf("record count too low for key [%x] count %d, subtracting %d", item.k, prevV.count, item.count) return false } - if prevNum == item.count { - if e = tx.Delete(table, dbPrefix, nil); e != nil { - return false - } + if prevV.count == item.count { + dbTree.Delete(prevV) } else { - v := common.Copy(prevV) - binary.BigEndian.PutUint32(v[:4], prevNum-item.count) - if e = tx.Put(table, dbPrefix, v); e != nil { - return false - } + prevV.count -= item.count } return true }) if e != nil { - return nil, fmt.Errorf("clean up table %s after aggregation: %w", table, e) + return nil, fmt.Errorf("clean up after aggregation: %w", e) } return bt, nil } @@ -615,6 +612,7 @@ func (c *Changes) produceChangeSets(datPath, idxPath string) error { }); err != nil { return fmt.Errorf("produceChangeSets NewRecSplit: %w", err) } + g := d.MakeGetter() for { if err = c.rewind(); err != nil { return fmt.Errorf("produceChangeSets rewind2: %w", err) @@ -622,7 +620,7 @@ func (c *Changes) produceChangeSets(datPath, idxPath string) error { var txKey = make([]byte, 8, 60) var pos, prevPos uint64 var txNum uint64 - g := d.MakeGetter() + g.Reset(0) for b, txNum, e = c.prevTx(); b && e == nil; b, txNum, e = c.prevTx() { binary.BigEndian.PutUint64(txKey[:8], txNum) for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { @@ -658,7 +656,7 @@ func (c *Changes) produceChangeSets(datPath, idxPath string) error { // and create a B-tree where each key is only represented once, with the value corresponding to the "after" value // of the latest change. Also, the first byte of value in the B-tree indicates whether the change has occurred from // non-existent (zero) value. In such cases, the fist byte is set to 1 (insertion), otherwise it is 0 (update). -func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int) error { +func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, insertFlag bool) error { var b bool var e error var key, before, after []byte @@ -677,11 +675,19 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int) error { ai.k = key i := bt.Get(&ai) if i == nil { - item := &AggregateItem{k: common.Copy(key), count: 1, v: common.Copy(after)} + var v []byte + if len(after) > 0 { + if insertFlag { + v = common.Copy(after) + } else { + v = common.Copy(after[1:]) + } + } + item := &AggregateItem{k: common.Copy(key), v: v, count: 1} bt.ReplaceOrInsert(item) } else { item := i.(*AggregateItem) - if len(item.v) > 0 && len(after) > 0 { + if insertFlag && len(item.v) > 0 && len(after) > 0 { item.v[0] = after[0] } item.count++ @@ -745,8 +751,12 @@ type byEndBlockItem struct { startBlock uint64 endBlock uint64 decompressor *compress.Decompressor + getter *compress.Getter // reader for the decompressor + getterMerge *compress.Getter // reader for the decomporessor used in the background merge thread index *recsplit.Index - tree *btree.BTree // Substitute for decompressor+index combination + indexReader *recsplit.IndexReader // reader for the index + readerMerge *recsplit.IndexReader // index reader for the background merge thread + tree *btree.BTree // Substitute for decompressor+index combination } func (i *byEndBlockItem) Less(than btree.Item) bool { @@ -773,6 +783,10 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) ( aggError: make(chan error, 1), mergeChannel: make(chan struct{}, 1), mergeError: make(chan error, 1), + accountsTree: btree.New(32), + codeTree: btree.New(32), + storageTree: btree.New(32), + commTree: btree.New(32), } var closeStateFiles = true // It will be set to false in case of success at the end of the function defer func() { @@ -939,6 +953,9 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) ( if err = checkOverlapWithMinStart("commitment", a.commitmentFiles, minStart); err != nil { return nil, err } + if err = a.rebuildRecentState(); err != nil { + return nil, fmt.Errorf("rebuilding recent state from change files: %w", err) + } closeStateFiles = false a.aggWg.Add(1) go a.backgroundAggregation() @@ -947,6 +964,62 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) ( return a, nil } +// rebuildRecentState reads change files and reconstructs the recent state +func (a *Aggregator) rebuildRecentState() error { + t := time.Now() + var err error + a.changesBtree.Descend(func(i btree.Item) bool { + item := i.(*ChangesItem) + var accountChanges, codeChanges, storageChanges, commChanges Changes + accountChanges.Init("accounts", a.aggregationStep, a.diffDir, false /* beforeOn */) + if accountChanges.openFiles(item.startBlock, false /* write */); err != nil { + return false + } + if err = accountChanges.aggregateToBtree(a.accountsTree, 0, false); err != nil { + return false + } + if err = accountChanges.closeFiles(); err != nil { + return false + } + codeChanges.Init("code", a.aggregationStep, a.diffDir, false /* beforeOn */) + if codeChanges.openFiles(item.startBlock, false /* write */); err != nil { + return false + } + if err = codeChanges.aggregateToBtree(a.codeTree, 0, false); err != nil { + return false + } + if err = codeChanges.closeFiles(); err != nil { + return false + } + storageChanges.Init("storage", a.aggregationStep, a.diffDir, false /* beforeOn */) + if storageChanges.openFiles(item.startBlock, false /* write */); err != nil { + return false + } + if err = storageChanges.aggregateToBtree(a.storageTree, 0, false); err != nil { + return false + } + if err = storageChanges.closeFiles(); err != nil { + return false + } + commChanges.Init("commitment", a.aggregationStep, a.diffDir, false /* we do not unwind commitment */) + if commChanges.openFiles(item.startBlock, false /* write */); err != nil { + return false + } + if err = commChanges.aggregateToBtree(a.commTree, 0, false); err != nil { + return false + } + if err = commChanges.closeFiles(); err != nil { + return false + } + return true + }) + if err != nil { + return err + } + log.Info("reconstructed recent state", "in", time.Since(t)) + return nil +} + type AggregationTask struct { accountChanges *Changes accountsBt *btree.BTree @@ -1051,6 +1124,10 @@ func (a *Aggregator) backgroundAggregation() { a.aggError <- fmt.Errorf("createDatAndIndex accounts: %w", err) return } + accountsItem.getter = accountsItem.decompressor.MakeGetter() + accountsItem.getterMerge = accountsItem.decompressor.MakeGetter() + accountsItem.indexReader = recsplit.NewIndexReader(accountsItem.index) + accountsItem.readerMerge = recsplit.NewIndexReader(accountsItem.index) if err = aggTask.accountChanges.deleteFiles(); err != nil { a.aggError <- fmt.Errorf("delete accountChanges: %w", err) return @@ -1073,6 +1150,10 @@ func (a *Aggregator) backgroundAggregation() { a.aggError <- fmt.Errorf("createDatAndIndex code: %w", err) return } + codeItem.getter = codeItem.decompressor.MakeGetter() + codeItem.getterMerge = codeItem.decompressor.MakeGetter() + codeItem.indexReader = recsplit.NewIndexReader(codeItem.index) + codeItem.readerMerge = recsplit.NewIndexReader(codeItem.index) if err = aggTask.codeChanges.deleteFiles(); err != nil { a.aggError <- fmt.Errorf("delete codeChanges: %w", err) return @@ -1095,6 +1176,10 @@ func (a *Aggregator) backgroundAggregation() { a.aggError <- fmt.Errorf("createDatAndIndex storage: %w", err) return } + storageItem.getter = storageItem.decompressor.MakeGetter() + storageItem.getterMerge = storageItem.decompressor.MakeGetter() + storageItem.indexReader = recsplit.NewIndexReader(storageItem.index) + storageItem.readerMerge = recsplit.NewIndexReader(storageItem.index) if err = aggTask.storageChanges.deleteFiles(); err != nil { a.aggError <- fmt.Errorf("delete storageChanges: %w", err) return @@ -1109,6 +1194,10 @@ func (a *Aggregator) backgroundAggregation() { a.aggError <- fmt.Errorf("createDatAndIndex commitment: %w", err) return } + commitmentItem.getter = commitmentItem.decompressor.MakeGetter() + commitmentItem.getterMerge = commitmentItem.decompressor.MakeGetter() + commitmentItem.indexReader = recsplit.NewIndexReader(commitmentItem.index) + commitmentItem.readerMerge = recsplit.NewIndexReader(commitmentItem.index) if err = aggTask.commChanges.deleteFiles(); err != nil { a.aggError <- fmt.Errorf("delete commChanges: %w", err) return @@ -1186,7 +1275,7 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu // Optimised key referencing a state file record (file number and offset within the file) fileI := int(apk[0]) offset := decodeU64(apk[1:]) - g := cvt.preAccounts[fileI].decompressor.MakeGetter() // TODO Cache in the reader + g := cvt.preAccounts[fileI].getterMerge g.Reset(offset) apkBuf, _ = g.Next(apkBuf[:0]) } @@ -1196,9 +1285,8 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu if item.index.Empty() { continue } - reader := recsplit.NewIndexReader(item.index) - offset := reader.Lookup(apkBuf) - g := item.decompressor.MakeGetter() // TODO Cache in the reader + offset := item.readerMerge.Lookup(apkBuf) + g := item.getterMerge g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(apkBuf); keyMatch { @@ -1217,7 +1305,7 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu // Optimised key referencing a state file record (file number and offset within the file) fileI := int(spk[0]) offset := decodeU64(spk[1:]) - g := cvt.preStorage[fileI].decompressor.MakeGetter() // TODO Cache in the reader + g := cvt.preStorage[fileI].getterMerge g.Reset(offset) spkBuf, _ = g.Next(spkBuf[:0]) } @@ -1227,9 +1315,8 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu if item.index.Empty() { continue } - reader := recsplit.NewIndexReader(item.index) - offset := reader.Lookup(spkBuf) - g := item.decompressor.MakeGetter() // TODO Cache in the reader + offset := item.readerMerge.Lookup(spkBuf) + g := item.getterMerge g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(spkBuf); keyMatch { @@ -1375,6 +1462,10 @@ func openFiles(treeName string, diffDir string, tree *btree.BTree) error { if item.index, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("%s.%d-%d.idx", treeName, item.startBlock, item.endBlock))); err != nil { return false } + item.getter = item.decompressor.MakeGetter() + item.getterMerge = item.decompressor.MakeGetter() + item.indexReader = recsplit.NewIndexReader(item.index) + item.readerMerge = recsplit.NewIndexReader(item.index) return true }) return err @@ -1440,9 +1531,8 @@ func readFromFiles(treeName string, tree **btree.BTree, lock sync.Locker, blockN if item.index.Empty() { return true } - reader := recsplit.NewIndexReader(item.index) - offset := reader.Lookup(filekey) - g := item.decompressor.MakeGetter() // TODO Cache in the reader + offset := item.indexReader.Lookup(filekey) + g := item.getter g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(filekey); keyMatch { @@ -1471,7 +1561,7 @@ func readByOffset(treeName string, tree **btree.BTree, fileI int, offset uint64) return true } item := i.(*byEndBlockItem) - g := item.decompressor.MakeGetter() // TODO Cache in the reader + g := item.getter g.Reset(offset) key, _ = g.Next(nil) val, _ = g.Next(nil) @@ -1483,10 +1573,9 @@ func readByOffset(treeName string, tree **btree.BTree, fileI int, offset uint64) return key, nil } -func (a *Aggregator) MakeStateReader(tx kv.Getter, blockNum uint64) *Reader { +func (a *Aggregator) MakeStateReader(blockNum uint64) *Reader { r := &Reader{ a: a, - tx: tx, blockNum: blockNum, } return r @@ -1494,76 +1583,60 @@ func (a *Aggregator) MakeStateReader(tx kv.Getter, blockNum uint64) *Reader { type Reader struct { a *Aggregator - tx kv.Getter + search AggregateItem blockNum uint64 } -func (r *Reader) ReadAccountData(addr []byte, trace bool) ([]byte, error) { +func (r *Reader) ReadAccountData(addr []byte, trace bool) []byte { // Look in the summary table first - v, err := r.tx.GetOne(kv.StateAccounts, addr) - if err != nil { - return nil, err + r.search.k = addr + if vi := r.a.accountsTree.Get(&r.search); vi != nil { + return vi.(*AggregateItem).v } - if v != nil { - // First 4 bytes is the number of 1-block state diffs containing the key - if trace { - fmt.Printf("ReadAccountData %x, found in DB: %x, number of diffs: %d\n", addr, v[4:], binary.BigEndian.Uint32(v[:4])) - } - return v[4:], nil - } - // Look in the files - val := readFromFiles("accounts", &r.a.accountsFiles, r.a.accountsFilesLock.RLocker(), r.blockNum, addr, trace) - return val, nil + return readFromFiles("accounts", &r.a.accountsFiles, r.a.accountsFilesLock.RLocker(), r.blockNum, addr, trace) } -func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) (*uint256.Int, error) { +func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) *uint256.Int { // Look in the summary table first dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) - v, err := r.tx.GetOne(kv.StateStorage, dbkey) - if err != nil { - return nil, err + r.search.k = dbkey + var v []byte + if vi := r.a.storageTree.Get(&r.search); vi != nil { + v = vi.(*AggregateItem).v + } else { + v = readFromFiles("storage", &r.a.storageFiles, r.a.storageFilesLock.RLocker(), r.blockNum, dbkey, trace) } if v != nil { - if trace { - fmt.Printf("ReadAccountStorage %x %x, found in DB: %x, number of diffs: %d\n", addr, loc, v[4:], binary.BigEndian.Uint32(v[:4])) - } - if len(v) == 4 { - return nil, nil - } - // First 4 bytes is the number of 1-block state diffs containing the key - return new(uint256.Int).SetBytes(v[4:]), nil + return new(uint256.Int).SetBytes(v) } - // Look in the files - val := readFromFiles("storage", &r.a.storageFiles, r.a.storageFilesLock.RLocker(), r.blockNum, dbkey, trace) - if val != nil { - return new(uint256.Int).SetBytes(val), nil - } - return nil, nil + return nil } -func (r *Reader) ReadAccountCode(addr []byte, trace bool) ([]byte, error) { +func (r *Reader) ReadAccountCode(addr []byte, trace bool) []byte { // Look in the summary table first - v, err := r.tx.GetOne(kv.StateCode, addr) - if err != nil { - return nil, err - } - if v != nil { - // First 4 bytes is the number of 1-block state diffs containing the key - if trace { - fmt.Printf("ReadAccountCode %x, found in DB: %x, number of diffs: %d\n", addr, v[4:], binary.BigEndian.Uint32(v[:4])) - } - return v[4:], nil + r.search.k = addr + if vi := r.a.codeTree.Get(&r.search); vi != nil { + return vi.(*AggregateItem).v } // Look in the files - val := readFromFiles("code", &r.a.codeFiles, r.a.codeFilesLock.RLocker(), r.blockNum, addr, trace) - return val, nil + return readFromFiles("code", &r.a.codeFiles, r.a.codeFilesLock.RLocker(), r.blockNum, addr, trace) +} + +func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) int { + // Look in the summary table first + r.search.k = addr + if vi := r.a.codeTree.Get(&r.search); vi != nil { + return len(vi.(*AggregateItem).v) + } + // Look in the files. TODO - use specialised function to only lookup size + return len(readFromFiles("code", &r.a.codeFiles, r.a.codeFilesLock.RLocker(), r.blockNum, addr, trace)) } type Writer struct { a *Aggregator - tx kv.RwTx + search AggregateItem // Aggregate item used to search in trees blockNum uint64 changeFileNum uint64 // Block number associated with the current change files. It is the last block number whose changes will go into that file accountChanges Changes // Change files for accounts @@ -1592,8 +1665,7 @@ func (w *Writer) Close() { w.commChanges.closeFiles() } -func (w *Writer) Reset(tx kv.RwTx, blockNum uint64) error { - w.tx = tx +func (w *Writer) Reset(blockNum uint64) error { w.blockNum = blockNum if blockNum > w.changeFileNum { if err := w.accountChanges.closeFiles(); err != nil { @@ -1656,18 +1728,14 @@ func (w *Writer) unlockFn() { func (w *Writer) branchFn(prefix []byte) ([]byte, error) { // Look in the summary table first - dbPrefix := prefix - v, err := w.tx.GetOne(kv.StateCommitment, dbPrefix) - if err != nil { - return nil, err - } - if v != nil { - // First 4 bytes is the number of 1-block state diffs containing the key - return v[4:], nil + w.search.k = prefix + var v []byte + if vi := w.a.commTree.Get(&w.search); vi != nil { + return vi.(*AggregateItem).v, nil } // Look in the files - val := readFromFiles("commitment", &w.a.commitmentFiles, nil /* lock */, w.blockNum, prefix, false /* trace */) - return val, nil + v = readFromFiles("commitment", &w.a.commitmentFiles, nil /* lock */, w.blockNum, prefix, false /* trace */) + return v, nil } func bytesToUint64(buf []byte) (x uint64) { @@ -1682,20 +1750,15 @@ func bytesToUint64(buf []byte) (x uint64) { func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) ([]byte, error) { var enc []byte - var v []byte - var err error if len(plainKey) != length.Addr { // Accessing account key and value via "thin reference" to the state file and offset fileI := int(plainKey[0]) offset := decodeU64(plainKey[1:]) plainKey, enc = readByOffset("accounts", &w.a.accountsFiles, fileI, offset) } else { // Full account key is provided, search as usual // Look in the summary table first - if v, err = w.tx.GetOne(kv.StateAccounts, plainKey); err != nil { - return nil, err - } - if v != nil { - // First 4 bytes is the number of 1-block state diffs containing the key - enc = v[4:] + w.search.k = plainKey + if encI := w.a.accountsTree.Get(&w.search); encI != nil { + enc = encI.(*AggregateItem).v } else { // Look in the files enc = readFromFiles("accounts", &w.a.accountsFiles, nil /* lock */, w.blockNum, plainKey, false /* trace */) @@ -1719,13 +1782,9 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) ([]byte, erro cell.Balance.SetBytes(enc[pos : pos+balanceBytes]) } } - v, err = w.tx.GetOne(kv.StateCode, plainKey) - if err != nil { - return nil, err - } - if v != nil { - // First 4 bytes is the number of 1-block state diffs containing the key - enc = v[4:] + w.search.k = plainKey + if encI := w.a.codeTree.Get(&w.search); encI != nil { + enc = encI.(*AggregateItem).v } else { // Look in the files enc = readFromFiles("code", &w.a.codeFiles, nil /* lock */, w.blockNum, plainKey, false /* trace */) @@ -1740,20 +1799,15 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) ([]byte, erro func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) ([]byte, error) { var enc []byte - var v []byte - var err error if len(plainKey) != length.Addr+length.Hash { // Accessing storage key and value via "thin reference" to the state file and offset fileI := int(plainKey[0]) offset := decodeU64(plainKey[1:]) plainKey, enc = readByOffset("storage", &w.a.storageFiles, fileI, offset) } else { // Full storage key is provided, search as usual // Look in the summary table first - if v, err = w.tx.GetOne(kv.StateStorage, plainKey); err != nil { - return nil, err - } - if v != nil { - // First 4 bytes is the number of 1-block state diffs containing the key - enc = v[4:] + w.search.k = plainKey + if encI := w.a.storageTree.Get(&w.search); encI != nil { + enc = encI.(*AggregateItem).v } else { // Look in the files enc = readFromFiles("storage", &w.a.storageFiles, nil /* lock */, w.blockNum, plainKey, false /* trace */) @@ -1927,33 +1981,30 @@ func (w *Writer) computeCommitment(trace bool) ([]byte, error) { } for prefixStr, branchNodeUpdate := range branchNodeUpdates { prefix := []byte(prefixStr) - dbPrefix := prefix - prevV, err := w.tx.GetOne(kv.StateCommitment, dbPrefix) - if err != nil { - return nil, err + w.search.k = prefix + var prevV *AggregateItem + if prevVI := w.a.commTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 + var original []byte if prevV == nil { original = readFromFiles("commitment", &w.a.commitmentFiles, w.a.commFilesLock.RLocker(), w.blockNum, prefix, false) } else { - prevNum = binary.BigEndian.Uint32(prevV[:4]) + original = prevV.v } - v := make([]byte, 4+len(branchNodeUpdate)) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - copy(v[4:], branchNodeUpdate) - if err = w.tx.Put(kv.StateCommitment, dbPrefix, v); err != nil { - return nil, err + if prevV == nil { + w.a.commTree.ReplaceOrInsert(&AggregateItem{k: prefix, v: branchNodeUpdate, count: 1}) + } else { + prevV.v = branchNodeUpdate + prevV.count++ } if len(branchNodeUpdate) == 0 { w.commChanges.delete(prefix, original) } else { - if prevV == nil && original == nil { + if prevV == nil && len(original) == 0 { w.commChanges.insert(prefix, branchNodeUpdate) } else { - if original == nil { - original = prevV[4:] - } w.commChanges.update(prefix, original, branchNodeUpdate) } } @@ -2008,28 +2059,27 @@ func (w *Writer) Aggregate(trace bool) error { return nil } -func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) error { - prevV, err := w.tx.GetOne(kv.StateAccounts, addr) - if err != nil { - return err +func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) { + var prevV *AggregateItem + w.search.k = addr + if prevVI := w.a.accountsTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 var original []byte if prevV == nil { original = readFromFiles("accounts", &w.a.accountsFiles, w.a.accountsFilesLock.RLocker(), w.blockNum, addr, trace) } else { - prevNum = binary.BigEndian.Uint32(prevV[:4]) - original = prevV[4:] + original = prevV.v } if bytes.Equal(account, original) { // No change - return nil + return } - v := make([]byte, 4+len(account)) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - copy(v[4:], account) - if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil { - return err + if prevV == nil { + w.a.accountsTree.ReplaceOrInsert(&AggregateItem{k: addr, v: account, count: 1}) + } else { + prevV.v = account + prevV.count++ } if prevV == nil && len(original) == 0 { w.accountChanges.insert(addr, account) @@ -2040,47 +2090,41 @@ func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) erro w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } - return nil } -func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) error { - prevV, err := w.tx.GetOne(kv.StateCode, addr) - if err != nil { - return err +func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) { + var prevV *AggregateItem + w.search.k = addr + if prevVI := w.a.codeTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 var original []byte if prevV == nil { original = readFromFiles("code", &w.a.codeFiles, w.a.codeFilesLock.RLocker(), w.blockNum, addr, trace) } else { - prevNum = binary.BigEndian.Uint32(prevV[:4]) + original = prevV.v } - v := make([]byte, 4+len(code)) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - copy(v[4:], code) - if err = w.tx.Put(kv.StateCode, addr, v); err != nil { - return err + if prevV == nil { + w.a.codeTree.ReplaceOrInsert(&AggregateItem{k: addr, v: code, count: 1}) + } else { + prevV.v = code + prevV.count++ } - if prevV == nil && original == nil { + if prevV == nil && len(original) == 0 { w.codeChanges.insert(addr, code) } else { - if original == nil { - original = prevV[4:] - } w.codeChanges.update(addr, original, code) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } - return nil } type CursorType uint8 const ( FILE_CURSOR CursorType = iota - DB_CURSOR TREE_CURSOR ) @@ -2091,7 +2135,6 @@ type CursorItem struct { endBlock uint64 key, val []byte dg *compress.Getter - c kv.Cursor tree *btree.BTree } @@ -2126,79 +2169,78 @@ func (ch *CursorHeap) Pop() interface{} { return x } -func (w *Writer) deleteAccount(addr []byte, trace bool) (bool, error) { - prevV, err := w.tx.GetOne(kv.StateAccounts, addr) - if err != nil { - return false, err +func (w *Writer) deleteAccount(addr []byte, trace bool) bool { + var prevV *AggregateItem + w.search.k = addr + if prevVI := w.a.accountsTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 var original []byte if prevV == nil { original = readFromFiles("accounts", &w.a.accountsFiles, w.a.accountsFilesLock.RLocker(), w.blockNum, addr, trace) if original == nil { - return false, nil + return false } } else { - prevNum = binary.BigEndian.Uint32(prevV[:4]) - original = prevV[4:] + original = prevV.v } - v := make([]byte, 4) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil { - return false, err + if prevV == nil { + w.a.accountsTree.ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1}) + } else { + prevV.v = nil + prevV.count++ } w.accountChanges.delete(addr, original) - return true, nil + return true } -func (w *Writer) deleteCode(addr []byte, trace bool) error { - prevV, err := w.tx.GetOne(kv.StateCode, addr) - if err != nil { - return err +func (w *Writer) deleteCode(addr []byte, trace bool) { + var prevV *AggregateItem + w.search.k = addr + if prevVI := w.a.codeTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 var original []byte if prevV == nil { original = readFromFiles("code", &w.a.codeFiles, w.a.codeFilesLock.RLocker(), w.blockNum, addr, trace) if original == nil { // Nothing to do - return nil + return } } else { - prevNum = binary.BigEndian.Uint32(prevV[:4]) - original = prevV[4:] + original = prevV.v } - v := make([]byte, 4) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - if err = w.tx.Put(kv.StateCode, addr, v); err != nil { - return err + if prevV == nil { + w.a.codeTree.ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1}) + } else { + prevV.v = nil + prevV.count++ } w.codeChanges.delete(addr, original) - return nil } -func (w *Writer) DeleteAccount(addr []byte, trace bool) error { - if deleted, err := w.deleteAccount(addr, trace); err != nil { - return err - } else if !deleted { - return nil - } - if err := w.deleteCode(addr, trace); err != nil { - return err +func (w *Writer) DeleteAccount(addr []byte, trace bool) { + if deleted := w.deleteAccount(addr, trace); !deleted { + return } + w.deleteCode(addr, trace) // Find all storage items for this address var cp CursorHeap heap.Init(&cp) - c, err := w.tx.Cursor(kv.StateStorage) - if err != nil { - return err - } + w.search.k = addr + found := false var k, v []byte - if k, v, err = c.Seek(addr); err != nil { - return err - } - if k != nil && bytes.HasPrefix(k, addr) { - heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: c, endBlock: w.blockNum}) + w.a.storageTree.AscendGreaterOrEqual(&w.search, func(i btree.Item) bool { + item := i.(*AggregateItem) + if bytes.HasPrefix(item.k, addr) { + found = true + k = item.k + v = item.v + } + return false + }) + if found { + heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: common.Copy(k), val: common.Copy(v), tree: w.a.storageTree, endBlock: w.blockNum}) } w.a.storageFiles.Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) @@ -2219,9 +2261,8 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error { if item.index.Empty() { return true } - reader := recsplit.NewIndexReader(item.index) - offset := reader.Lookup(addr) - g := item.decompressor.MakeGetter() // TODO Cache in the reader + offset := item.indexReader.Lookup(addr) + g := item.getter g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(addr); !keyMatch { @@ -2257,18 +2298,6 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error { } else { heap.Pop(&cp) } - case DB_CURSOR: - k, v, err = ci1.c.Next() - if err != nil { - return err - } - if k != nil && bytes.HasPrefix(k, addr) { - ci1.key = common.Copy(k) - ci1.val = common.Copy(v) - heap.Fix(&cp, 0) - } else { - heap.Pop(&cp) - } case TREE_CURSOR: skip := true var aitem *AggregateItem @@ -2289,19 +2318,16 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error { } } } - var prevV []byte - prevV, err = w.tx.GetOne(kv.StateStorage, lastKey) - if err != nil { - return err + var prevV *AggregateItem + w.search.k = lastKey + if prevVI := w.a.storageTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 - if prevV != nil { - prevNum = binary.BigEndian.Uint32(prevV[:4]) - } - v = make([]byte, 4) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - if err = w.tx.Put(kv.StateStorage, lastKey, v); err != nil { - return err + if prevV == nil { + w.a.storageTree.ReplaceOrInsert(&AggregateItem{k: lastKey, v: nil, count: 1}) + } else { + prevV.v = nil + prevV.count++ } w.storageChanges.delete(lastKey, lastVal) } @@ -2309,46 +2335,45 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } - return nil } -func (w *Writer) WriteAccountStorage(addr []byte, loc []byte, value *uint256.Int, trace bool) error { +func (w *Writer) WriteAccountStorage(addr []byte, loc []byte, value *uint256.Int, trace bool) { dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) - prevV, err := w.tx.GetOne(kv.StateStorage, dbkey) - if err != nil { - return err + w.search.k = dbkey + var prevV *AggregateItem + if prevVI := w.a.storageTree.Get(&w.search); prevVI != nil { + prevV = prevVI.(*AggregateItem) } - var prevNum uint32 var original []byte if prevV == nil { original = readFromFiles("storage", &w.a.storageFiles, w.a.storageFilesLock.RLocker(), w.blockNum, dbkey, trace) } else { - prevNum = binary.BigEndian.Uint32(prevV[:4]) - original = prevV[4:] + original = prevV.v } vLen := value.ByteLen() - v := make([]byte, 4+vLen) - binary.BigEndian.PutUint32(v[:4], prevNum+1) - value.WriteToSlice(v[4:]) - if bytes.Equal(v[4:], original) { + v := make([]byte, vLen) + value.WriteToSlice(v) + if bytes.Equal(v, original) { // No change - return nil + return } - if err = w.tx.Put(kv.StateStorage, dbkey, v); err != nil { - return err + if prevV == nil { + w.a.storageTree.ReplaceOrInsert(&AggregateItem{k: dbkey, v: v, count: 1}) + } else { + prevV.v = v + prevV.count++ } if prevV == nil && len(original) == 0 { - w.storageChanges.insert(dbkey, v[4:]) + w.storageChanges.insert(dbkey, v) } else { - w.storageChanges.update(dbkey, original, v[4:]) + w.storageChanges.update(dbkey, original, v) } if trace { w.a.trace = true w.a.tracedKeys[string(dbkey)] = struct{}{} } - return nil } func findLargestMerge(tree **btree.BTree, lock sync.Locker, maxTo uint64) (toAggregate []*byEndBlockItem, pre []*byEndBlockItem, post []*byEndBlockItem, aggFrom uint64, aggTo uint64) { @@ -2394,6 +2419,7 @@ func (a *Aggregator) computeAggregation(treeName string, toAggregate []*byEndBlo heap.Init(&cp) for _, ag := range toAggregate { g := ag.decompressor.MakeGetter() + g.Reset(0) if g.HasNext() { key, _ := g.Next(nil) val, _ := g.Next(nil) @@ -2404,6 +2430,10 @@ func (a *Aggregator) computeAggregation(treeName string, toAggregate []*byEndBlo if item2.decompressor, item2.index, err = a.mergeIntoStateFile(&cp, 0, treeName, aggFrom, aggTo, a.diffDir, valTransform); err != nil { return nil, fmt.Errorf("mergeIntoStateFile %s [%d-%d]: %w", treeName, aggFrom, aggTo, err) } + item2.getter = item2.decompressor.MakeGetter() + item2.getterMerge = item2.decompressor.MakeGetter() + item2.indexReader = recsplit.NewIndexReader(item2.index) + item2.readerMerge = recsplit.NewIndexReader(item2.index) return item2, nil } @@ -2449,19 +2479,19 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error { commChanges.Init("commitment", w.a.aggregationStep, w.a.diffDir, false /* beforeOn */) var err error var accountsBt *btree.BTree - if accountsBt, err = accountChanges.aggregate(blockFrom, blockTo, 0, w.tx, kv.StateAccounts); err != nil { + if accountsBt, err = accountChanges.aggregate(blockFrom, blockTo, 0, w.a.accountsTree); err != nil { return fmt.Errorf("aggregate accountsChanges: %w", err) } var codeBt *btree.BTree - if codeBt, err = codeChanges.aggregate(blockFrom, blockTo, 0, w.tx, kv.StateCode); err != nil { + if codeBt, err = codeChanges.aggregate(blockFrom, blockTo, 0, w.a.codeTree); err != nil { return fmt.Errorf("aggregate codeChanges: %w", err) } var storageBt *btree.BTree - if storageBt, err = storageChanges.aggregate(blockFrom, blockTo, 20, w.tx, kv.StateStorage); err != nil { + if storageBt, err = storageChanges.aggregate(blockFrom, blockTo, 20, w.a.storageTree); err != nil { return fmt.Errorf("aggregate storageChanges: %w", err) } var commitmentBt *btree.BTree - if commitmentBt, err = commChanges.aggregate(blockFrom, blockTo, 0, w.tx, kv.StateCommitment); err != nil { + if commitmentBt, err = commChanges.aggregate(blockFrom, blockTo, 0, w.a.commTree); err != nil { return fmt.Errorf("aggregate commitmentChanges: %w", err) } aggTime := time.Since(t) diff --git a/aggregator/aggregator_test.go b/aggregator/aggregator_test.go index 6fe6b05ed..a3e4bcf06 100644 --- a/aggregator/aggregator_test.go +++ b/aggregator/aggregator_test.go @@ -18,13 +18,10 @@ package aggregator import ( "bytes" - "context" "encoding/binary" "testing" "github.com/holiman/uint256" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/memdb" ) func int160(i uint64) []byte { @@ -71,46 +68,26 @@ func accountWithBalance(i uint64) []byte { func TestSimpleAggregator(t *testing.T) { tmpDir := t.TempDir() - db := memdb.New() - defer db.Close() a, err := NewAggregator(tmpDir, 16, 4) if err != nil { t.Fatal(err) } - var rwTx kv.RwTx - if rwTx, err = db.BeginRw(context.Background()); err != nil { - t.Fatal(err) - } - defer rwTx.Rollback() w := a.MakeStateWriter(true /* beforeOn */) - if err = w.Reset(rwTx, 0); err != nil { + if err = w.Reset(0); err != nil { t.Fatal(err) } defer w.Close() var account1 = accountWithBalance(1) - if err = w.UpdateAccountData(int160(1), account1, false /* trace */); err != nil { - t.Fatal(err) - } + w.UpdateAccountData(int160(1), account1, false /* trace */) if err = w.FinishTx(0, false); err != nil { t.Fatal(err) } if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - if err = rwTx.Commit(); err != nil { - t.Fatal(err) - } - var tx kv.Tx - if tx, err = db.BeginRo(context.Background()); err != nil { - t.Fatal(err) - } - defer tx.Rollback() - r := a.MakeStateReader(tx, 2) - var acc []byte - if acc, err = r.ReadAccountData(int160(1), false /* trace */); err != nil { - t.Fatal(err) - } + r := a.MakeStateReader(2) + acc := r.ReadAccountData(int160(1), false /* trace */) if !bytes.Equal(acc, account1) { t.Errorf("read account %x, expected account %x", acc, account1) } @@ -119,86 +96,51 @@ func TestSimpleAggregator(t *testing.T) { func TestLoopAggregator(t *testing.T) { tmpDir := t.TempDir() - db := memdb.New() - defer db.Close() a, err := NewAggregator(tmpDir, 16, 4) if err != nil { t.Fatal(err) } defer a.Close() var account1 = accountWithBalance(1) - var rwTx kv.RwTx - defer func() { - rwTx.Rollback() - }() - var tx kv.Tx - defer func() { - tx.Rollback() - }() w := a.MakeStateWriter(true /* beforeOn */) defer w.Close() - ctx := context.Background() for blockNum := uint64(0); blockNum < 1000; blockNum++ { accountKey := int160(blockNum/10 + 1) //fmt.Printf("blockNum = %d\n", blockNum) - if rwTx, err = db.BeginRw(ctx); err != nil { - t.Fatal(err) - } - if err = w.Reset(rwTx, blockNum); err != nil { - t.Fatal(err) - } - if err = w.UpdateAccountData(accountKey, account1, false /* trace */); err != nil { + if err = w.Reset(blockNum); err != nil { t.Fatal(err) } + w.UpdateAccountData(accountKey, account1, false /* trace */) if err = w.FinishTx(blockNum, false /* trace */); err != nil { t.Fatal(err) } if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - if err = rwTx.Commit(); err != nil { - t.Fatal(err) - } - if tx, err = db.BeginRo(ctx); err != nil { - t.Fatal(err) - } - r := a.MakeStateReader(tx, blockNum+1) - var acc []byte - if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } - tx.Rollback() + r := a.MakeStateReader(blockNum + 1) + acc := r.ReadAccountData(accountKey, false /* trace */) if !bytes.Equal(acc, account1) { t.Errorf("read account %x, expected account %x for block %d", acc, account1, blockNum) } account1 = accountWithBalance(blockNum + 2) } - if tx, err = db.BeginRo(ctx); err != nil { - t.Fatal(err) - } blockNum := uint64(1000) - r := a.MakeStateReader(tx, blockNum) + r := a.MakeStateReader(blockNum) for i := uint64(0); i < blockNum/10+1; i++ { accountKey := int160(i) var expected []byte if i > 0 { expected = accountWithBalance(i * 10) } - var acc []byte - if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + acc := r.ReadAccountData(accountKey, false /* trace */) if !bytes.Equal(acc, expected) { t.Errorf("read account %x, expected account %x for block %d", acc, expected, i) } } - tx.Rollback() } func TestRecreateAccountWithStorage(t *testing.T) { tmpDir := t.TempDir() - db := memdb.New() - defer db.Close() a, err := NewAggregator(tmpDir, 16, 4) if err != nil { t.Fatal(err) @@ -207,46 +149,24 @@ func TestRecreateAccountWithStorage(t *testing.T) { accountKey := int160(1) var account1 = accountWithBalance(1) var account2 = accountWithBalance(2) - var rwTx kv.RwTx - defer func() { - rwTx.Rollback() - }() - var tx kv.Tx - defer func() { - tx.Rollback() - }() w := a.MakeStateWriter(true /* beforeOn */) defer w.Close() - ctx := context.Background() for blockNum := uint64(0); blockNum < 100; blockNum++ { - if rwTx, err = db.BeginRw(ctx); err != nil { - t.Fatal(err) - } - if err = w.Reset(rwTx, blockNum); err != nil { + if err = w.Reset(blockNum); err != nil { t.Fatal(err) } switch blockNum { case 1: - if err = w.UpdateAccountData(accountKey, account1, false /* trace */); err != nil { - t.Fatal(err) - } + w.UpdateAccountData(accountKey, account1, false /* trace */) for s := uint64(0); s < 100; s++ { - if err = w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1), false /* trace */); err != nil { - t.Fatal(err) - } + w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1), false /* trace */) } case 22: - if err = w.DeleteAccount(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + w.DeleteAccount(accountKey, false /* trace */) case 45: - if err = w.UpdateAccountData(accountKey, account2, false /* trace */); err != nil { - t.Fatal(err) - } + w.UpdateAccountData(accountKey, account2, false /* trace */) for s := uint64(50); s < 150; s++ { - if err = w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1), false /* trace */); err != nil { - t.Fatal(err) - } + w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1), false /* trace */) } } if err = w.FinishTx(blockNum, false /* trace */); err != nil { @@ -255,61 +175,37 @@ func TestRecreateAccountWithStorage(t *testing.T) { if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - if err = rwTx.Commit(); err != nil { - t.Fatal(err) - } - if tx, err = db.BeginRo(ctx); err != nil { - t.Fatal(err) - } - r := a.MakeStateReader(tx, blockNum+1) + r := a.MakeStateReader(blockNum + 1) switch blockNum { case 1: - var acc []byte - if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + acc := r.ReadAccountData(accountKey, false /* trace */) if !bytes.Equal(account1, acc) { t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc) } for s := uint64(0); s < 100; s++ { - var v *uint256.Int - if v, err = r.ReadAccountStorage(accountKey, int256(s), false /* trace */); err != nil { - t.Fatal(err) - } + v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) if !uint256.NewInt(s + 1).Eq(v) { t.Errorf("wrong storage value after block %d, expected %d, got %s", blockNum, s+1, v) } } case 22, 44: - var acc []byte - if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + acc := r.ReadAccountData(accountKey, false /* trace */) if len(acc) > 0 { t.Errorf("wrong account after block %d, expected nil, got %x", blockNum, acc) } for s := uint64(0); s < 100; s++ { - var v *uint256.Int - if v, err = r.ReadAccountStorage(accountKey, int256(s), false /* trace */); err != nil { - t.Fatal(err) - } + v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) if v != nil { t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v) } } case 66: - var acc []byte - if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + acc := r.ReadAccountData(accountKey, false /* trace */) if !bytes.Equal(account2, acc) { t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc) } for s := uint64(0); s < 150; s++ { - var v *uint256.Int - if v, err = r.ReadAccountStorage(accountKey, int256(s), false /* trace */); err != nil { - t.Fatal(err) - } + v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */) if s < 50 { if v != nil { t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v) @@ -319,14 +215,11 @@ func TestRecreateAccountWithStorage(t *testing.T) { } } } - tx.Rollback() } } func TestChangeCode(t *testing.T) { tmpDir := t.TempDir() - db := memdb.New() - defer db.Close() a, err := NewAggregator(tmpDir, 16, 4) if err != nil { t.Fatal(err) @@ -335,38 +228,18 @@ func TestChangeCode(t *testing.T) { accountKey := int160(1) var account1 = accountWithBalance(1) var code1 = []byte("This is the code number 1") - //var code2 = []byte("This is the code number 2") - var rwTx kv.RwTx - defer func() { - rwTx.Rollback() - }() - var tx kv.Tx - defer func() { - if tx != nil { - tx.Rollback() - } - }() w := a.MakeStateWriter(true /* beforeOn */) defer w.Close() for blockNum := uint64(0); blockNum < 100; blockNum++ { - if rwTx, err = db.BeginRw(context.Background()); err != nil { - t.Fatal(err) - } - if err = w.Reset(rwTx, blockNum); err != nil { + if err = w.Reset(blockNum); err != nil { t.Fatal(err) } switch blockNum { case 1: - if err = w.UpdateAccountData(accountKey, account1, false /* trace */); err != nil { - t.Fatal(err) - } - if err = w.UpdateAccountCode(accountKey, code1, false /* trace */); err != nil { - t.Fatal(err) - } + w.UpdateAccountData(accountKey, account1, false /* trace */) + w.UpdateAccountCode(accountKey, code1, false /* trace */) case 25: - if err = w.DeleteAccount(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + w.DeleteAccount(accountKey, false /* trace */) } if err = w.FinishTx(blockNum, false /* trace */); err != nil { t.Fatal(err) @@ -374,38 +247,22 @@ func TestChangeCode(t *testing.T) { if err = w.Aggregate(false /* trace */); err != nil { t.Fatal(err) } - if err = rwTx.Commit(); err != nil { - t.Fatal(err) - } - if tx, err = db.BeginRo(context.Background()); err != nil { - t.Fatal(err) - } - r := a.MakeStateReader(tx, blockNum+1) + r := a.MakeStateReader(blockNum + 1) switch blockNum { case 22: - var acc []byte - if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + acc := r.ReadAccountData(accountKey, false /* trace */) if !bytes.Equal(account1, acc) { t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc) } - var code []byte - if code, err = r.ReadAccountCode(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + code := r.ReadAccountCode(accountKey, false /* trace */) if !bytes.Equal(code1, code) { t.Errorf("wrong code after block %d, expected %x, got %x", blockNum, code1, code) } case 47: - var code []byte - if code, err = r.ReadAccountCode(accountKey, false /* trace */); err != nil { - t.Fatal(err) - } + code := r.ReadAccountCode(accountKey, false /* trace */) if code != nil { t.Errorf("wrong code after block %d, expected nil, got %x", blockNum, code) } } - tx.Rollback() } }