mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-01 00:31:21 +00:00
[erigon2] optimisations (#297)
* Reuse getter and indexReader * Properly reset Getter * Use separate getter and indexReader for merge thread * Remove DB * Fixes * Print * Fixes * Cleanup signatures Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
parent
ec354d1615
commit
cbfd733672
@ -39,7 +39,6 @@ import (
|
||||
"github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/length"
|
||||
"github.com/ledgerwatch/erigon-lib/compress"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/recsplit"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"golang.org/x/crypto/sha3"
|
||||
@ -92,6 +91,10 @@ type Aggregator struct {
|
||||
mergeChannel chan struct{}
|
||||
mergeError chan error
|
||||
mergeWg sync.WaitGroup
|
||||
accountsTree *btree.BTree
|
||||
codeTree *btree.BTree
|
||||
storageTree *btree.BTree
|
||||
commTree *btree.BTree
|
||||
}
|
||||
|
||||
type ChangeFile struct {
|
||||
@ -476,8 +479,9 @@ func buildIndex(datPath, idxPath, tmpDir string, count int) (*compress.Decompres
|
||||
}
|
||||
word := make([]byte, 0, 256)
|
||||
var pos uint64
|
||||
g := d.MakeGetter()
|
||||
for {
|
||||
g := d.MakeGetter()
|
||||
g.Reset(0)
|
||||
for g.HasNext() {
|
||||
word, _ = g.Next(word[:0])
|
||||
if err = rs.AddKey(word, pos); err != nil {
|
||||
@ -506,52 +510,45 @@ func buildIndex(datPath, idxPath, tmpDir string, count int) (*compress.Decompres
|
||||
// aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database
|
||||
// This function is time-critical because it needs to be run in the same go-routine (thread) as the general
|
||||
// execution (due to read-write tx). After that, we can optimistically execute the rest in the background
|
||||
func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx, table string) (*btree.BTree, error) {
|
||||
func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *btree.BTree) (*btree.BTree, error) {
|
||||
if err := c.openFiles(blockTo, false /* write */); err != nil {
|
||||
return nil, fmt.Errorf("open files: %w", err)
|
||||
}
|
||||
bt := btree.New(32)
|
||||
err := c.aggregateToBtree(bt, prefixLen)
|
||||
err := c.aggregateToBtree(bt, prefixLen, true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("aggregateToBtree: %w", err)
|
||||
}
|
||||
// Clean up the DB table
|
||||
var e error
|
||||
var search AggregateItem
|
||||
bt.Ascend(func(i btree.Item) bool {
|
||||
item := i.(*AggregateItem)
|
||||
if item.count == 0 {
|
||||
return true
|
||||
}
|
||||
dbPrefix := item.k
|
||||
prevV, err := tx.GetOne(table, dbPrefix)
|
||||
if err != nil {
|
||||
e = err
|
||||
return false
|
||||
search.k = item.k
|
||||
var prevV *AggregateItem
|
||||
if prevVI := dbTree.Get(&search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
if prevV == nil {
|
||||
e = fmt.Errorf("record not found in db for %s key %x", table, dbPrefix)
|
||||
e = fmt.Errorf("record not found in db tree for key %x", item.k)
|
||||
return false
|
||||
}
|
||||
prevNum := binary.BigEndian.Uint32(prevV[:4])
|
||||
if prevNum < item.count {
|
||||
e = fmt.Errorf("record count too low for %s key %s count %d, subtracting %d", table, dbPrefix, prevNum, item.count)
|
||||
if prevV.count < item.count {
|
||||
e = fmt.Errorf("record count too low for key [%x] count %d, subtracting %d", item.k, prevV.count, item.count)
|
||||
return false
|
||||
}
|
||||
if prevNum == item.count {
|
||||
if e = tx.Delete(table, dbPrefix, nil); e != nil {
|
||||
return false
|
||||
}
|
||||
if prevV.count == item.count {
|
||||
dbTree.Delete(prevV)
|
||||
} else {
|
||||
v := common.Copy(prevV)
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum-item.count)
|
||||
if e = tx.Put(table, dbPrefix, v); e != nil {
|
||||
return false
|
||||
}
|
||||
prevV.count -= item.count
|
||||
}
|
||||
return true
|
||||
})
|
||||
if e != nil {
|
||||
return nil, fmt.Errorf("clean up table %s after aggregation: %w", table, e)
|
||||
return nil, fmt.Errorf("clean up after aggregation: %w", e)
|
||||
}
|
||||
return bt, nil
|
||||
}
|
||||
@ -615,6 +612,7 @@ func (c *Changes) produceChangeSets(datPath, idxPath string) error {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("produceChangeSets NewRecSplit: %w", err)
|
||||
}
|
||||
g := d.MakeGetter()
|
||||
for {
|
||||
if err = c.rewind(); err != nil {
|
||||
return fmt.Errorf("produceChangeSets rewind2: %w", err)
|
||||
@ -622,7 +620,7 @@ func (c *Changes) produceChangeSets(datPath, idxPath string) error {
|
||||
var txKey = make([]byte, 8, 60)
|
||||
var pos, prevPos uint64
|
||||
var txNum uint64
|
||||
g := d.MakeGetter()
|
||||
g.Reset(0)
|
||||
for b, txNum, e = c.prevTx(); b && e == nil; b, txNum, e = c.prevTx() {
|
||||
binary.BigEndian.PutUint64(txKey[:8], txNum)
|
||||
for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) {
|
||||
@ -658,7 +656,7 @@ func (c *Changes) produceChangeSets(datPath, idxPath string) error {
|
||||
// and create a B-tree where each key is only represented once, with the value corresponding to the "after" value
|
||||
// of the latest change. Also, the first byte of value in the B-tree indicates whether the change has occurred from
|
||||
// non-existent (zero) value. In such cases, the fist byte is set to 1 (insertion), otherwise it is 0 (update).
|
||||
func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int) error {
|
||||
func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, insertFlag bool) error {
|
||||
var b bool
|
||||
var e error
|
||||
var key, before, after []byte
|
||||
@ -677,11 +675,19 @@ func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int) error {
|
||||
ai.k = key
|
||||
i := bt.Get(&ai)
|
||||
if i == nil {
|
||||
item := &AggregateItem{k: common.Copy(key), count: 1, v: common.Copy(after)}
|
||||
var v []byte
|
||||
if len(after) > 0 {
|
||||
if insertFlag {
|
||||
v = common.Copy(after)
|
||||
} else {
|
||||
v = common.Copy(after[1:])
|
||||
}
|
||||
}
|
||||
item := &AggregateItem{k: common.Copy(key), v: v, count: 1}
|
||||
bt.ReplaceOrInsert(item)
|
||||
} else {
|
||||
item := i.(*AggregateItem)
|
||||
if len(item.v) > 0 && len(after) > 0 {
|
||||
if insertFlag && len(item.v) > 0 && len(after) > 0 {
|
||||
item.v[0] = after[0]
|
||||
}
|
||||
item.count++
|
||||
@ -745,8 +751,12 @@ type byEndBlockItem struct {
|
||||
startBlock uint64
|
||||
endBlock uint64
|
||||
decompressor *compress.Decompressor
|
||||
getter *compress.Getter // reader for the decompressor
|
||||
getterMerge *compress.Getter // reader for the decomporessor used in the background merge thread
|
||||
index *recsplit.Index
|
||||
tree *btree.BTree // Substitute for decompressor+index combination
|
||||
indexReader *recsplit.IndexReader // reader for the index
|
||||
readerMerge *recsplit.IndexReader // index reader for the background merge thread
|
||||
tree *btree.BTree // Substitute for decompressor+index combination
|
||||
}
|
||||
|
||||
func (i *byEndBlockItem) Less(than btree.Item) bool {
|
||||
@ -773,6 +783,10 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) (
|
||||
aggError: make(chan error, 1),
|
||||
mergeChannel: make(chan struct{}, 1),
|
||||
mergeError: make(chan error, 1),
|
||||
accountsTree: btree.New(32),
|
||||
codeTree: btree.New(32),
|
||||
storageTree: btree.New(32),
|
||||
commTree: btree.New(32),
|
||||
}
|
||||
var closeStateFiles = true // It will be set to false in case of success at the end of the function
|
||||
defer func() {
|
||||
@ -939,6 +953,9 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) (
|
||||
if err = checkOverlapWithMinStart("commitment", a.commitmentFiles, minStart); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = a.rebuildRecentState(); err != nil {
|
||||
return nil, fmt.Errorf("rebuilding recent state from change files: %w", err)
|
||||
}
|
||||
closeStateFiles = false
|
||||
a.aggWg.Add(1)
|
||||
go a.backgroundAggregation()
|
||||
@ -947,6 +964,62 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) (
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// rebuildRecentState reads change files and reconstructs the recent state
|
||||
func (a *Aggregator) rebuildRecentState() error {
|
||||
t := time.Now()
|
||||
var err error
|
||||
a.changesBtree.Descend(func(i btree.Item) bool {
|
||||
item := i.(*ChangesItem)
|
||||
var accountChanges, codeChanges, storageChanges, commChanges Changes
|
||||
accountChanges.Init("accounts", a.aggregationStep, a.diffDir, false /* beforeOn */)
|
||||
if accountChanges.openFiles(item.startBlock, false /* write */); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = accountChanges.aggregateToBtree(a.accountsTree, 0, false); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = accountChanges.closeFiles(); err != nil {
|
||||
return false
|
||||
}
|
||||
codeChanges.Init("code", a.aggregationStep, a.diffDir, false /* beforeOn */)
|
||||
if codeChanges.openFiles(item.startBlock, false /* write */); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = codeChanges.aggregateToBtree(a.codeTree, 0, false); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = codeChanges.closeFiles(); err != nil {
|
||||
return false
|
||||
}
|
||||
storageChanges.Init("storage", a.aggregationStep, a.diffDir, false /* beforeOn */)
|
||||
if storageChanges.openFiles(item.startBlock, false /* write */); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = storageChanges.aggregateToBtree(a.storageTree, 0, false); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = storageChanges.closeFiles(); err != nil {
|
||||
return false
|
||||
}
|
||||
commChanges.Init("commitment", a.aggregationStep, a.diffDir, false /* we do not unwind commitment */)
|
||||
if commChanges.openFiles(item.startBlock, false /* write */); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = commChanges.aggregateToBtree(a.commTree, 0, false); err != nil {
|
||||
return false
|
||||
}
|
||||
if err = commChanges.closeFiles(); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("reconstructed recent state", "in", time.Since(t))
|
||||
return nil
|
||||
}
|
||||
|
||||
type AggregationTask struct {
|
||||
accountChanges *Changes
|
||||
accountsBt *btree.BTree
|
||||
@ -1051,6 +1124,10 @@ func (a *Aggregator) backgroundAggregation() {
|
||||
a.aggError <- fmt.Errorf("createDatAndIndex accounts: %w", err)
|
||||
return
|
||||
}
|
||||
accountsItem.getter = accountsItem.decompressor.MakeGetter()
|
||||
accountsItem.getterMerge = accountsItem.decompressor.MakeGetter()
|
||||
accountsItem.indexReader = recsplit.NewIndexReader(accountsItem.index)
|
||||
accountsItem.readerMerge = recsplit.NewIndexReader(accountsItem.index)
|
||||
if err = aggTask.accountChanges.deleteFiles(); err != nil {
|
||||
a.aggError <- fmt.Errorf("delete accountChanges: %w", err)
|
||||
return
|
||||
@ -1073,6 +1150,10 @@ func (a *Aggregator) backgroundAggregation() {
|
||||
a.aggError <- fmt.Errorf("createDatAndIndex code: %w", err)
|
||||
return
|
||||
}
|
||||
codeItem.getter = codeItem.decompressor.MakeGetter()
|
||||
codeItem.getterMerge = codeItem.decompressor.MakeGetter()
|
||||
codeItem.indexReader = recsplit.NewIndexReader(codeItem.index)
|
||||
codeItem.readerMerge = recsplit.NewIndexReader(codeItem.index)
|
||||
if err = aggTask.codeChanges.deleteFiles(); err != nil {
|
||||
a.aggError <- fmt.Errorf("delete codeChanges: %w", err)
|
||||
return
|
||||
@ -1095,6 +1176,10 @@ func (a *Aggregator) backgroundAggregation() {
|
||||
a.aggError <- fmt.Errorf("createDatAndIndex storage: %w", err)
|
||||
return
|
||||
}
|
||||
storageItem.getter = storageItem.decompressor.MakeGetter()
|
||||
storageItem.getterMerge = storageItem.decompressor.MakeGetter()
|
||||
storageItem.indexReader = recsplit.NewIndexReader(storageItem.index)
|
||||
storageItem.readerMerge = recsplit.NewIndexReader(storageItem.index)
|
||||
if err = aggTask.storageChanges.deleteFiles(); err != nil {
|
||||
a.aggError <- fmt.Errorf("delete storageChanges: %w", err)
|
||||
return
|
||||
@ -1109,6 +1194,10 @@ func (a *Aggregator) backgroundAggregation() {
|
||||
a.aggError <- fmt.Errorf("createDatAndIndex commitment: %w", err)
|
||||
return
|
||||
}
|
||||
commitmentItem.getter = commitmentItem.decompressor.MakeGetter()
|
||||
commitmentItem.getterMerge = commitmentItem.decompressor.MakeGetter()
|
||||
commitmentItem.indexReader = recsplit.NewIndexReader(commitmentItem.index)
|
||||
commitmentItem.readerMerge = recsplit.NewIndexReader(commitmentItem.index)
|
||||
if err = aggTask.commChanges.deleteFiles(); err != nil {
|
||||
a.aggError <- fmt.Errorf("delete commChanges: %w", err)
|
||||
return
|
||||
@ -1186,7 +1275,7 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu
|
||||
// Optimised key referencing a state file record (file number and offset within the file)
|
||||
fileI := int(apk[0])
|
||||
offset := decodeU64(apk[1:])
|
||||
g := cvt.preAccounts[fileI].decompressor.MakeGetter() // TODO Cache in the reader
|
||||
g := cvt.preAccounts[fileI].getterMerge
|
||||
g.Reset(offset)
|
||||
apkBuf, _ = g.Next(apkBuf[:0])
|
||||
}
|
||||
@ -1196,9 +1285,8 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu
|
||||
if item.index.Empty() {
|
||||
continue
|
||||
}
|
||||
reader := recsplit.NewIndexReader(item.index)
|
||||
offset := reader.Lookup(apkBuf)
|
||||
g := item.decompressor.MakeGetter() // TODO Cache in the reader
|
||||
offset := item.readerMerge.Lookup(apkBuf)
|
||||
g := item.getterMerge
|
||||
g.Reset(offset)
|
||||
if g.HasNext() {
|
||||
if keyMatch, _ := g.Match(apkBuf); keyMatch {
|
||||
@ -1217,7 +1305,7 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu
|
||||
// Optimised key referencing a state file record (file number and offset within the file)
|
||||
fileI := int(spk[0])
|
||||
offset := decodeU64(spk[1:])
|
||||
g := cvt.preStorage[fileI].decompressor.MakeGetter() // TODO Cache in the reader
|
||||
g := cvt.preStorage[fileI].getterMerge
|
||||
g.Reset(offset)
|
||||
spkBuf, _ = g.Next(spkBuf[:0])
|
||||
}
|
||||
@ -1227,9 +1315,8 @@ func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBu
|
||||
if item.index.Empty() {
|
||||
continue
|
||||
}
|
||||
reader := recsplit.NewIndexReader(item.index)
|
||||
offset := reader.Lookup(spkBuf)
|
||||
g := item.decompressor.MakeGetter() // TODO Cache in the reader
|
||||
offset := item.readerMerge.Lookup(spkBuf)
|
||||
g := item.getterMerge
|
||||
g.Reset(offset)
|
||||
if g.HasNext() {
|
||||
if keyMatch, _ := g.Match(spkBuf); keyMatch {
|
||||
@ -1375,6 +1462,10 @@ func openFiles(treeName string, diffDir string, tree *btree.BTree) error {
|
||||
if item.index, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("%s.%d-%d.idx", treeName, item.startBlock, item.endBlock))); err != nil {
|
||||
return false
|
||||
}
|
||||
item.getter = item.decompressor.MakeGetter()
|
||||
item.getterMerge = item.decompressor.MakeGetter()
|
||||
item.indexReader = recsplit.NewIndexReader(item.index)
|
||||
item.readerMerge = recsplit.NewIndexReader(item.index)
|
||||
return true
|
||||
})
|
||||
return err
|
||||
@ -1440,9 +1531,8 @@ func readFromFiles(treeName string, tree **btree.BTree, lock sync.Locker, blockN
|
||||
if item.index.Empty() {
|
||||
return true
|
||||
}
|
||||
reader := recsplit.NewIndexReader(item.index)
|
||||
offset := reader.Lookup(filekey)
|
||||
g := item.decompressor.MakeGetter() // TODO Cache in the reader
|
||||
offset := item.indexReader.Lookup(filekey)
|
||||
g := item.getter
|
||||
g.Reset(offset)
|
||||
if g.HasNext() {
|
||||
if keyMatch, _ := g.Match(filekey); keyMatch {
|
||||
@ -1471,7 +1561,7 @@ func readByOffset(treeName string, tree **btree.BTree, fileI int, offset uint64)
|
||||
return true
|
||||
}
|
||||
item := i.(*byEndBlockItem)
|
||||
g := item.decompressor.MakeGetter() // TODO Cache in the reader
|
||||
g := item.getter
|
||||
g.Reset(offset)
|
||||
key, _ = g.Next(nil)
|
||||
val, _ = g.Next(nil)
|
||||
@ -1483,10 +1573,9 @@ func readByOffset(treeName string, tree **btree.BTree, fileI int, offset uint64)
|
||||
return key, nil
|
||||
}
|
||||
|
||||
func (a *Aggregator) MakeStateReader(tx kv.Getter, blockNum uint64) *Reader {
|
||||
func (a *Aggregator) MakeStateReader(blockNum uint64) *Reader {
|
||||
r := &Reader{
|
||||
a: a,
|
||||
tx: tx,
|
||||
blockNum: blockNum,
|
||||
}
|
||||
return r
|
||||
@ -1494,76 +1583,60 @@ func (a *Aggregator) MakeStateReader(tx kv.Getter, blockNum uint64) *Reader {
|
||||
|
||||
type Reader struct {
|
||||
a *Aggregator
|
||||
tx kv.Getter
|
||||
search AggregateItem
|
||||
blockNum uint64
|
||||
}
|
||||
|
||||
func (r *Reader) ReadAccountData(addr []byte, trace bool) ([]byte, error) {
|
||||
func (r *Reader) ReadAccountData(addr []byte, trace bool) []byte {
|
||||
// Look in the summary table first
|
||||
v, err := r.tx.GetOne(kv.StateAccounts, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
r.search.k = addr
|
||||
if vi := r.a.accountsTree.Get(&r.search); vi != nil {
|
||||
return vi.(*AggregateItem).v
|
||||
}
|
||||
if v != nil {
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
if trace {
|
||||
fmt.Printf("ReadAccountData %x, found in DB: %x, number of diffs: %d\n", addr, v[4:], binary.BigEndian.Uint32(v[:4]))
|
||||
}
|
||||
return v[4:], nil
|
||||
}
|
||||
// Look in the files
|
||||
val := readFromFiles("accounts", &r.a.accountsFiles, r.a.accountsFilesLock.RLocker(), r.blockNum, addr, trace)
|
||||
return val, nil
|
||||
return readFromFiles("accounts", &r.a.accountsFiles, r.a.accountsFilesLock.RLocker(), r.blockNum, addr, trace)
|
||||
}
|
||||
|
||||
func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) (*uint256.Int, error) {
|
||||
func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) *uint256.Int {
|
||||
// Look in the summary table first
|
||||
dbkey := make([]byte, len(addr)+len(loc))
|
||||
copy(dbkey[0:], addr)
|
||||
copy(dbkey[len(addr):], loc)
|
||||
v, err := r.tx.GetOne(kv.StateStorage, dbkey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
r.search.k = dbkey
|
||||
var v []byte
|
||||
if vi := r.a.storageTree.Get(&r.search); vi != nil {
|
||||
v = vi.(*AggregateItem).v
|
||||
} else {
|
||||
v = readFromFiles("storage", &r.a.storageFiles, r.a.storageFilesLock.RLocker(), r.blockNum, dbkey, trace)
|
||||
}
|
||||
if v != nil {
|
||||
if trace {
|
||||
fmt.Printf("ReadAccountStorage %x %x, found in DB: %x, number of diffs: %d\n", addr, loc, v[4:], binary.BigEndian.Uint32(v[:4]))
|
||||
}
|
||||
if len(v) == 4 {
|
||||
return nil, nil
|
||||
}
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
return new(uint256.Int).SetBytes(v[4:]), nil
|
||||
return new(uint256.Int).SetBytes(v)
|
||||
}
|
||||
// Look in the files
|
||||
val := readFromFiles("storage", &r.a.storageFiles, r.a.storageFilesLock.RLocker(), r.blockNum, dbkey, trace)
|
||||
if val != nil {
|
||||
return new(uint256.Int).SetBytes(val), nil
|
||||
}
|
||||
return nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reader) ReadAccountCode(addr []byte, trace bool) ([]byte, error) {
|
||||
func (r *Reader) ReadAccountCode(addr []byte, trace bool) []byte {
|
||||
// Look in the summary table first
|
||||
v, err := r.tx.GetOne(kv.StateCode, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v != nil {
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
if trace {
|
||||
fmt.Printf("ReadAccountCode %x, found in DB: %x, number of diffs: %d\n", addr, v[4:], binary.BigEndian.Uint32(v[:4]))
|
||||
}
|
||||
return v[4:], nil
|
||||
r.search.k = addr
|
||||
if vi := r.a.codeTree.Get(&r.search); vi != nil {
|
||||
return vi.(*AggregateItem).v
|
||||
}
|
||||
// Look in the files
|
||||
val := readFromFiles("code", &r.a.codeFiles, r.a.codeFilesLock.RLocker(), r.blockNum, addr, trace)
|
||||
return val, nil
|
||||
return readFromFiles("code", &r.a.codeFiles, r.a.codeFilesLock.RLocker(), r.blockNum, addr, trace)
|
||||
}
|
||||
|
||||
func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) int {
|
||||
// Look in the summary table first
|
||||
r.search.k = addr
|
||||
if vi := r.a.codeTree.Get(&r.search); vi != nil {
|
||||
return len(vi.(*AggregateItem).v)
|
||||
}
|
||||
// Look in the files. TODO - use specialised function to only lookup size
|
||||
return len(readFromFiles("code", &r.a.codeFiles, r.a.codeFilesLock.RLocker(), r.blockNum, addr, trace))
|
||||
}
|
||||
|
||||
type Writer struct {
|
||||
a *Aggregator
|
||||
tx kv.RwTx
|
||||
search AggregateItem // Aggregate item used to search in trees
|
||||
blockNum uint64
|
||||
changeFileNum uint64 // Block number associated with the current change files. It is the last block number whose changes will go into that file
|
||||
accountChanges Changes // Change files for accounts
|
||||
@ -1592,8 +1665,7 @@ func (w *Writer) Close() {
|
||||
w.commChanges.closeFiles()
|
||||
}
|
||||
|
||||
func (w *Writer) Reset(tx kv.RwTx, blockNum uint64) error {
|
||||
w.tx = tx
|
||||
func (w *Writer) Reset(blockNum uint64) error {
|
||||
w.blockNum = blockNum
|
||||
if blockNum > w.changeFileNum {
|
||||
if err := w.accountChanges.closeFiles(); err != nil {
|
||||
@ -1656,18 +1728,14 @@ func (w *Writer) unlockFn() {
|
||||
|
||||
func (w *Writer) branchFn(prefix []byte) ([]byte, error) {
|
||||
// Look in the summary table first
|
||||
dbPrefix := prefix
|
||||
v, err := w.tx.GetOne(kv.StateCommitment, dbPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v != nil {
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
return v[4:], nil
|
||||
w.search.k = prefix
|
||||
var v []byte
|
||||
if vi := w.a.commTree.Get(&w.search); vi != nil {
|
||||
return vi.(*AggregateItem).v, nil
|
||||
}
|
||||
// Look in the files
|
||||
val := readFromFiles("commitment", &w.a.commitmentFiles, nil /* lock */, w.blockNum, prefix, false /* trace */)
|
||||
return val, nil
|
||||
v = readFromFiles("commitment", &w.a.commitmentFiles, nil /* lock */, w.blockNum, prefix, false /* trace */)
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func bytesToUint64(buf []byte) (x uint64) {
|
||||
@ -1682,20 +1750,15 @@ func bytesToUint64(buf []byte) (x uint64) {
|
||||
|
||||
func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) ([]byte, error) {
|
||||
var enc []byte
|
||||
var v []byte
|
||||
var err error
|
||||
if len(plainKey) != length.Addr { // Accessing account key and value via "thin reference" to the state file and offset
|
||||
fileI := int(plainKey[0])
|
||||
offset := decodeU64(plainKey[1:])
|
||||
plainKey, enc = readByOffset("accounts", &w.a.accountsFiles, fileI, offset)
|
||||
} else { // Full account key is provided, search as usual
|
||||
// Look in the summary table first
|
||||
if v, err = w.tx.GetOne(kv.StateAccounts, plainKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v != nil {
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
enc = v[4:]
|
||||
w.search.k = plainKey
|
||||
if encI := w.a.accountsTree.Get(&w.search); encI != nil {
|
||||
enc = encI.(*AggregateItem).v
|
||||
} else {
|
||||
// Look in the files
|
||||
enc = readFromFiles("accounts", &w.a.accountsFiles, nil /* lock */, w.blockNum, plainKey, false /* trace */)
|
||||
@ -1719,13 +1782,9 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) ([]byte, erro
|
||||
cell.Balance.SetBytes(enc[pos : pos+balanceBytes])
|
||||
}
|
||||
}
|
||||
v, err = w.tx.GetOne(kv.StateCode, plainKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v != nil {
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
enc = v[4:]
|
||||
w.search.k = plainKey
|
||||
if encI := w.a.codeTree.Get(&w.search); encI != nil {
|
||||
enc = encI.(*AggregateItem).v
|
||||
} else {
|
||||
// Look in the files
|
||||
enc = readFromFiles("code", &w.a.codeFiles, nil /* lock */, w.blockNum, plainKey, false /* trace */)
|
||||
@ -1740,20 +1799,15 @@ func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) ([]byte, erro
|
||||
|
||||
func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) ([]byte, error) {
|
||||
var enc []byte
|
||||
var v []byte
|
||||
var err error
|
||||
if len(plainKey) != length.Addr+length.Hash { // Accessing storage key and value via "thin reference" to the state file and offset
|
||||
fileI := int(plainKey[0])
|
||||
offset := decodeU64(plainKey[1:])
|
||||
plainKey, enc = readByOffset("storage", &w.a.storageFiles, fileI, offset)
|
||||
} else { // Full storage key is provided, search as usual
|
||||
// Look in the summary table first
|
||||
if v, err = w.tx.GetOne(kv.StateStorage, plainKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v != nil {
|
||||
// First 4 bytes is the number of 1-block state diffs containing the key
|
||||
enc = v[4:]
|
||||
w.search.k = plainKey
|
||||
if encI := w.a.storageTree.Get(&w.search); encI != nil {
|
||||
enc = encI.(*AggregateItem).v
|
||||
} else {
|
||||
// Look in the files
|
||||
enc = readFromFiles("storage", &w.a.storageFiles, nil /* lock */, w.blockNum, plainKey, false /* trace */)
|
||||
@ -1927,33 +1981,30 @@ func (w *Writer) computeCommitment(trace bool) ([]byte, error) {
|
||||
}
|
||||
for prefixStr, branchNodeUpdate := range branchNodeUpdates {
|
||||
prefix := []byte(prefixStr)
|
||||
dbPrefix := prefix
|
||||
prevV, err := w.tx.GetOne(kv.StateCommitment, dbPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
w.search.k = prefix
|
||||
var prevV *AggregateItem
|
||||
if prevVI := w.a.commTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
|
||||
var original []byte
|
||||
if prevV == nil {
|
||||
original = readFromFiles("commitment", &w.a.commitmentFiles, w.a.commFilesLock.RLocker(), w.blockNum, prefix, false)
|
||||
} else {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
original = prevV.v
|
||||
}
|
||||
v := make([]byte, 4+len(branchNodeUpdate))
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
copy(v[4:], branchNodeUpdate)
|
||||
if err = w.tx.Put(kv.StateCommitment, dbPrefix, v); err != nil {
|
||||
return nil, err
|
||||
if prevV == nil {
|
||||
w.a.commTree.ReplaceOrInsert(&AggregateItem{k: prefix, v: branchNodeUpdate, count: 1})
|
||||
} else {
|
||||
prevV.v = branchNodeUpdate
|
||||
prevV.count++
|
||||
}
|
||||
if len(branchNodeUpdate) == 0 {
|
||||
w.commChanges.delete(prefix, original)
|
||||
} else {
|
||||
if prevV == nil && original == nil {
|
||||
if prevV == nil && len(original) == 0 {
|
||||
w.commChanges.insert(prefix, branchNodeUpdate)
|
||||
} else {
|
||||
if original == nil {
|
||||
original = prevV[4:]
|
||||
}
|
||||
w.commChanges.update(prefix, original, branchNodeUpdate)
|
||||
}
|
||||
}
|
||||
@ -2008,28 +2059,27 @@ func (w *Writer) Aggregate(trace bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) error {
|
||||
prevV, err := w.tx.GetOne(kv.StateAccounts, addr)
|
||||
if err != nil {
|
||||
return err
|
||||
func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) {
|
||||
var prevV *AggregateItem
|
||||
w.search.k = addr
|
||||
if prevVI := w.a.accountsTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
var original []byte
|
||||
if prevV == nil {
|
||||
original = readFromFiles("accounts", &w.a.accountsFiles, w.a.accountsFilesLock.RLocker(), w.blockNum, addr, trace)
|
||||
} else {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
original = prevV[4:]
|
||||
original = prevV.v
|
||||
}
|
||||
if bytes.Equal(account, original) {
|
||||
// No change
|
||||
return nil
|
||||
return
|
||||
}
|
||||
v := make([]byte, 4+len(account))
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
copy(v[4:], account)
|
||||
if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil {
|
||||
return err
|
||||
if prevV == nil {
|
||||
w.a.accountsTree.ReplaceOrInsert(&AggregateItem{k: addr, v: account, count: 1})
|
||||
} else {
|
||||
prevV.v = account
|
||||
prevV.count++
|
||||
}
|
||||
if prevV == nil && len(original) == 0 {
|
||||
w.accountChanges.insert(addr, account)
|
||||
@ -2040,47 +2090,41 @@ func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) erro
|
||||
w.a.trace = true
|
||||
w.a.tracedKeys[string(addr)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) error {
|
||||
prevV, err := w.tx.GetOne(kv.StateCode, addr)
|
||||
if err != nil {
|
||||
return err
|
||||
func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) {
|
||||
var prevV *AggregateItem
|
||||
w.search.k = addr
|
||||
if prevVI := w.a.codeTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
var original []byte
|
||||
if prevV == nil {
|
||||
original = readFromFiles("code", &w.a.codeFiles, w.a.codeFilesLock.RLocker(), w.blockNum, addr, trace)
|
||||
} else {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
original = prevV.v
|
||||
}
|
||||
v := make([]byte, 4+len(code))
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
copy(v[4:], code)
|
||||
if err = w.tx.Put(kv.StateCode, addr, v); err != nil {
|
||||
return err
|
||||
if prevV == nil {
|
||||
w.a.codeTree.ReplaceOrInsert(&AggregateItem{k: addr, v: code, count: 1})
|
||||
} else {
|
||||
prevV.v = code
|
||||
prevV.count++
|
||||
}
|
||||
if prevV == nil && original == nil {
|
||||
if prevV == nil && len(original) == 0 {
|
||||
w.codeChanges.insert(addr, code)
|
||||
} else {
|
||||
if original == nil {
|
||||
original = prevV[4:]
|
||||
}
|
||||
w.codeChanges.update(addr, original, code)
|
||||
}
|
||||
if trace {
|
||||
w.a.trace = true
|
||||
w.a.tracedKeys[string(addr)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type CursorType uint8
|
||||
|
||||
const (
|
||||
FILE_CURSOR CursorType = iota
|
||||
DB_CURSOR
|
||||
TREE_CURSOR
|
||||
)
|
||||
|
||||
@ -2091,7 +2135,6 @@ type CursorItem struct {
|
||||
endBlock uint64
|
||||
key, val []byte
|
||||
dg *compress.Getter
|
||||
c kv.Cursor
|
||||
tree *btree.BTree
|
||||
}
|
||||
|
||||
@ -2126,79 +2169,78 @@ func (ch *CursorHeap) Pop() interface{} {
|
||||
return x
|
||||
}
|
||||
|
||||
func (w *Writer) deleteAccount(addr []byte, trace bool) (bool, error) {
|
||||
prevV, err := w.tx.GetOne(kv.StateAccounts, addr)
|
||||
if err != nil {
|
||||
return false, err
|
||||
func (w *Writer) deleteAccount(addr []byte, trace bool) bool {
|
||||
var prevV *AggregateItem
|
||||
w.search.k = addr
|
||||
if prevVI := w.a.accountsTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
var original []byte
|
||||
if prevV == nil {
|
||||
original = readFromFiles("accounts", &w.a.accountsFiles, w.a.accountsFilesLock.RLocker(), w.blockNum, addr, trace)
|
||||
if original == nil {
|
||||
return false, nil
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
original = prevV[4:]
|
||||
original = prevV.v
|
||||
}
|
||||
v := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil {
|
||||
return false, err
|
||||
if prevV == nil {
|
||||
w.a.accountsTree.ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1})
|
||||
} else {
|
||||
prevV.v = nil
|
||||
prevV.count++
|
||||
}
|
||||
w.accountChanges.delete(addr, original)
|
||||
return true, nil
|
||||
return true
|
||||
}
|
||||
|
||||
func (w *Writer) deleteCode(addr []byte, trace bool) error {
|
||||
prevV, err := w.tx.GetOne(kv.StateCode, addr)
|
||||
if err != nil {
|
||||
return err
|
||||
func (w *Writer) deleteCode(addr []byte, trace bool) {
|
||||
var prevV *AggregateItem
|
||||
w.search.k = addr
|
||||
if prevVI := w.a.codeTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
var original []byte
|
||||
if prevV == nil {
|
||||
original = readFromFiles("code", &w.a.codeFiles, w.a.codeFilesLock.RLocker(), w.blockNum, addr, trace)
|
||||
if original == nil {
|
||||
// Nothing to do
|
||||
return nil
|
||||
return
|
||||
}
|
||||
} else {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
original = prevV[4:]
|
||||
original = prevV.v
|
||||
}
|
||||
v := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
if err = w.tx.Put(kv.StateCode, addr, v); err != nil {
|
||||
return err
|
||||
if prevV == nil {
|
||||
w.a.codeTree.ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1})
|
||||
} else {
|
||||
prevV.v = nil
|
||||
prevV.count++
|
||||
}
|
||||
w.codeChanges.delete(addr, original)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) DeleteAccount(addr []byte, trace bool) error {
|
||||
if deleted, err := w.deleteAccount(addr, trace); err != nil {
|
||||
return err
|
||||
} else if !deleted {
|
||||
return nil
|
||||
}
|
||||
if err := w.deleteCode(addr, trace); err != nil {
|
||||
return err
|
||||
func (w *Writer) DeleteAccount(addr []byte, trace bool) {
|
||||
if deleted := w.deleteAccount(addr, trace); !deleted {
|
||||
return
|
||||
}
|
||||
w.deleteCode(addr, trace)
|
||||
// Find all storage items for this address
|
||||
var cp CursorHeap
|
||||
heap.Init(&cp)
|
||||
c, err := w.tx.Cursor(kv.StateStorage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.search.k = addr
|
||||
found := false
|
||||
var k, v []byte
|
||||
if k, v, err = c.Seek(addr); err != nil {
|
||||
return err
|
||||
}
|
||||
if k != nil && bytes.HasPrefix(k, addr) {
|
||||
heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: c, endBlock: w.blockNum})
|
||||
w.a.storageTree.AscendGreaterOrEqual(&w.search, func(i btree.Item) bool {
|
||||
item := i.(*AggregateItem)
|
||||
if bytes.HasPrefix(item.k, addr) {
|
||||
found = true
|
||||
k = item.k
|
||||
v = item.v
|
||||
}
|
||||
return false
|
||||
})
|
||||
if found {
|
||||
heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: common.Copy(k), val: common.Copy(v), tree: w.a.storageTree, endBlock: w.blockNum})
|
||||
}
|
||||
w.a.storageFiles.Ascend(func(i btree.Item) bool {
|
||||
item := i.(*byEndBlockItem)
|
||||
@ -2219,9 +2261,8 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error {
|
||||
if item.index.Empty() {
|
||||
return true
|
||||
}
|
||||
reader := recsplit.NewIndexReader(item.index)
|
||||
offset := reader.Lookup(addr)
|
||||
g := item.decompressor.MakeGetter() // TODO Cache in the reader
|
||||
offset := item.indexReader.Lookup(addr)
|
||||
g := item.getter
|
||||
g.Reset(offset)
|
||||
if g.HasNext() {
|
||||
if keyMatch, _ := g.Match(addr); !keyMatch {
|
||||
@ -2257,18 +2298,6 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error {
|
||||
} else {
|
||||
heap.Pop(&cp)
|
||||
}
|
||||
case DB_CURSOR:
|
||||
k, v, err = ci1.c.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if k != nil && bytes.HasPrefix(k, addr) {
|
||||
ci1.key = common.Copy(k)
|
||||
ci1.val = common.Copy(v)
|
||||
heap.Fix(&cp, 0)
|
||||
} else {
|
||||
heap.Pop(&cp)
|
||||
}
|
||||
case TREE_CURSOR:
|
||||
skip := true
|
||||
var aitem *AggregateItem
|
||||
@ -2289,19 +2318,16 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
var prevV []byte
|
||||
prevV, err = w.tx.GetOne(kv.StateStorage, lastKey)
|
||||
if err != nil {
|
||||
return err
|
||||
var prevV *AggregateItem
|
||||
w.search.k = lastKey
|
||||
if prevVI := w.a.storageTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
if prevV != nil {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
}
|
||||
v = make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
if err = w.tx.Put(kv.StateStorage, lastKey, v); err != nil {
|
||||
return err
|
||||
if prevV == nil {
|
||||
w.a.storageTree.ReplaceOrInsert(&AggregateItem{k: lastKey, v: nil, count: 1})
|
||||
} else {
|
||||
prevV.v = nil
|
||||
prevV.count++
|
||||
}
|
||||
w.storageChanges.delete(lastKey, lastVal)
|
||||
}
|
||||
@ -2309,46 +2335,45 @@ func (w *Writer) DeleteAccount(addr []byte, trace bool) error {
|
||||
w.a.trace = true
|
||||
w.a.tracedKeys[string(addr)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) WriteAccountStorage(addr []byte, loc []byte, value *uint256.Int, trace bool) error {
|
||||
func (w *Writer) WriteAccountStorage(addr []byte, loc []byte, value *uint256.Int, trace bool) {
|
||||
dbkey := make([]byte, len(addr)+len(loc))
|
||||
copy(dbkey[0:], addr)
|
||||
copy(dbkey[len(addr):], loc)
|
||||
prevV, err := w.tx.GetOne(kv.StateStorage, dbkey)
|
||||
if err != nil {
|
||||
return err
|
||||
w.search.k = dbkey
|
||||
var prevV *AggregateItem
|
||||
if prevVI := w.a.storageTree.Get(&w.search); prevVI != nil {
|
||||
prevV = prevVI.(*AggregateItem)
|
||||
}
|
||||
var prevNum uint32
|
||||
var original []byte
|
||||
if prevV == nil {
|
||||
original = readFromFiles("storage", &w.a.storageFiles, w.a.storageFilesLock.RLocker(), w.blockNum, dbkey, trace)
|
||||
} else {
|
||||
prevNum = binary.BigEndian.Uint32(prevV[:4])
|
||||
original = prevV[4:]
|
||||
original = prevV.v
|
||||
}
|
||||
vLen := value.ByteLen()
|
||||
v := make([]byte, 4+vLen)
|
||||
binary.BigEndian.PutUint32(v[:4], prevNum+1)
|
||||
value.WriteToSlice(v[4:])
|
||||
if bytes.Equal(v[4:], original) {
|
||||
v := make([]byte, vLen)
|
||||
value.WriteToSlice(v)
|
||||
if bytes.Equal(v, original) {
|
||||
// No change
|
||||
return nil
|
||||
return
|
||||
}
|
||||
if err = w.tx.Put(kv.StateStorage, dbkey, v); err != nil {
|
||||
return err
|
||||
if prevV == nil {
|
||||
w.a.storageTree.ReplaceOrInsert(&AggregateItem{k: dbkey, v: v, count: 1})
|
||||
} else {
|
||||
prevV.v = v
|
||||
prevV.count++
|
||||
}
|
||||
if prevV == nil && len(original) == 0 {
|
||||
w.storageChanges.insert(dbkey, v[4:])
|
||||
w.storageChanges.insert(dbkey, v)
|
||||
} else {
|
||||
w.storageChanges.update(dbkey, original, v[4:])
|
||||
w.storageChanges.update(dbkey, original, v)
|
||||
}
|
||||
if trace {
|
||||
w.a.trace = true
|
||||
w.a.tracedKeys[string(dbkey)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func findLargestMerge(tree **btree.BTree, lock sync.Locker, maxTo uint64) (toAggregate []*byEndBlockItem, pre []*byEndBlockItem, post []*byEndBlockItem, aggFrom uint64, aggTo uint64) {
|
||||
@ -2394,6 +2419,7 @@ func (a *Aggregator) computeAggregation(treeName string, toAggregate []*byEndBlo
|
||||
heap.Init(&cp)
|
||||
for _, ag := range toAggregate {
|
||||
g := ag.decompressor.MakeGetter()
|
||||
g.Reset(0)
|
||||
if g.HasNext() {
|
||||
key, _ := g.Next(nil)
|
||||
val, _ := g.Next(nil)
|
||||
@ -2404,6 +2430,10 @@ func (a *Aggregator) computeAggregation(treeName string, toAggregate []*byEndBlo
|
||||
if item2.decompressor, item2.index, err = a.mergeIntoStateFile(&cp, 0, treeName, aggFrom, aggTo, a.diffDir, valTransform); err != nil {
|
||||
return nil, fmt.Errorf("mergeIntoStateFile %s [%d-%d]: %w", treeName, aggFrom, aggTo, err)
|
||||
}
|
||||
item2.getter = item2.decompressor.MakeGetter()
|
||||
item2.getterMerge = item2.decompressor.MakeGetter()
|
||||
item2.indexReader = recsplit.NewIndexReader(item2.index)
|
||||
item2.readerMerge = recsplit.NewIndexReader(item2.index)
|
||||
return item2, nil
|
||||
}
|
||||
|
||||
@ -2449,19 +2479,19 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error {
|
||||
commChanges.Init("commitment", w.a.aggregationStep, w.a.diffDir, false /* beforeOn */)
|
||||
var err error
|
||||
var accountsBt *btree.BTree
|
||||
if accountsBt, err = accountChanges.aggregate(blockFrom, blockTo, 0, w.tx, kv.StateAccounts); err != nil {
|
||||
if accountsBt, err = accountChanges.aggregate(blockFrom, blockTo, 0, w.a.accountsTree); err != nil {
|
||||
return fmt.Errorf("aggregate accountsChanges: %w", err)
|
||||
}
|
||||
var codeBt *btree.BTree
|
||||
if codeBt, err = codeChanges.aggregate(blockFrom, blockTo, 0, w.tx, kv.StateCode); err != nil {
|
||||
if codeBt, err = codeChanges.aggregate(blockFrom, blockTo, 0, w.a.codeTree); err != nil {
|
||||
return fmt.Errorf("aggregate codeChanges: %w", err)
|
||||
}
|
||||
var storageBt *btree.BTree
|
||||
if storageBt, err = storageChanges.aggregate(blockFrom, blockTo, 20, w.tx, kv.StateStorage); err != nil {
|
||||
if storageBt, err = storageChanges.aggregate(blockFrom, blockTo, 20, w.a.storageTree); err != nil {
|
||||
return fmt.Errorf("aggregate storageChanges: %w", err)
|
||||
}
|
||||
var commitmentBt *btree.BTree
|
||||
if commitmentBt, err = commChanges.aggregate(blockFrom, blockTo, 0, w.tx, kv.StateCommitment); err != nil {
|
||||
if commitmentBt, err = commChanges.aggregate(blockFrom, blockTo, 0, w.a.commTree); err != nil {
|
||||
return fmt.Errorf("aggregate commitmentChanges: %w", err)
|
||||
}
|
||||
aggTime := time.Since(t)
|
||||
|
@ -18,13 +18,10 @@ package aggregator
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/holiman/uint256"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
||||
)
|
||||
|
||||
func int160(i uint64) []byte {
|
||||
@ -71,46 +68,26 @@ func accountWithBalance(i uint64) []byte {
|
||||
|
||||
func TestSimpleAggregator(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
db := memdb.New()
|
||||
defer db.Close()
|
||||
a, err := NewAggregator(tmpDir, 16, 4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var rwTx kv.RwTx
|
||||
if rwTx, err = db.BeginRw(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer rwTx.Rollback()
|
||||
|
||||
w := a.MakeStateWriter(true /* beforeOn */)
|
||||
if err = w.Reset(rwTx, 0); err != nil {
|
||||
if err = w.Reset(0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Close()
|
||||
var account1 = accountWithBalance(1)
|
||||
if err = w.UpdateAccountData(int160(1), account1, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.UpdateAccountData(int160(1), account1, false /* trace */)
|
||||
if err = w.FinishTx(0, false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Aggregate(false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = rwTx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var tx kv.Tx
|
||||
if tx, err = db.BeginRo(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
r := a.MakeStateReader(tx, 2)
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(int160(1), false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := a.MakeStateReader(2)
|
||||
acc := r.ReadAccountData(int160(1), false /* trace */)
|
||||
if !bytes.Equal(acc, account1) {
|
||||
t.Errorf("read account %x, expected account %x", acc, account1)
|
||||
}
|
||||
@ -119,86 +96,51 @@ func TestSimpleAggregator(t *testing.T) {
|
||||
|
||||
func TestLoopAggregator(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
db := memdb.New()
|
||||
defer db.Close()
|
||||
a, err := NewAggregator(tmpDir, 16, 4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer a.Close()
|
||||
var account1 = accountWithBalance(1)
|
||||
var rwTx kv.RwTx
|
||||
defer func() {
|
||||
rwTx.Rollback()
|
||||
}()
|
||||
var tx kv.Tx
|
||||
defer func() {
|
||||
tx.Rollback()
|
||||
}()
|
||||
w := a.MakeStateWriter(true /* beforeOn */)
|
||||
defer w.Close()
|
||||
ctx := context.Background()
|
||||
for blockNum := uint64(0); blockNum < 1000; blockNum++ {
|
||||
accountKey := int160(blockNum/10 + 1)
|
||||
//fmt.Printf("blockNum = %d\n", blockNum)
|
||||
if rwTx, err = db.BeginRw(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Reset(rwTx, blockNum); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.UpdateAccountData(accountKey, account1, false /* trace */); err != nil {
|
||||
if err = w.Reset(blockNum); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.UpdateAccountData(accountKey, account1, false /* trace */)
|
||||
if err = w.FinishTx(blockNum, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Aggregate(false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = rwTx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if tx, err = db.BeginRo(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := a.MakeStateReader(tx, blockNum+1)
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tx.Rollback()
|
||||
r := a.MakeStateReader(blockNum + 1)
|
||||
acc := r.ReadAccountData(accountKey, false /* trace */)
|
||||
if !bytes.Equal(acc, account1) {
|
||||
t.Errorf("read account %x, expected account %x for block %d", acc, account1, blockNum)
|
||||
}
|
||||
account1 = accountWithBalance(blockNum + 2)
|
||||
}
|
||||
if tx, err = db.BeginRo(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
blockNum := uint64(1000)
|
||||
r := a.MakeStateReader(tx, blockNum)
|
||||
r := a.MakeStateReader(blockNum)
|
||||
for i := uint64(0); i < blockNum/10+1; i++ {
|
||||
accountKey := int160(i)
|
||||
var expected []byte
|
||||
if i > 0 {
|
||||
expected = accountWithBalance(i * 10)
|
||||
}
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
acc := r.ReadAccountData(accountKey, false /* trace */)
|
||||
if !bytes.Equal(acc, expected) {
|
||||
t.Errorf("read account %x, expected account %x for block %d", acc, expected, i)
|
||||
}
|
||||
}
|
||||
tx.Rollback()
|
||||
}
|
||||
|
||||
func TestRecreateAccountWithStorage(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
db := memdb.New()
|
||||
defer db.Close()
|
||||
a, err := NewAggregator(tmpDir, 16, 4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -207,46 +149,24 @@ func TestRecreateAccountWithStorage(t *testing.T) {
|
||||
accountKey := int160(1)
|
||||
var account1 = accountWithBalance(1)
|
||||
var account2 = accountWithBalance(2)
|
||||
var rwTx kv.RwTx
|
||||
defer func() {
|
||||
rwTx.Rollback()
|
||||
}()
|
||||
var tx kv.Tx
|
||||
defer func() {
|
||||
tx.Rollback()
|
||||
}()
|
||||
w := a.MakeStateWriter(true /* beforeOn */)
|
||||
defer w.Close()
|
||||
ctx := context.Background()
|
||||
for blockNum := uint64(0); blockNum < 100; blockNum++ {
|
||||
if rwTx, err = db.BeginRw(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Reset(rwTx, blockNum); err != nil {
|
||||
if err = w.Reset(blockNum); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
switch blockNum {
|
||||
case 1:
|
||||
if err = w.UpdateAccountData(accountKey, account1, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.UpdateAccountData(accountKey, account1, false /* trace */)
|
||||
for s := uint64(0); s < 100; s++ {
|
||||
if err = w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1), false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(s+1), false /* trace */)
|
||||
}
|
||||
case 22:
|
||||
if err = w.DeleteAccount(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.DeleteAccount(accountKey, false /* trace */)
|
||||
case 45:
|
||||
if err = w.UpdateAccountData(accountKey, account2, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.UpdateAccountData(accountKey, account2, false /* trace */)
|
||||
for s := uint64(50); s < 150; s++ {
|
||||
if err = w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1), false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.WriteAccountStorage(accountKey, int256(s), uint256.NewInt(2*s+1), false /* trace */)
|
||||
}
|
||||
}
|
||||
if err = w.FinishTx(blockNum, false /* trace */); err != nil {
|
||||
@ -255,61 +175,37 @@ func TestRecreateAccountWithStorage(t *testing.T) {
|
||||
if err = w.Aggregate(false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = rwTx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if tx, err = db.BeginRo(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := a.MakeStateReader(tx, blockNum+1)
|
||||
r := a.MakeStateReader(blockNum + 1)
|
||||
switch blockNum {
|
||||
case 1:
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
acc := r.ReadAccountData(accountKey, false /* trace */)
|
||||
if !bytes.Equal(account1, acc) {
|
||||
t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc)
|
||||
}
|
||||
for s := uint64(0); s < 100; s++ {
|
||||
var v *uint256.Int
|
||||
if v, err = r.ReadAccountStorage(accountKey, int256(s), false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
|
||||
if !uint256.NewInt(s + 1).Eq(v) {
|
||||
t.Errorf("wrong storage value after block %d, expected %d, got %s", blockNum, s+1, v)
|
||||
}
|
||||
}
|
||||
case 22, 44:
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
acc := r.ReadAccountData(accountKey, false /* trace */)
|
||||
if len(acc) > 0 {
|
||||
t.Errorf("wrong account after block %d, expected nil, got %x", blockNum, acc)
|
||||
}
|
||||
for s := uint64(0); s < 100; s++ {
|
||||
var v *uint256.Int
|
||||
if v, err = r.ReadAccountStorage(accountKey, int256(s), false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
|
||||
if v != nil {
|
||||
t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v)
|
||||
}
|
||||
}
|
||||
case 66:
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
acc := r.ReadAccountData(accountKey, false /* trace */)
|
||||
if !bytes.Equal(account2, acc) {
|
||||
t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc)
|
||||
}
|
||||
for s := uint64(0); s < 150; s++ {
|
||||
var v *uint256.Int
|
||||
if v, err = r.ReadAccountStorage(accountKey, int256(s), false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
v := r.ReadAccountStorage(accountKey, int256(s), false /* trace */)
|
||||
if s < 50 {
|
||||
if v != nil {
|
||||
t.Errorf("wrong storage value after block %d, expected nil, got %s", blockNum, v)
|
||||
@ -319,14 +215,11 @@ func TestRecreateAccountWithStorage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.Rollback()
|
||||
}
|
||||
}
|
||||
|
||||
func TestChangeCode(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
db := memdb.New()
|
||||
defer db.Close()
|
||||
a, err := NewAggregator(tmpDir, 16, 4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -335,38 +228,18 @@ func TestChangeCode(t *testing.T) {
|
||||
accountKey := int160(1)
|
||||
var account1 = accountWithBalance(1)
|
||||
var code1 = []byte("This is the code number 1")
|
||||
//var code2 = []byte("This is the code number 2")
|
||||
var rwTx kv.RwTx
|
||||
defer func() {
|
||||
rwTx.Rollback()
|
||||
}()
|
||||
var tx kv.Tx
|
||||
defer func() {
|
||||
if tx != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
w := a.MakeStateWriter(true /* beforeOn */)
|
||||
defer w.Close()
|
||||
for blockNum := uint64(0); blockNum < 100; blockNum++ {
|
||||
if rwTx, err = db.BeginRw(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Reset(rwTx, blockNum); err != nil {
|
||||
if err = w.Reset(blockNum); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
switch blockNum {
|
||||
case 1:
|
||||
if err = w.UpdateAccountData(accountKey, account1, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.UpdateAccountCode(accountKey, code1, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.UpdateAccountData(accountKey, account1, false /* trace */)
|
||||
w.UpdateAccountCode(accountKey, code1, false /* trace */)
|
||||
case 25:
|
||||
if err = w.DeleteAccount(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.DeleteAccount(accountKey, false /* trace */)
|
||||
}
|
||||
if err = w.FinishTx(blockNum, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
@ -374,38 +247,22 @@ func TestChangeCode(t *testing.T) {
|
||||
if err = w.Aggregate(false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = rwTx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if tx, err = db.BeginRo(context.Background()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := a.MakeStateReader(tx, blockNum+1)
|
||||
r := a.MakeStateReader(blockNum + 1)
|
||||
switch blockNum {
|
||||
case 22:
|
||||
var acc []byte
|
||||
if acc, err = r.ReadAccountData(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
acc := r.ReadAccountData(accountKey, false /* trace */)
|
||||
if !bytes.Equal(account1, acc) {
|
||||
t.Errorf("wrong account after block %d, expected %x, got %x", blockNum, account1, acc)
|
||||
}
|
||||
var code []byte
|
||||
if code, err = r.ReadAccountCode(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
code := r.ReadAccountCode(accountKey, false /* trace */)
|
||||
if !bytes.Equal(code1, code) {
|
||||
t.Errorf("wrong code after block %d, expected %x, got %x", blockNum, code1, code)
|
||||
}
|
||||
case 47:
|
||||
var code []byte
|
||||
if code, err = r.ReadAccountCode(accountKey, false /* trace */); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
code := r.ReadAccountCode(accountKey, false /* trace */)
|
||||
if code != nil {
|
||||
t.Errorf("wrong code after block %d, expected nil, got %x", blockNum, code)
|
||||
}
|
||||
}
|
||||
tx.Rollback()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user