/* Copyright 2022 Erigon contributors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package aggregator import ( "bufio" "bytes" "container/heap" "context" "encoding/binary" "fmt" "hash" "io" "io/fs" "math" "os" "path" "path/filepath" "regexp" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/commitment" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" "github.com/ledgerwatch/log/v3" "github.com/spaolacci/murmur3" "golang.org/x/crypto/sha3" ) // Aggregator of multiple state files to support state reader and state writer // The convension for the file names are as follows // State is composed of three types of files: // 1. Accounts. keys are addresses (20 bytes), values are encoding of accounts // 2. Contract storage. Keys are concatenation of addresses (20 bytes) and storage locations (32 bytes), values have their leading zeroes removed // 3. Contract codes. Keys are addresses (20 bytes), values are bycodes // Within each type, any file can cover an interval of block numbers, for example, `accounts.1-16` represents changes in accounts // that were effected by the blocks from 1 to 16, inclusively. The second component of the interval will be called "end block" for the file. // Finally, for each type and interval, there are two files - one with the compressed data (extension `dat`), // and another with the index (extension `idx`) consisting of the minimal perfect hash table mapping keys to the offsets of corresponding keys // in the data file // Aggregator consists (apart from the file it is aggregating) of the 4 parts: // 1. Persistent table of expiration time for each of the files. Key - name of the file, value - timestamp, at which the file can be removed // 2. Transient (in-memory) mapping the "end block" of each file to the objects required for accessing the file (compress.Decompressor and resplit.Index) // 3. Persistent tables (one for accounts, one for contract storage, and one for contract code) summarising all the 1-block state diff files // that were not yet merged together to form larger files. In these tables, keys are the same as keys in the state diff files, but values are also // augemented by the number of state diff files this key is present. This number gets decremented every time when a 1-block state diff files is removed // from the summary table (due to being merged). And when this number gets to 0, the record is deleted from the summary table. // This number is encoded into first 4 bytes of the value // 4. Aggregating persistent hash table. Maps state keys to the block numbers for the use in the part 2 (which is not necessarily the block number where // the item last changed, but it is guaranteed to find correct element in the Transient mapping of part 2 type FileType int const ( Account FileType = iota Storage Code Commitment AccountHistory StorageHistory CodeHistory AccountBitmap StorageBitmap CodeBitmap NumberOfTypes ) const ( FirstType = Account NumberOfAccountStorageTypes = Code NumberOfStateTypes = AccountHistory ) func (ft FileType) String() string { switch ft { case Account: return "account" case Storage: return "storage" case Code: return "code" case Commitment: return "commitment" case AccountHistory: return "ahistory" case CodeHistory: return "chistory" case StorageHistory: return "shistory" case AccountBitmap: return "abitmap" case CodeBitmap: return "cbitmap" case StorageBitmap: return "sbitmap" default: panic(fmt.Sprintf("unknown file type: %d", ft)) } } func ParseFileType(s string) (FileType, bool) { switch s { case "account": return Account, true case "storage": return Storage, true case "code": return Code, true case "commitment": return Commitment, true case "ahistory": return AccountHistory, true case "chistory": return CodeHistory, true case "shistory": return StorageHistory, true case "abitmap": return AccountBitmap, true case "cbitmap": return CodeBitmap, true case "sbitmap": return StorageBitmap, true default: return NumberOfTypes, false } } type Aggregator struct { diffDir string // Directory where the state diff files are stored files [NumberOfTypes]*btree.BTree fileLocks [NumberOfTypes]sync.RWMutex unwindLimit uint64 // How far the chain may unwind aggregationStep uint64 // How many items (block, but later perhaps txs or changes) are required to form one state diff file changesBtree *btree.BTree // btree of ChangesItem trace bool // Turns on tracing for specific accounts and locations tracedKeys map[string]struct{} // Set of keys being traced during aggregations hph *commitment.HexPatriciaHashed keccak hash.Hash changesets bool // Whether to generate changesets (off by default) commitments bool // Whether to calculate commitments aggChannel chan *AggregationTask aggBackCh chan struct{} // Channel for acknoledgement of AggregationTask aggError chan error aggWg sync.WaitGroup mergeChannel chan struct{} mergeError chan error mergeWg sync.WaitGroup historyChannel chan struct{} historyError chan error historyWg sync.WaitGroup trees [NumberOfStateTypes]*btree.BTree fileHits, fileMisses uint64 // Counters for state file hit ratio arches [NumberOfStateTypes][]uint32 // Over-arching hash tables containing the block number of last aggregation archHasher murmur3.Hash128 } type ChangeFile struct { dir string step uint64 namebase string path string file *os.File w *bufio.Writer r *bufio.Reader numBuf [8]byte sizeCounter uint64 txPos int64 // Position of the last block iterated upon txNum uint64 txSize uint64 txRemaining uint64 // Remaining number of bytes to read in the current transaction words []byte // Words pending for the next block record, in the same slice wordOffsets []int // Offsets of words in the `words` slice } func (cf *ChangeFile) closeFile() error { if len(cf.wordOffsets) > 0 { return fmt.Errorf("closeFile without finish") } if cf.w != nil { if err := cf.w.Flush(); err != nil { return err } cf.w = nil } if cf.file != nil { if err := cf.file.Close(); err != nil { return err } cf.file = nil } return nil } func (cf *ChangeFile) openFile(blockNum uint64, write bool) error { if len(cf.wordOffsets) > 0 { return fmt.Errorf("openFile without finish") } rem := blockNum % cf.step startBlock := blockNum - rem endBlock := startBlock + cf.step - 1 if cf.w == nil { cf.path = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.chg", cf.namebase, startBlock, endBlock)) var err error if write { if cf.file, err = os.OpenFile(cf.path, os.O_RDWR|os.O_CREATE, 0755); err != nil { return err } } else { if cf.file, err = os.Open(cf.path); err != nil { return err } } if cf.txPos, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil { return err } if write { cf.w = bufio.NewWriter(cf.file) } cf.r = bufio.NewReader(cf.file) } return nil } func (cf *ChangeFile) rewind() error { var err error if cf.txPos, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil { return err } cf.r = bufio.NewReader(cf.file) return nil } func (cf *ChangeFile) add(word []byte) { cf.words = append(cf.words, word...) cf.wordOffsets = append(cf.wordOffsets, len(cf.words)) } func (cf *ChangeFile) finish(txNum uint64) error { // Write out words lastOffset := 0 for _, offset := range cf.wordOffsets { word := cf.words[lastOffset:offset] n := binary.PutUvarint(cf.numBuf[:], uint64(len(word))) if _, err := cf.w.Write(cf.numBuf[:n]); err != nil { return err } if len(word) > 0 { if _, err := cf.w.Write(word); err != nil { return err } } cf.sizeCounter += uint64(n + len(word)) lastOffset = offset } cf.words = cf.words[:0] cf.wordOffsets = cf.wordOffsets[:0] // Write out tx number and then size of changes in this block binary.BigEndian.PutUint64(cf.numBuf[:], txNum) if _, err := cf.w.Write(cf.numBuf[:]); err != nil { return err } binary.BigEndian.PutUint64(cf.numBuf[:], cf.sizeCounter) if _, err := cf.w.Write(cf.numBuf[:]); err != nil { return err } cf.sizeCounter = 0 return nil } // prevTx positions the reader to the beginning // of the transaction func (cf *ChangeFile) prevTx() (bool, error) { if cf.txPos == 0 { return false, nil } // Move back 16 bytes to read tx number and tx size pos, err := cf.file.Seek(cf.txPos-16, 0 /* relative to the beginning */) if err != nil { return false, err } cf.r.Reset(cf.file) if _, err = io.ReadFull(cf.r, cf.numBuf[:8]); err != nil { return false, err } cf.txNum = binary.BigEndian.Uint64(cf.numBuf[:]) if _, err = io.ReadFull(cf.r, cf.numBuf[:8]); err != nil { return false, err } cf.txSize = binary.BigEndian.Uint64(cf.numBuf[:]) cf.txRemaining = cf.txSize cf.txPos, err = cf.file.Seek(pos-int64(cf.txSize), 0) if err != nil { return false, err } cf.r.Reset(cf.file) return true, nil } func (cf *ChangeFile) nextWord(wordBuf []byte) ([]byte, bool, error) { if cf.txRemaining == 0 { return wordBuf, false, nil } ws, err := binary.ReadUvarint(cf.r) if err != nil { return wordBuf, false, fmt.Errorf("word size: %w", err) } var buf []byte if total := len(wordBuf) + int(ws); cap(wordBuf) >= total { buf = wordBuf[:total] // Reuse the space in wordBuf, is it has enough capacity } else { buf = make([]byte, total) copy(buf, wordBuf) } if _, err = io.ReadFull(cf.r, buf[len(wordBuf):]); err != nil { return wordBuf, false, fmt.Errorf("read word (%d %d): %w", ws, len(buf[len(wordBuf):]), err) } n := binary.PutUvarint(cf.numBuf[:], ws) cf.txRemaining -= uint64(n) + ws return buf, true, nil } func (cf *ChangeFile) deleteFile() error { return os.Remove(cf.path) } type Changes struct { namebase string keys ChangeFile before ChangeFile after ChangeFile step uint64 dir string beforeOn bool } func (c *Changes) Init(namebase string, step uint64, dir string, beforeOn bool) { c.namebase = namebase c.step = step c.dir = dir c.keys.namebase = namebase + ".keys" c.keys.dir = dir c.keys.step = step c.before.namebase = namebase + ".before" c.before.dir = dir c.before.step = step c.after.namebase = namebase + ".after" c.after.dir = dir c.after.step = step c.beforeOn = beforeOn } func (c *Changes) closeFiles() error { if err := c.keys.closeFile(); err != nil { return err } if c.beforeOn { if err := c.before.closeFile(); err != nil { return err } } if err := c.after.closeFile(); err != nil { return err } return nil } func (c *Changes) openFiles(blockNum uint64, write bool) error { if err := c.keys.openFile(blockNum, write); err != nil { return err } if c.beforeOn { if err := c.before.openFile(blockNum, write); err != nil { return err } } if err := c.after.openFile(blockNum, write); err != nil { return err } return nil } func (c *Changes) insert(key, after []byte) { c.keys.add(key) if c.beforeOn { c.before.add(nil) } c.after.add(after) } func (c *Changes) update(key, before, after []byte) { c.keys.add(key) if c.beforeOn { c.before.add(before) } c.after.add(after) } func (c *Changes) delete(key, before []byte) { c.keys.add(key) if c.beforeOn { c.before.add(before) } c.after.add(nil) } func (c *Changes) finish(txNum uint64) error { if err := c.keys.finish(txNum); err != nil { return err } if c.beforeOn { if err := c.before.finish(txNum); err != nil { return err } } if err := c.after.finish(txNum); err != nil { return err } return nil } func (c *Changes) prevTx() (bool, uint64, error) { bkeys, err := c.keys.prevTx() if err != nil { return false, 0, err } var bbefore, bafter bool if c.beforeOn { if bbefore, err = c.before.prevTx(); err != nil { return false, 0, err } } if bafter, err = c.after.prevTx(); err != nil { return false, 0, err } if c.beforeOn && bkeys != bbefore { return false, 0, fmt.Errorf("inconsistent tx iteration") } if bkeys != bafter { return false, 0, fmt.Errorf("inconsistent tx iteration") } txNum := c.keys.txNum if c.beforeOn { if txNum != c.before.txNum { return false, 0, fmt.Errorf("inconsistent txNum, keys: %d, before: %d", txNum, c.before.txNum) } } if txNum != c.after.txNum { return false, 0, fmt.Errorf("inconsistent txNum, keys: %d, after: %d", txNum, c.after.txNum) } return bkeys, txNum, nil } func (c *Changes) rewind() error { if err := c.keys.rewind(); err != nil { return err } if c.beforeOn { if err := c.before.rewind(); err != nil { return err } } if err := c.after.rewind(); err != nil { return err } return nil } func (c *Changes) nextTriple(keyBuf, beforeBuf []byte, afterBuf []byte) ([]byte, []byte, []byte, bool, error) { key, bkeys, err := c.keys.nextWord(keyBuf) if err != nil { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next key: %w", err) } var before, after []byte var bbefore, bafter bool if c.beforeOn { if before, bbefore, err = c.before.nextWord(beforeBuf); err != nil { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next before: %w", err) } } if c.beforeOn && bkeys != bbefore { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("inconsistent word iteration") } if after, bafter, err = c.after.nextWord(afterBuf); err != nil { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next after: %w", err) } if bkeys != bafter { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("inconsistent word iteration") } return key, before, after, bkeys, nil } func (c *Changes) deleteFiles() error { if err := c.keys.deleteFile(); err != nil { return err } if c.beforeOn { if err := c.before.deleteFile(); err != nil { return err } } if err := c.after.deleteFile(); err != nil { return err } return nil } func buildIndex(d *compress.Decompressor, idxPath, tmpDir string, count int) (*recsplit.Index, error) { var rs *recsplit.RecSplit var err error if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: count, Enums: false, BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, IndexFile: idxPath, }); err != nil { return nil, err } defer rs.Close() word := make([]byte, 0, 256) var pos uint64 g := d.MakeGetter() for { g.Reset(0) for g.HasNext() { word, _ = g.Next(word[:0]) if err = rs.AddKey(word, pos); err != nil { return nil, err } // Skip value pos = g.Skip() } if err = rs.Build(); err != nil { if rs.Collision() { log.Info("Building recsplit. Collision happened. It's ok. Restarting...") rs.ResetNextSalt() } else { return nil, err } } else { break } } var idx *recsplit.Index if idx, err = recsplit.OpenIndex(idxPath); err != nil { return nil, err } return idx, nil } // aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database // This function is time-critical because it needs to be run in the same go-routine (thread) as the general // execution (due to read-write tx). After that, we can optimistically execute the rest in the background func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, dbTree *btree.BTree, commitments bool) (*btree.BTree, error) { if err := c.openFiles(blockTo, false /* write */); err != nil { return nil, fmt.Errorf("open files: %w", err) } bt := btree.New(32) err := c.aggregateToBtree(bt, prefixLen, commitments) if err != nil { return nil, fmt.Errorf("aggregateToBtree: %w", err) } // Clean up the DB table var e error var search AggregateItem bt.Ascend(func(i btree.Item) bool { item := i.(*AggregateItem) if item.count == 0 { return true } search.k = item.k var prevV *AggregateItem if prevVI := dbTree.Get(&search); prevVI != nil { prevV = prevVI.(*AggregateItem) } if prevV == nil { e = fmt.Errorf("record not found in db tree for key %x", item.k) return false } if prevV.count < item.count { e = fmt.Errorf("record count too low for key [%x] count %d, subtracting %d", item.k, prevV.count, item.count) return false } if prevV.count == item.count { dbTree.Delete(prevV) } else { prevV.count -= item.count } return true }) if e != nil { return nil, fmt.Errorf("clean up after aggregation: %w", e) } return bt, nil } func (a *Aggregator) updateArch(bt *btree.BTree, fType FileType, blockNum32 uint32) { arch := a.arches[fType] h := a.archHasher n := uint64(len(arch)) if n == 0 { return } bt.Ascend(func(i btree.Item) bool { item := i.(*AggregateItem) if item.count == 0 { return true } h.Reset() h.Write(item.k) //nolint:errcheck p, _ := h.Sum128() p = p % n v := atomic.LoadUint32(&arch[p]) if v < blockNum32 { //fmt.Printf("Updated %s arch [%x]=%d %d\n", fType.String(), item.k, p, blockNum32) atomic.StoreUint32(&arch[p], blockNum32) } return true }) } type AggregateItem struct { k, v []byte count uint32 } func (i *AggregateItem) Less(than btree.Item) bool { return bytes.Compare(i.k, than.(*AggregateItem).k) < 0 } func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitmapType FileType) (*compress.Decompressor, *recsplit.Index, *compress.Decompressor, *recsplit.Index, error) { chsetDatPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", historyType.String(), blockFrom, blockTo)) chsetIdxPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", historyType.String(), blockFrom, blockTo)) bitmapDatPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", bitmapType.String(), blockFrom, blockTo)) bitmapIdxPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", bitmapType.String(), blockFrom, blockTo)) var blockSuffix [8]byte binary.BigEndian.PutUint64(blockSuffix[:], blockTo) bitmaps := map[string]*roaring64.Bitmap{} comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, chsetDatPath, c.dir, compress.MinPatternScore, 1) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets NewCompressor: %w", err) } defer func() { if comp != nil { comp.Close() } }() var totalRecords int var b bool var e error var txNum uint64 var key, before, after []byte if err = c.rewind(); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets rewind: %w", err) } var txKey = make([]byte, 8, 60) for b, txNum, e = c.prevTx(); b && e == nil; b, txNum, e = c.prevTx() { binary.BigEndian.PutUint64(txKey[:8], txNum) for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { totalRecords++ txKey = append(txKey[:8], key...) // In the inital files and most merged file, the txKey is added to the file, but it gets removed in the final merge if err = comp.AddWord(txKey); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord key: %w", err) } if err = comp.AddWord(before); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord before: %w", err) } //if historyType == AccountHistory { // fmt.Printf("produce %s.%d-%d [%x]=>[%x]\n", historyType.String(), blockFrom, blockTo, txKey, before) //} var bitmap *roaring64.Bitmap var ok bool if bitmap, ok = bitmaps[string(key)]; !ok { bitmap = roaring64.New() bitmaps[string(key)] = bitmap } bitmap.Add(txNum) } if e != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets nextTriple: %w", e) } } if e != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets prevTx: %w", e) } if err = comp.Compress(); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets Compress: %w", err) } comp.Close() comp = nil var d *compress.Decompressor var index *recsplit.Index if d, err = compress.NewDecompressor(chsetDatPath); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets changeset decompressor: %w", err) } if index, err = buildIndex(d, chsetIdxPath, c.dir, totalRecords); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets changeset buildIndex: %w", err) } // Create bitmap files bitmapC, err := compress.NewCompressor(context.Background(), AggregatorPrefix, bitmapDatPath, c.dir, compress.MinPatternScore, 1) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap NewCompressor: %w", err) } defer func() { if bitmapC != nil { bitmapC.Close() } }() idxKeys := make([]string, len(bitmaps)) i := 0 var buf []byte for key := range bitmaps { idxKeys[i] = key i++ } sort.Strings(idxKeys) for _, key := range idxKeys { if err = bitmapC.AddWord([]byte(key)); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add key: %w", err) } bitmap := bitmaps[key] ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum()) it := bitmap.Iterator() for it.HasNext() { v := it.Next() ef.AddOffset(v) } ef.Build() buf = ef.AppendBytes(buf[:0]) if err = bitmapC.AddUncompressedWord(buf); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add val: %w", err) } } if err = bitmapC.Compress(); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap Compress: %w", err) } bitmapC.Close() bitmapC = nil bitmapD, err := compress.NewDecompressor(bitmapDatPath) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap decompressor: %w", err) } bitmapI, err := buildIndex(bitmapD, bitmapIdxPath, c.dir, len(idxKeys)) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap buildIndex: %w", err) } return d, index, bitmapD, bitmapI, nil } // aggregateToBtree iterates over all available changes in the change files covered by this instance `c` // (there are 3 of them, one for "keys", one for values "before" every change, and one for values "after" every change) // and create a B-tree where each key is only represented once, with the value corresponding to the "after" value // of the latest change. func (c *Changes) aggregateToBtree(bt *btree.BTree, prefixLen int, commitments bool) error { var b bool var e error var key, before, after []byte var ai AggregateItem var prefix []byte // Note that the following loop iterates over transactions backwards, therefore it does not replace entries in the B-tree, // but instead just updates their "change count" and the first byte of the value (insertion vs update flag) for b, _, e = c.prevTx(); b && e == nil; b, _, e = c.prevTx() { // Within each transaction, keys are unique, but they can appear in any order for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { if prefixLen > 0 && !bytes.Equal(prefix, key[:prefixLen]) { prefix = common.Copy(key[:prefixLen]) item := &AggregateItem{k: prefix, count: 0} bt.ReplaceOrInsert(item) } ai.k = key i := bt.Get(&ai) if i == nil { item := &AggregateItem{k: common.Copy(key), v: common.Copy(after), count: 1} bt.ReplaceOrInsert(item) } else { item := i.(*AggregateItem) if commitments { var err error var mergedVal []byte if mergedVal, err = commitment.MergeBranches(after, item.v, nil); err != nil { return fmt.Errorf("merge branches: %w", err) } //fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal) item.v = mergedVal } item.count++ } } if e != nil { return fmt.Errorf("aggregateToBtree nextTriple: %w", e) } } if e != nil { return fmt.Errorf("aggregateToBtree prevTx: %w", e) } return nil } const AggregatorPrefix = "aggregator" func btreeToFile(bt *btree.BTree, datPath string, tmpdir string, trace bool, workers int) (int, error) { comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, tmpdir, compress.MinPatternScore, workers) if err != nil { return 0, err } defer comp.Close() comp.SetTrace(trace) count := 0 bt.Ascend(func(i btree.Item) bool { item := i.(*AggregateItem) if err = comp.AddWord(item.k); err != nil { return false } count++ // Only counting keys, not values if err = comp.AddWord(item.v); err != nil { return false } return true }) if err != nil { return 0, err } if err = comp.Compress(); err != nil { return 0, err } return count, nil } type ChangesItem struct { endBlock uint64 startBlock uint64 fileCount int } func (i *ChangesItem) Less(than btree.Item) bool { if i.endBlock == than.(*ChangesItem).endBlock { // Larger intevals will come last return i.startBlock > than.(*ChangesItem).startBlock } return i.endBlock < than.(*ChangesItem).endBlock } type byEndBlockItem struct { startBlock uint64 endBlock uint64 decompressor *compress.Decompressor getter *compress.Getter // reader for the decompressor getterMerge *compress.Getter // reader for the decomporessor used in the background merge thread index *recsplit.Index indexReader *recsplit.IndexReader // reader for the index readerMerge *recsplit.IndexReader // index reader for the background merge thread tree *btree.BTree // Substitute for decompressor+index combination } func (i *byEndBlockItem) Less(than btree.Item) bool { if i.endBlock == than.(*byEndBlockItem).endBlock { return i.startBlock > than.(*byEndBlockItem).startBlock } return i.endBlock < than.(*byEndBlockItem).endBlock } func (a *Aggregator) scanStateFiles(files []fs.DirEntry) { typeStrings := make([]string, NumberOfTypes) for fType := FileType(0); fType < NumberOfTypes; fType++ { typeStrings[fType] = fType.String() } re := regexp.MustCompile("(" + strings.Join(typeStrings, "|") + ").([0-9]+)-([0-9]+).(dat|idx)") var err error for _, f := range files { name := f.Name() subs := re.FindStringSubmatch(name) if len(subs) != 5 { if len(subs) != 0 { log.Warn("File ignored by aggregator, more than 4 submatches", "name", name, "submatches", len(subs)) } continue } var startBlock, endBlock uint64 if startBlock, err = strconv.ParseUint(subs[2], 10, 64); err != nil { log.Warn("File ignored by aggregator, parsing startBlock", "error", err, "name", name) continue } if endBlock, err = strconv.ParseUint(subs[3], 10, 64); err != nil { log.Warn("File ignored by aggregator, parsing endBlock", "error", err, "name", name) continue } if startBlock > endBlock { log.Warn("File ignored by aggregator, startBlock > endBlock", "name", name) continue } fType, ok := ParseFileType(subs[1]) if !ok { log.Warn("File ignored by aggregator, type unknown", "type", subs[1]) } var item = &byEndBlockItem{startBlock: startBlock, endBlock: endBlock} var foundI *byEndBlockItem a.files[fType].AscendGreaterOrEqual(&byEndBlockItem{startBlock: endBlock, endBlock: endBlock}, func(i btree.Item) bool { it := i.(*byEndBlockItem) if it.endBlock == endBlock { foundI = it } return false }) if foundI == nil || foundI.startBlock > startBlock { log.Info("Load state file", "name", name, "type", fType.String(), "startBlock", startBlock, "endBlock", endBlock) a.files[fType].ReplaceOrInsert(item) } } } func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64) (*Aggregator, error) { a := &Aggregator{ diffDir: diffDir, unwindLimit: unwindLimit, aggregationStep: aggregationStep, tracedKeys: map[string]struct{}{}, keccak: sha3.NewLegacyKeccak256(), hph: commitment.NewHexPatriciaHashed(length.Addr, nil, nil, nil, nil, nil), aggChannel: make(chan *AggregationTask), aggBackCh: make(chan struct{}), aggError: make(chan error, 1), mergeChannel: make(chan struct{}, 1), mergeError: make(chan error, 1), historyChannel: make(chan struct{}, 1), historyError: make(chan error, 1), changesets: changesets, commitments: commitments, archHasher: murmur3.New128WithSeed(0), // TODO: Randomise salt } for fType := FirstType; fType < NumberOfTypes; fType++ { a.files[fType] = btree.New(32) } for fType := FirstType; fType < NumberOfStateTypes; fType++ { a.trees[fType] = btree.New(32) } var closeStateFiles = true // It will be set to false in case of success at the end of the function defer func() { // Clean up all decompressor and indices upon error if closeStateFiles { a.Close() } }() // Scan the diff directory and create the mapping of end blocks to files files, err := os.ReadDir(diffDir) if err != nil { return nil, err } a.scanStateFiles(files) // Check for overlaps and holes for fType := FirstType; fType < NumberOfTypes; fType++ { if err := checkOverlaps(fType.String(), a.files[fType]); err != nil { return nil, err } } // Open decompressor and index files for all items in state trees for fType := FirstType; fType < NumberOfTypes; fType++ { if err := a.openFiles(fType, minArch); err != nil { return nil, fmt.Errorf("opening %s state files: %w", fType.String(), err) } } a.changesBtree = btree.New(32) re := regexp.MustCompile(`(account|storage|code|commitment).(keys|before|after).([0-9]+)-([0-9]+).chg`) for _, f := range files { name := f.Name() subs := re.FindStringSubmatch(name) if len(subs) != 5 { if len(subs) != 0 { log.Warn("File ignored by changes scan, more than 4 submatches", "name", name, "submatches", len(subs)) } continue } var startBlock, endBlock uint64 if startBlock, err = strconv.ParseUint(subs[3], 10, 64); err != nil { log.Warn("File ignored by changes scan, parsing startBlock", "error", err, "name", name) continue } if endBlock, err = strconv.ParseUint(subs[4], 10, 64); err != nil { log.Warn("File ignored by changes scan, parsing endBlock", "error", err, "name", name) continue } if startBlock > endBlock { log.Warn("File ignored by changes scan, startBlock > endBlock", "name", name) continue } if endBlock != startBlock+aggregationStep-1 { log.Warn("File ignored by changes scan, endBlock != startBlock+aggregationStep-1", "name", name) continue } var item = &ChangesItem{fileCount: 1, startBlock: startBlock, endBlock: endBlock} i := a.changesBtree.Get(item) if i == nil { a.changesBtree.ReplaceOrInsert(item) } else { item = i.(*ChangesItem) if item.startBlock == startBlock { item.fileCount++ } else { return nil, fmt.Errorf("change files overlap [%d-%d] with [%d-%d]", item.startBlock, item.endBlock, startBlock, endBlock) } } } // Check for holes in change files minStart := uint64(math.MaxUint64) a.changesBtree.Descend(func(i btree.Item) bool { item := i.(*ChangesItem) if item.startBlock < minStart { if item.endBlock >= minStart { err = fmt.Errorf("overlap of change files [%d-%d] with %d", item.startBlock, item.endBlock, minStart) return false } if minStart != math.MaxUint64 && item.endBlock+1 != minStart { err = fmt.Errorf("whole in change files [%d-%d]", item.endBlock, minStart) return false } minStart = item.startBlock } else { err = fmt.Errorf("overlap of change files [%d-%d] with %d", item.startBlock, item.endBlock, minStart) return false } return true }) if err != nil { return nil, err } for fType := FirstType; fType < NumberOfStateTypes; fType++ { if err = checkOverlapWithMinStart(fType.String(), a.files[fType], minStart); err != nil { return nil, err } } if err = a.rebuildRecentState(); err != nil { return nil, fmt.Errorf("rebuilding recent state from change files: %w", err) } closeStateFiles = false a.aggWg.Add(1) go a.backgroundAggregation() a.mergeWg.Add(1) go a.backgroundMerge() if a.changesets { a.historyWg.Add(1) go a.backgroundHistoryMerge() } return a, nil } // rebuildRecentState reads change files and reconstructs the recent state func (a *Aggregator) rebuildRecentState() error { t := time.Now() var err error a.changesBtree.Descend(func(i btree.Item) bool { item := i.(*ChangesItem) for fType := FirstType; fType < NumberOfStateTypes; fType++ { var changes Changes changes.Init(fType.String(), a.aggregationStep, a.diffDir, false /* beforeOn */) if changes.openFiles(item.startBlock, false /* write */); err != nil { return false } if err = changes.aggregateToBtree(a.trees[fType], 0, fType == Commitment); err != nil { return false } if err = changes.closeFiles(); err != nil { return false } } return true }) if err != nil { return err } log.Info("reconstructed recent state", "in", time.Since(t)) return nil } type AggregationTask struct { changes [NumberOfStateTypes]Changes bt [NumberOfStateTypes]*btree.BTree blockFrom uint64 blockTo uint64 } func (a *Aggregator) removeLocked(fType FileType, toRemove []*byEndBlockItem, item *byEndBlockItem) { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() if len(toRemove) > 1 { for _, ag := range toRemove { a.files[fType].Delete(ag) } a.files[fType].ReplaceOrInsert(item) } } func (a *Aggregator) removeLockedState( accountsToRemove []*byEndBlockItem, accountsItem *byEndBlockItem, codeToRemove []*byEndBlockItem, codeItem *byEndBlockItem, storageToRemove []*byEndBlockItem, storageItem *byEndBlockItem, commitmentToRemove []*byEndBlockItem, commitmentItem *byEndBlockItem, ) { for fType := FirstType; fType < NumberOfStateTypes; fType++ { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() } if len(accountsToRemove) > 1 { for _, ag := range accountsToRemove { a.files[Account].Delete(ag) } a.files[Account].ReplaceOrInsert(accountsItem) } if len(codeToRemove) > 1 { for _, ag := range codeToRemove { a.files[Code].Delete(ag) } a.files[Code].ReplaceOrInsert(codeItem) } if len(storageToRemove) > 1 { for _, ag := range storageToRemove { a.files[Storage].Delete(ag) } a.files[Storage].ReplaceOrInsert(storageItem) } if len(commitmentToRemove) > 1 { for _, ag := range commitmentToRemove { a.files[Commitment].Delete(ag) } a.files[Commitment].ReplaceOrInsert(commitmentItem) } } func removeFiles(fType FileType, diffDir string, toRemove []*byEndBlockItem) error { // Close all the memory maps etc for _, ag := range toRemove { if err := ag.index.Close(); err != nil { return fmt.Errorf("close index: %w", err) } if err := ag.decompressor.Close(); err != nil { return fmt.Errorf("close decompressor: %w", err) } } // Delete files // TODO: in a non-test version, this is delayed to allow other participants to roll over to the next file for _, ag := range toRemove { if err := os.Remove(path.Join(diffDir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), ag.startBlock, ag.endBlock))); err != nil { return fmt.Errorf("remove decompressor file %s.%d-%d.dat: %w", fType.String(), ag.startBlock, ag.endBlock, err) } if err := os.Remove(path.Join(diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), ag.startBlock, ag.endBlock))); err != nil { return fmt.Errorf("remove index file %s.%d-%d.idx: %w", fType.String(), ag.startBlock, ag.endBlock, err) } } return nil } // backgroundAggregation is the functin that runs in a background go-routine and performs creation of initial state files // allowing the main goroutine to proceed func (a *Aggregator) backgroundAggregation() { defer a.aggWg.Done() for aggTask := range a.aggChannel { typesLimit := Commitment if a.commitments { typesLimit = AccountHistory } for fType := FirstType; fType < typesLimit; fType++ { if fType < NumberOfStateTypes { a.updateArch(aggTask.bt[fType], fType, uint32(aggTask.blockTo)) } a.addLocked(fType, &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo, tree: aggTask.bt[fType]}) } a.aggBackCh <- struct{}{} if a.changesets { if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Account].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, AccountHistory, AccountBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} historyItem.decompressor = historyD historyItem.index = historyI historyItem.getter = historyItem.decompressor.MakeGetter() historyItem.getterMerge = historyItem.decompressor.MakeGetter() historyItem.indexReader = recsplit.NewIndexReader(historyItem.index) historyItem.readerMerge = recsplit.NewIndexReader(historyItem.index) a.addLocked(AccountHistory, historyItem) var bitmapItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} bitmapItem.decompressor = bitmapD bitmapItem.index = bitmapI bitmapItem.getter = bitmapItem.decompressor.MakeGetter() bitmapItem.getterMerge = bitmapItem.decompressor.MakeGetter() bitmapItem.indexReader = recsplit.NewIndexReader(bitmapItem.index) bitmapItem.readerMerge = recsplit.NewIndexReader(bitmapItem.index) a.addLocked(AccountBitmap, bitmapItem) } else { a.aggError <- fmt.Errorf("produceChangeSets %s: %w", Account.String(), err) return } if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Storage].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, StorageHistory, StorageBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} historyItem.decompressor = historyD historyItem.index = historyI historyItem.getter = historyItem.decompressor.MakeGetter() historyItem.getterMerge = historyItem.decompressor.MakeGetter() historyItem.indexReader = recsplit.NewIndexReader(historyItem.index) historyItem.readerMerge = recsplit.NewIndexReader(historyItem.index) a.addLocked(StorageHistory, historyItem) var bitmapItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} bitmapItem.decompressor = bitmapD bitmapItem.index = bitmapI bitmapItem.getter = bitmapItem.decompressor.MakeGetter() bitmapItem.getterMerge = bitmapItem.decompressor.MakeGetter() bitmapItem.indexReader = recsplit.NewIndexReader(bitmapItem.index) bitmapItem.readerMerge = recsplit.NewIndexReader(bitmapItem.index) a.addLocked(StorageBitmap, bitmapItem) } else { a.aggError <- fmt.Errorf("produceChangeSets %s: %w", Storage.String(), err) return } if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Code].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, CodeHistory, CodeBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} historyItem.decompressor = historyD historyItem.index = historyI historyItem.getter = historyItem.decompressor.MakeGetter() historyItem.getterMerge = historyItem.decompressor.MakeGetter() historyItem.indexReader = recsplit.NewIndexReader(historyItem.index) historyItem.readerMerge = recsplit.NewIndexReader(historyItem.index) a.addLocked(CodeHistory, historyItem) var bitmapItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} bitmapItem.decompressor = bitmapD bitmapItem.index = bitmapI bitmapItem.getter = bitmapItem.decompressor.MakeGetter() bitmapItem.getterMerge = bitmapItem.decompressor.MakeGetter() bitmapItem.indexReader = recsplit.NewIndexReader(bitmapItem.index) bitmapItem.readerMerge = recsplit.NewIndexReader(bitmapItem.index) a.addLocked(CodeBitmap, bitmapItem) } else { a.aggError <- fmt.Errorf("produceChangeSets %s: %w", Code.String(), err) return } } for fType := FirstType; fType < typesLimit; fType++ { var err error if err = aggTask.changes[fType].closeFiles(); err != nil { a.aggError <- fmt.Errorf("close %sChanges: %w", fType.String(), err) return } var item = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} if item.decompressor, item.index, err = createDatAndIndex(fType.String(), a.diffDir, aggTask.bt[fType], aggTask.blockFrom, aggTask.blockTo); err != nil { a.aggError <- fmt.Errorf("createDatAndIndex %s: %w", fType.String(), err) return } item.getter = item.decompressor.MakeGetter() item.getterMerge = item.decompressor.MakeGetter() item.indexReader = recsplit.NewIndexReader(item.index) item.readerMerge = recsplit.NewIndexReader(item.index) if err = aggTask.changes[fType].deleteFiles(); err != nil { a.aggError <- fmt.Errorf("delete %sChanges: %w", fType.String(), err) return } a.addLocked(fType, item) } // At this point, 3 new state files (containing latest changes) has been created for accounts, code, and storage // Corresponding items has been added to the registy of state files, and B-tree are not necessary anymore, change files can be removed // What follows can be performed by the 2nd background goroutine select { case a.mergeChannel <- struct{}{}: default: } select { case a.historyChannel <- struct{}{}: default: } } } type CommitmentValTransform struct { pre [NumberOfAccountStorageTypes][]*byEndBlockItem // List of state files before the merge post [NumberOfAccountStorageTypes][]*byEndBlockItem // List of state files aftee the merge } func decodeU64(from []byte) uint64 { var i uint64 for _, b := range from { i = (i << 8) | uint64(b) } return i } func encodeU64(i uint64, to []byte) []byte { // writes i to b in big endian byte order, using the least number of bytes needed to represent i. switch { case i < (1 << 8): return append(to, byte(i)) case i < (1 << 16): return append(to, byte(i>>8), byte(i)) case i < (1 << 24): return append(to, byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 32): return append(to, byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 40): return append(to, byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 48): return append(to, byte(i>>40), byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 56): return append(to, byte(i>>48), byte(i>>40), byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) default: return append(to, byte(i>>56), byte(i>>48), byte(i>>40), byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) } } // commitmentValTransform parses the value of of the commitment record to extract references // to accounts and storage items, then looks them up in the new, merged files, and replaces them with // the updated references func (cvt *CommitmentValTransform) commitmentValTransform(val []byte, transValBuf []byte) ([]byte, error) { if len(val) == 0 { return transValBuf, nil } accountPlainKeys, storagePlainKeys, err := commitment.ExtractPlainKeys(val) if err != nil { return nil, err } var transAccountPks [][]byte var transStoragePks [][]byte var apkBuf, spkBuf []byte for _, accountPlainKey := range accountPlainKeys { if len(accountPlainKey) == length.Addr { // Non-optimised key originating from a database record apkBuf = append(apkBuf[:0], accountPlainKey...) } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(accountPlainKey[0]) offset := decodeU64(accountPlainKey[1:]) g := cvt.pre[Account][fileI].getterMerge g.Reset(offset) apkBuf, _ = g.Next(apkBuf[:0]) //fmt.Printf("replacing account [%x] from [%x]\n", apkBuf, accountPlainKey) } // Look up apkBuf in the post account files for j := len(cvt.post[Account]); j > 0; j-- { item := cvt.post[Account][j-1] if item.index.Empty() { continue } offset := item.readerMerge.Lookup(apkBuf) g := item.getterMerge g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(apkBuf); keyMatch { accountPlainKey = encodeU64(offset, []byte{byte(j - 1)}) //fmt.Printf("replaced account [%x]=>[%x] for file [%d-%d]\n", apkBuf, accountPlainKey, item.startBlock, item.endBlock) break } } } transAccountPks = append(transAccountPks, accountPlainKey) } for _, storagePlainKey := range storagePlainKeys { if len(storagePlainKey) == length.Addr+length.Hash { // Non-optimised key originating from a database record spkBuf = append(spkBuf[:0], storagePlainKey...) } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(storagePlainKey[0]) offset := decodeU64(storagePlainKey[1:]) g := cvt.pre[Storage][fileI].getterMerge g.Reset(offset) spkBuf, _ = g.Next(spkBuf[:0]) //fmt.Printf("replacing storage [%x] from [%x]\n", spkBuf, storagePlainKey) } // Lookup spkBuf in the post storage files for j := len(cvt.post[Storage]); j > 0; j-- { item := cvt.post[Storage][j-1] if item.index.Empty() { continue } offset := item.readerMerge.Lookup(spkBuf) g := item.getterMerge g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(spkBuf); keyMatch { storagePlainKey = encodeU64(offset, []byte{byte(j - 1)}) //fmt.Printf("replaced storage [%x]=>[%x]\n", spkBuf, storagePlainKey) break } } } transStoragePks = append(transStoragePks, storagePlainKey) } if transValBuf, err = commitment.ReplacePlainKeys(val, transAccountPks, transStoragePks, transValBuf); err != nil { return nil, err } return transValBuf, nil } func (a *Aggregator) backgroundMerge() { defer a.mergeWg.Done() for range a.mergeChannel { t := time.Now() var err error var cvt CommitmentValTransform var toRemove [NumberOfStateTypes][]*byEndBlockItem var newItems [NumberOfStateTypes]*byEndBlockItem var blockFrom, blockTo uint64 lastType := Code typesLimit := Commitment if a.commitments { lastType = Commitment typesLimit = AccountHistory } // Lock the set of commitment (or code if commitments are off) files - those are the smallest, because account, storage and code files may be added by the aggregation thread first toRemove[lastType], _, _, blockFrom, blockTo = a.findLargestMerge(lastType, uint64(math.MaxUint64) /* maxBlockTo */, uint64(math.MaxUint64) /* maxSpan */) for fType := FirstType; fType < typesLimit; fType++ { var pre, post []*byEndBlockItem var from, to uint64 if fType == lastType { from = blockFrom to = blockTo } else { toRemove[fType], pre, post, from, to = a.findLargestMerge(fType, blockTo, uint64(math.MaxUint64) /* maxSpan */) if from != blockFrom { a.mergeError <- fmt.Errorf("%sFrom %d != blockFrom %d", fType.String(), from, blockFrom) return } if to != blockTo { a.mergeError <- fmt.Errorf("%sTo %d != blockTo %d", fType.String(), to, blockTo) return } } if len(toRemove[fType]) > 1 { var valTransform func([]byte, []byte) ([]byte, error) var mergeFunc func([]byte, []byte, []byte) ([]byte, error) if fType == Commitment { valTransform = cvt.commitmentValTransform mergeFunc = mergeCommitments } else { mergeFunc = mergeReplace } var prefixLen int if fType == Storage { prefixLen = length.Addr } if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, valTransform, mergeFunc, true /* valCompressed */, true /* withIndex */, prefixLen); err != nil { a.mergeError <- fmt.Errorf("computeAggreation %s: %w", fType.String(), err) return } post = append(post, newItems[fType]) } if fType < NumberOfAccountStorageTypes { cvt.pre[fType] = pre cvt.post[fType] = post } } // Switch aggregator to new state files, close and remove old files a.removeLockedState(toRemove[Account], newItems[Account], toRemove[Code], newItems[Code], toRemove[Storage], newItems[Storage], toRemove[Commitment], newItems[Commitment]) removed := 0 for fType := FirstType; fType < typesLimit; fType++ { if len(toRemove[fType]) > 1 { removeFiles(fType, a.diffDir, toRemove[fType]) removed += len(toRemove[fType]) - 1 } } mergeTime := time.Since(t) if mergeTime > time.Minute { log.Info("Long merge", "from", blockFrom, "to", blockTo, "files", removed, "time", time.Since(t)) } } } func (a *Aggregator) reduceHistoryFiles(fType FileType, item *byEndBlockItem) error { datTmpPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.dat.tmp", fType.String(), item.startBlock, item.endBlock)) datPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), item.startBlock, item.endBlock)) idxPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), item.startBlock, item.endBlock)) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datTmpPath, a.diffDir, compress.MinPatternScore, 1) if err != nil { return fmt.Errorf("reduceHistoryFiles create compressor %s: %w", datPath, err) } defer comp.Close() g := item.getter var val []byte var count int g.Reset(0) var key []byte for g.HasNext() { g.Skip() // Skip key on on the first pass val, _ = g.Next(val[:0]) //fmt.Printf("reduce1 [%s.%d-%d] [%x]=>[%x]\n", fType.String(), item.startBlock, item.endBlock, key, val) if err = comp.AddWord(val); err != nil { return fmt.Errorf("reduceHistoryFiles AddWord: %w", err) } count++ } if err = comp.Compress(); err != nil { return fmt.Errorf("reduceHistoryFiles compress: %w", err) } var d *compress.Decompressor if d, err = compress.NewDecompressor(datTmpPath); err != nil { return fmt.Errorf("reduceHistoryFiles create decompressor: %w", err) } var rs *recsplit.RecSplit if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: count, Enums: false, BucketSize: 2000, LeafSize: 8, TmpDir: a.diffDir, StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, IndexFile: idxPath, }); err != nil { return fmt.Errorf("reduceHistoryFiles NewRecSplit: %w", err) } g1 := d.MakeGetter() for { g.Reset(0) g1.Reset(0) var lastOffset uint64 for g.HasNext() { key, _ = g.Next(key[:0]) g.Skip() // Skip value _, pos := g1.Next(nil) //fmt.Printf("reduce2 [%s.%d-%d] [%x]==>%d\n", fType.String(), item.startBlock, item.endBlock, key, lastOffset) if err = rs.AddKey(key, lastOffset); err != nil { return fmt.Errorf("reduceHistoryFiles %p AddKey: %w", rs, err) } lastOffset = pos } if err = rs.Build(); err != nil { if rs.Collision() { log.Info("Building reduceHistoryFiles. Collision happened. It's ok. Restarting...") rs.ResetNextSalt() } else { return fmt.Errorf("reduceHistoryFiles Build: %w", err) } } else { break } } if err = item.decompressor.Close(); err != nil { return fmt.Errorf("reduceHistoryFiles close decompressor: %w", err) } if err = os.Remove(datPath); err != nil { return fmt.Errorf("reduceHistoryFiles remove: %w", err) } if err = os.Rename(datTmpPath, datPath); err != nil { return fmt.Errorf("reduceHistoryFiles rename: %w", err) } if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { return fmt.Errorf("reduceHistoryFiles create new decompressor: %w", err) } item.getter = item.decompressor.MakeGetter() item.getterMerge = item.decompressor.MakeGetter() if item.index, err = recsplit.OpenIndex(idxPath); err != nil { return fmt.Errorf("reduceHistoryFiles open index: %w", err) } item.indexReader = recsplit.NewIndexReader(item.index) item.readerMerge = recsplit.NewIndexReader(item.index) return nil } func mergeReplace(preval, val, buf []byte) ([]byte, error) { return append(buf, val...), nil } func mergeBitmaps(preval, val, buf []byte) ([]byte, error) { preef, _ := eliasfano32.ReadEliasFano(preval) ef, _ := eliasfano32.ReadEliasFano(val) //fmt.Printf("mergeBitmaps [%x] (count=%d,max=%d) + [%x] (count=%d,max=%d)\n", preval, preef.Count(), preef.Max(), val, ef.Count(), ef.Max()) preIt := preef.Iterator() efIt := ef.Iterator() newEf := eliasfano32.NewEliasFano(preef.Count()+ef.Count(), ef.Max()) for preIt.HasNext() { newEf.AddOffset(preIt.Next()) } for efIt.HasNext() { newEf.AddOffset(efIt.Next()) } newEf.Build() return newEf.AppendBytes(buf), nil } func mergeCommitments(preval, val, buf []byte) ([]byte, error) { return commitment.MergeBranches(preval, val, buf) } func (a *Aggregator) backgroundHistoryMerge() { defer a.historyWg.Done() for range a.historyChannel { t := time.Now() var err error var toRemove [NumberOfTypes][]*byEndBlockItem var newItems [NumberOfTypes]*byEndBlockItem var blockFrom, blockTo uint64 // Lock the set of commitment files - those are the smallest, because account, storage and code files may be added by the aggregation thread first toRemove[CodeBitmap], _, _, blockFrom, blockTo = a.findLargestMerge(CodeBitmap, uint64(math.MaxUint64) /* maxBlockTo */, 500_000 /* maxSpan */) finalMerge := blockTo-blockFrom+1 == 500_000 for fType := AccountHistory; fType < NumberOfTypes; fType++ { var from, to uint64 if fType == CodeBitmap { from = blockFrom to = blockTo } else { toRemove[fType], _, _, from, to = a.findLargestMerge(fType, blockTo, 500_000 /* maxSpan */) if from != blockFrom { a.historyError <- fmt.Errorf("%sFrom %d != blockFrom %d", fType.String(), from, blockFrom) return } if to != blockTo { a.historyError <- fmt.Errorf("%sTo %d != blockTo %d", fType.String(), to, blockTo) return } } if len(toRemove[fType]) > 1 { isBitmap := fType == AccountBitmap || fType == StorageBitmap || fType == CodeBitmap var mergeFunc func([]byte, []byte, []byte) ([]byte, error) if isBitmap { mergeFunc = mergeBitmaps } else if fType == Commitment { mergeFunc = mergeCommitments } else { mergeFunc = mergeReplace } if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, nil /* valTransform */, mergeFunc, !isBitmap /* valCompressed */, !finalMerge || isBitmap /* withIndex */, 0 /* prefixLen */); err != nil { a.historyError <- fmt.Errorf("computeAggreation %s: %w", fType.String(), err) return } } } if finalMerge { // Special aggregation for blockTo - blockFrom + 1 == 500_000 // Remove keys from the .dat files assuming that they will only be used after querying the bitmap index // and therefore, there is no situation where non-existent key is queried. if err = a.reduceHistoryFiles(AccountHistory, newItems[AccountHistory]); err != nil { a.historyError <- fmt.Errorf("reduceHistoryFiles %s: %w", AccountHistory.String(), err) return } if err = a.reduceHistoryFiles(StorageHistory, newItems[StorageHistory]); err != nil { a.historyError <- fmt.Errorf("reduceHistoryFiles %s: %w", StorageHistory.String(), err) return } if err = a.reduceHistoryFiles(CodeHistory, newItems[CodeHistory]); err != nil { a.historyError <- fmt.Errorf("reduceHistoryFiles %s: %w", CodeHistory.String(), err) return } } for fType := AccountHistory; fType < NumberOfTypes; fType++ { a.removeLocked(fType, toRemove[fType], newItems[fType]) } removed := 0 for fType := AccountHistory; fType < NumberOfTypes; fType++ { if len(toRemove[fType]) > 1 { removeFiles(fType, a.diffDir, toRemove[fType]) removed += len(toRemove[fType]) - 1 } } mergeTime := time.Since(t) if mergeTime > time.Minute { log.Info("Long history merge", "from", blockFrom, "to", blockTo, "files", removed, "time", time.Since(t)) } } } // checkOverlaps does not lock tree, because it is only called from the constructor of aggregator func checkOverlaps(treeName string, tree *btree.BTree) error { var minStart uint64 = math.MaxUint64 var err error tree.Descend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.startBlock < minStart { if item.endBlock >= minStart { err = fmt.Errorf("overlap of %s state files [%d-%d] with %d", treeName, item.startBlock, item.endBlock, minStart) return false } if minStart != math.MaxUint64 && item.endBlock+1 != minStart { err = fmt.Errorf("hole in %s state files [%d-%d]", treeName, item.endBlock, minStart) return false } minStart = item.startBlock } return true }) return err } func (a *Aggregator) openFiles(fType FileType, minArch uint64) error { var err error var totalKeys uint64 a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor, err = compress.NewDecompressor(path.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), item.startBlock, item.endBlock))); err != nil { return false } if item.index, err = recsplit.OpenIndex(path.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), item.startBlock, item.endBlock))); err != nil { return false } totalKeys += item.index.KeyCount() item.getter = item.decompressor.MakeGetter() item.getterMerge = item.decompressor.MakeGetter() item.indexReader = recsplit.NewIndexReader(item.index) item.readerMerge = recsplit.NewIndexReader(item.index) return true }) if fType >= NumberOfStateTypes { return nil } log.Info("Creating arch...", "type", fType.String(), "total keys in all state files", totalKeys) // Allocate arch of double of total keys n := totalKeys * 2 if n < minArch { n = minArch } a.arches[fType] = make([]uint32, n) arch := a.arches[fType] var key []byte h := a.archHasher collisions := 0 a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) g := item.getter g.Reset(0) blockNum := uint32(item.endBlock) for g.HasNext() { key, _ = g.Next(key[:0]) h.Reset() h.Write(key) //nolint:errcheck p, _ := h.Sum128() p = p % n if arch[p] != 0 { collisions++ } arch[p] = blockNum g.Skip() } return true }) log.Info("Created arch", "type", fType.String(), "collisions", collisions) return err } func (a *Aggregator) closeFiles(fType FileType) { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor != nil { item.decompressor.Close() } if item.index != nil { item.index.Close() } return true }) } func (a *Aggregator) Close() { close(a.aggChannel) a.aggWg.Wait() // Need to wait for the background aggregation to finish because itsends to merge channels // Drain channel before closing select { case <-a.mergeChannel: default: } close(a.mergeChannel) if a.changesets { // Drain channel before closing select { case <-a.historyChannel: default: } close(a.historyChannel) a.historyWg.Wait() } a.mergeWg.Wait() // Closing state files only after background aggregation goroutine is finished for fType := FirstType; fType < NumberOfTypes; fType++ { a.closeFiles(fType) } } // checkOverlapWithMinStart does not need to lock tree lock, because it is only used in the constructor of Aggregator func checkOverlapWithMinStart(treeName string, tree *btree.BTree, minStart uint64) error { if lastStateI := tree.Max(); lastStateI != nil { item := lastStateI.(*byEndBlockItem) if minStart != math.MaxUint64 && item.endBlock+1 != minStart { return fmt.Errorf("hole or overlap between %s state files and change files [%d-%d]", treeName, item.endBlock, minStart) } } return nil } func (a *Aggregator) readFromFiles(fType FileType, lock bool, blockNum uint64, filekey []byte, trace bool) ([]byte, uint64) { if lock { if fType == Commitment { for lockFType := FirstType; lockFType < NumberOfStateTypes; lockFType++ { a.fileLocks[lockFType].RLock() defer a.fileLocks[lockFType].RUnlock() } } else { a.fileLocks[fType].RLock() defer a.fileLocks[fType].RUnlock() } } h := a.archHasher arch := a.arches[fType] n := uint64(len(arch)) if n > 0 { h.Reset() h.Write(filekey) //nolint:errcheck p, _ := h.Sum128() p = p % n v := uint64(atomic.LoadUint32(&arch[p])) //fmt.Printf("Reading from %s arch key [%x]=%d, %d\n", fType.String(), filekey, p, arch[p]) if v == 0 { return nil, 0 } a.files[fType].AscendGreaterOrEqual(&byEndBlockItem{startBlock: v, endBlock: v}, func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.endBlock < blockNum { blockNum = item.endBlock } return false }) } var val []byte var startBlock uint64 a.files[fType].DescendLessOrEqual(&byEndBlockItem{endBlock: blockNum}, func(i btree.Item) bool { item := i.(*byEndBlockItem) if trace { fmt.Printf("read %s %x: search in file [%d-%d]\n", fType.String(), filekey, item.startBlock, item.endBlock) } if item.tree != nil { ai := item.tree.Get(&AggregateItem{k: filekey}) if ai == nil { return true } val = ai.(*AggregateItem).v startBlock = item.startBlock return false } if item.index.Empty() { return true } offset := item.indexReader.Lookup(filekey) g := item.getter g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(filekey); keyMatch { val, _ = g.Next(nil) if trace { fmt.Printf("read %s %x: found [%x] in file [%d-%d]\n", fType.String(), filekey, val, item.startBlock, item.endBlock) } startBlock = item.startBlock atomic.AddUint64(&a.fileHits, 1) return false } } atomic.AddUint64(&a.fileMisses, 1) return true }) if fType == Commitment { // Transform references if len(val) > 0 { accountPlainKeys, storagePlainKeys, err := commitment.ExtractPlainKeys(val) if err != nil { panic(err) } var transAccountPks [][]byte var transStoragePks [][]byte for _, accountPlainKey := range accountPlainKeys { var apkBuf []byte if len(accountPlainKey) == length.Addr { // Non-optimised key originating from a database record apkBuf = accountPlainKey } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(accountPlainKey[0]) offset := decodeU64(accountPlainKey[1:]) apkBuf, _ = a.readByOffset(Account, fileI, offset) } transAccountPks = append(transAccountPks, apkBuf) } for _, storagePlainKey := range storagePlainKeys { var spkBuf []byte if len(storagePlainKey) == length.Addr+length.Hash { // Non-optimised key originating from a database record spkBuf = storagePlainKey } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(storagePlainKey[0]) offset := decodeU64(storagePlainKey[1:]) spkBuf, _ = a.readByOffset(Storage, fileI, offset) } transStoragePks = append(transStoragePks, spkBuf) } if val, err = commitment.ReplacePlainKeys(val, transAccountPks, transStoragePks, nil); err != nil { panic(err) } } } return val, startBlock } // readByOffset is assumed to be invoked under a read lock func (a *Aggregator) readByOffset(fType FileType, fileI int, offset uint64) ([]byte, []byte) { var key, val []byte fi := 0 a.files[fType].Ascend(func(i btree.Item) bool { if fi < fileI { fi++ return true } item := i.(*byEndBlockItem) g := item.getter g.Reset(offset) key, _ = g.Next(nil) val, _ = g.Next(nil) return false }) return key, val } func (a *Aggregator) MakeStateReader(blockNum uint64) *Reader { r := &Reader{ a: a, blockNum: blockNum, } return r } type Reader struct { a *Aggregator search AggregateItem blockNum uint64 } func (r *Reader) ReadAccountData(addr []byte, trace bool) []byte { // Look in the summary table first r.search.k = addr if vi := r.a.trees[Account].Get(&r.search); vi != nil { return vi.(*AggregateItem).v } val, _ := r.a.readFromFiles(Account, true /* lock */, r.blockNum, addr, trace) return val } func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) *uint256.Int { // Look in the summary table first dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) r.search.k = dbkey var v []byte if vi := r.a.trees[Storage].Get(&r.search); vi != nil { v = vi.(*AggregateItem).v } else { v, _ = r.a.readFromFiles(Storage, true /* lock */, r.blockNum, dbkey, trace) } if v != nil { return new(uint256.Int).SetBytes(v) } return nil } func (r *Reader) ReadAccountCode(addr []byte, trace bool) []byte { // Look in the summary table first r.search.k = addr if vi := r.a.trees[Code].Get(&r.search); vi != nil { return vi.(*AggregateItem).v } // Look in the files val, _ := r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) return val } func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) int { // Look in the summary table first r.search.k = addr if vi := r.a.trees[Code].Get(&r.search); vi != nil { return len(vi.(*AggregateItem).v) } // Look in the files. TODO - use specialised function to only lookup size val, _ := r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) return len(val) } type Writer struct { a *Aggregator search AggregateItem // Aggregate item used to search in trees blockNum uint64 changeFileNum uint64 // Block number associated with the current change files. It is the last block number whose changes will go into that file changes [NumberOfStateTypes]Changes commTree *btree.BTree // BTree used for gathering commitment data } func (a *Aggregator) MakeStateWriter(beforeOn bool) *Writer { w := &Writer{ a: a, commTree: btree.New(32), } for fType := FirstType; fType < NumberOfStateTypes; fType++ { w.changes[fType].Init(fType.String(), a.aggregationStep, a.diffDir, w.a.changesets && fType != Commitment /* we do not unwind commitment ? */) } return w } func (w *Writer) Close() { typesLimit := Commitment if w.a.commitments { typesLimit = AccountHistory } for fType := FirstType; fType < typesLimit; fType++ { w.changes[fType].closeFiles() } } func (w *Writer) Reset(blockNum uint64) error { w.blockNum = blockNum typesLimit := Commitment if w.a.commitments { typesLimit = AccountHistory } if blockNum > w.changeFileNum { for fType := FirstType; fType < typesLimit; fType++ { if err := w.changes[fType].closeFiles(); err != nil { return err } } if w.changeFileNum != 0 { w.a.changesBtree.ReplaceOrInsert(&ChangesItem{startBlock: w.changeFileNum + 1 - w.a.aggregationStep, endBlock: w.changeFileNum, fileCount: 12}) } } if w.changeFileNum == 0 || blockNum > w.changeFileNum { for fType := FirstType; fType < typesLimit; fType++ { if err := w.changes[fType].openFiles(blockNum, true /* write */); err != nil { return err } } w.changeFileNum = blockNum - (blockNum % w.a.aggregationStep) + w.a.aggregationStep - 1 } return nil } type CommitmentItem struct { plainKey []byte hashedKey []byte u commitment.Update } func (i *CommitmentItem) Less(than btree.Item) bool { return bytes.Compare(i.hashedKey, than.(*CommitmentItem).hashedKey) < 0 } func (w *Writer) lockFn() { for fType := FirstType; fType < NumberOfStateTypes; fType++ { w.a.fileLocks[fType].RLock() } } func (w *Writer) unlockFn() { for fType := FirstType; fType < NumberOfStateTypes; fType++ { w.a.fileLocks[fType].RUnlock() } } func (w *Writer) branchFn(prefix []byte) []byte { for lockFType := FirstType; lockFType < NumberOfStateTypes; lockFType++ { w.a.fileLocks[lockFType].RLock() defer w.a.fileLocks[lockFType].RUnlock() } var mergedVal []byte // Look in the summary table first w.search.k = prefix if vi := w.a.trees[Commitment].Get(&w.search); vi != nil { mergedVal = vi.(*AggregateItem).v } // Look in the files and merge, while it becomes complete var startBlock uint64 = w.blockNum + 1 for mergedVal == nil || !commitment.IsComplete(mergedVal) { if startBlock == 0 { panic(fmt.Sprintf("Incomplete branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock)) } var val []byte val, startBlock = w.a.readFromFiles(Commitment, false /* lock */, startBlock-1, prefix, false /* trace */) if val == nil { if mergedVal == nil { return nil } panic(fmt.Sprintf("Incomplete branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock)) } var err error //fmt.Printf("Pre-merge prefix [%x] [%x]+[%x], startBlock %d\n", commitment.CompactToHex(prefix), val, mergedVal, startBlock) if mergedVal == nil { mergedVal = val } else if mergedVal, err = commitment.MergeBranches(val, mergedVal, nil); err != nil { panic(err) } //fmt.Printf("Post-merge prefix [%x] [%x], startBlock %d\n", commitment.CompactToHex(prefix), mergedVal, startBlock) } if mergedVal == nil { return nil } //fmt.Printf("Returning branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock) return mergedVal[2:] // Skip touchMap but keep afterMap } func bytesToUint64(buf []byte) (x uint64) { for i, b := range buf { x = x<<8 + uint64(b) if i == 7 { return } } return } func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) []byte { var enc []byte // Look in the summary table first w.search.k = plainKey if encI := w.a.trees[Account].Get(&w.search); encI != nil { enc = encI.(*AggregateItem).v } else { // Look in the files enc, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, plainKey, false /* trace */) } cell.Nonce = 0 cell.Balance.Clear() copy(cell.CodeHash[:], commitment.EmptyCodeHash) if len(enc) > 0 { pos := 0 nonceBytes := int(enc[pos]) pos++ if nonceBytes > 0 { cell.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) pos += nonceBytes } balanceBytes := int(enc[pos]) pos++ if balanceBytes > 0 { cell.Balance.SetBytes(enc[pos : pos+balanceBytes]) } } w.search.k = plainKey if encI := w.a.trees[Code].Get(&w.search); encI != nil { enc = encI.(*AggregateItem).v } else { // Look in the files enc, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, plainKey, false /* trace */) } if len(enc) > 0 { w.a.keccak.Reset() w.a.keccak.Write(enc) w.a.keccak.(io.Reader).Read(cell.CodeHash[:]) } return plainKey } func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) []byte { var enc []byte // Look in the summary table first w.search.k = plainKey if encI := w.a.trees[Storage].Get(&w.search); encI != nil { enc = encI.(*AggregateItem).v } else { // Look in the files enc, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, plainKey, false /* trace */) } cell.StorageLen = len(enc) copy(cell.Storage[:], enc) return plainKey } func (w *Writer) captureCommitmentType(fType FileType, trace bool, f func(commTree *btree.BTree, h hash.Hash, key, val []byte)) { lastOffsetKey := 0 lastOffsetVal := 0 for i, offsetKey := range w.changes[fType].keys.wordOffsets { offsetVal := w.changes[fType].after.wordOffsets[i] key := w.changes[fType].keys.words[lastOffsetKey:offsetKey] val := w.changes[fType].after.words[lastOffsetVal:offsetVal] if trace { fmt.Printf("captureCommitmentData %s [%x]=>[%x]\n", fType.String(), key, val) } f(w.commTree, w.a.keccak, key, val) lastOffsetKey = offsetKey lastOffsetVal = offsetVal } } func (w *Writer) captureCommitmentData(trace bool) { if trace { fmt.Printf("captureCommitmentData start w.commTree.Len()=%d\n", w.commTree.Len()) } w.captureCommitmentType(Code, trace, func(commTree *btree.BTree, h hash.Hash, key, val []byte) { h.Reset() h.Write(key) hashedKey := h.Sum(nil) var c = &CommitmentItem{plainKey: common.Copy(key), hashedKey: make([]byte, len(hashedKey)*2)} for i, b := range hashedKey { c.hashedKey[i*2] = (b >> 4) & 0xf c.hashedKey[i*2+1] = b & 0xf } c.u.Flags = commitment.CODE_UPDATE item := commTree.Get(&CommitmentItem{hashedKey: c.hashedKey}) if item != nil { itemC := item.(*CommitmentItem) if itemC.u.Flags&commitment.BALANCE_UPDATE != 0 { c.u.Flags |= commitment.BALANCE_UPDATE c.u.Balance.Set(&itemC.u.Balance) } if itemC.u.Flags&commitment.NONCE_UPDATE != 0 { c.u.Flags |= commitment.NONCE_UPDATE c.u.Nonce = itemC.u.Nonce } if itemC.u.Flags == commitment.DELETE_UPDATE && len(val) == 0 { c.u.Flags = commitment.DELETE_UPDATE } else { h.Reset() h.Write(val) h.(io.Reader).Read(c.u.CodeHashOrStorage[:]) } } else { h.Reset() h.Write(val) h.(io.Reader).Read(c.u.CodeHashOrStorage[:]) } commTree.ReplaceOrInsert(c) }) w.captureCommitmentType(Account, trace, func(commTree *btree.BTree, h hash.Hash, key, val []byte) { h.Reset() h.Write(key) hashedKey := h.Sum(nil) var c = &CommitmentItem{plainKey: common.Copy(key), hashedKey: make([]byte, len(hashedKey)*2)} for i, b := range hashedKey { c.hashedKey[i*2] = (b >> 4) & 0xf c.hashedKey[i*2+1] = b & 0xf } if len(val) == 0 { c.u.Flags = commitment.DELETE_UPDATE } else { c.u.DecodeForStorage(val) c.u.Flags = commitment.BALANCE_UPDATE | commitment.NONCE_UPDATE item := commTree.Get(&CommitmentItem{hashedKey: c.hashedKey}) if item != nil { itemC := item.(*CommitmentItem) if itemC.u.Flags&commitment.CODE_UPDATE != 0 { c.u.Flags |= commitment.CODE_UPDATE copy(c.u.CodeHashOrStorage[:], itemC.u.CodeHashOrStorage[:]) } } } commTree.ReplaceOrInsert(c) }) w.captureCommitmentType(Storage, trace, func(commTree *btree.BTree, h hash.Hash, key, val []byte) { hashedKey := make([]byte, 2*length.Hash) h.Reset() h.Write(key[:length.Addr]) h.(io.Reader).Read(hashedKey[:length.Hash]) h.Reset() h.Write(key[length.Addr:]) h.(io.Reader).Read(hashedKey[length.Hash:]) var c = &CommitmentItem{plainKey: common.Copy(key), hashedKey: make([]byte, len(hashedKey)*2)} for i, b := range hashedKey { c.hashedKey[i*2] = (b >> 4) & 0xf c.hashedKey[i*2+1] = b & 0xf } c.u.ValLength = len(val) if len(val) > 0 { copy(c.u.CodeHashOrStorage[:], val) } if len(val) == 0 { c.u.Flags = commitment.DELETE_UPDATE } else { c.u.Flags = commitment.STORAGE_UPDATE } commTree.ReplaceOrInsert(c) }) if trace { fmt.Printf("captureCommitmentData end w.commTree.Len()=%d\n", w.commTree.Len()) } } // computeCommitment is computing the commitment to the state after // the change would have been applied. // It assumes that the state accessible via the aggregator has already been // modified with the new values // At the moment, it is specific version for hex merkle patricia tree commitment // but it will be extended to support other types of commitments func (w *Writer) computeCommitment(trace bool) ([]byte, error) { if trace { fmt.Printf("computeCommitment w.commTree.Len()=%d\n", w.commTree.Len()) } plainKeys := make([][]byte, w.commTree.Len()) hashedKeys := make([][]byte, w.commTree.Len()) updates := make([]commitment.Update, w.commTree.Len()) j := 0 w.commTree.Ascend(func(i btree.Item) bool { item := i.(*CommitmentItem) plainKeys[j] = item.plainKey hashedKeys[j] = item.hashedKey updates[j] = item.u j++ return true }) w.a.hph.Reset() w.a.hph.ResetFns(w.branchFn, w.accountFn, w.storageFn, w.lockFn, w.unlockFn) w.a.hph.SetTrace(trace) branchNodeUpdates, err := w.a.hph.ProcessUpdates(plainKeys, hashedKeys, updates) if err != nil { return nil, err } for prefixStr, branchNodeUpdate := range branchNodeUpdates { if branchNodeUpdate == nil { continue } prefix := []byte(prefixStr) w.search.k = prefix var prevV *AggregateItem if prevVI := w.a.trees[Commitment].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Commitment, true /* lock */, w.blockNum, prefix, false) } else { original = prevV.v } if original != nil { var mergedVal []byte if mergedVal, err = commitment.MergeBranches(original, branchNodeUpdate, nil); err == nil { //fmt.Printf("computeCommitment merge [%x] [%x]+[%x]=>[%x]\n", commitment.CompactToHex(prefix), original, branchNodeUpdate, mergedVal) branchNodeUpdate = mergedVal } else { return nil, err } } //fmt.Printf("computeCommitment set [%x] [%x]\n", commitment.CompactToHex(prefix), branchNodeUpdate) if prevV == nil { w.a.trees[Commitment].ReplaceOrInsert(&AggregateItem{k: prefix, v: branchNodeUpdate, count: 1}) } else { prevV.v = branchNodeUpdate prevV.count++ } if len(branchNodeUpdate) == 0 { w.changes[Commitment].delete(prefix, original) } else { if prevV == nil && len(original) == 0 { w.changes[Commitment].insert(prefix, branchNodeUpdate) } else { w.changes[Commitment].update(prefix, original, branchNodeUpdate) } } } var rootHash []byte if rootHash, err = w.a.hph.RootHash(); err != nil { return nil, err } return rootHash, nil } func (w *Writer) FinishTx(txNum uint64, trace bool) error { if w.a.commitments { w.captureCommitmentData(trace) } var err error for fType := FirstType; fType < Commitment; fType++ { if err = w.changes[fType].finish(txNum); err != nil { return fmt.Errorf("finish %sChanges: %w", fType.String(), err) } } return nil } func (w *Writer) ComputeCommitment(trace bool) ([]byte, error) { if !w.a.commitments { return nil, fmt.Errorf("commitments turned off") } comm, err := w.computeCommitment(trace) if err != nil { return nil, fmt.Errorf("compute commitment: %w", err) } w.commTree.Clear(true) if err = w.changes[Commitment].finish(w.blockNum); err != nil { return nil, fmt.Errorf("finish commChanges: %w", err) } return comm, nil } // Aggegate should be called to check if the aggregation is required, and // if it is required, perform it func (w *Writer) Aggregate(trace bool) error { if w.blockNum < w.a.unwindLimit+w.a.aggregationStep-1 { return nil } diff := w.blockNum - w.a.unwindLimit if (diff+1)%w.a.aggregationStep != 0 { return nil } if err := w.aggregateUpto(diff+1-w.a.aggregationStep, diff); err != nil { return fmt.Errorf("aggregateUpto(%d, %d): %w", diff+1-w.a.aggregationStep, diff, err) } return nil } func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) { var prevV *AggregateItem w.search.k = addr if prevVI := w.a.trees[Account].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace) } else { original = prevV.v } if bytes.Equal(account, original) { // No change return } if prevV == nil { w.a.trees[Account].ReplaceOrInsert(&AggregateItem{k: addr, v: account, count: 1}) } else { prevV.v = account prevV.count++ } if prevV == nil && len(original) == 0 { w.changes[Account].insert(addr, account) } else { w.changes[Account].update(addr, original, account) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } } func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) { var prevV *AggregateItem w.search.k = addr if prevVI := w.a.trees[Code].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace) } else { original = prevV.v } if prevV == nil { w.a.trees[Code].ReplaceOrInsert(&AggregateItem{k: addr, v: code, count: 1}) } else { prevV.v = code prevV.count++ } if prevV == nil && len(original) == 0 { w.changes[Code].insert(addr, code) } else { w.changes[Code].update(addr, original, code) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } } type CursorType uint8 const ( FILE_CURSOR CursorType = iota TREE_CURSOR ) // CursorItem is the item in the priority queue used to do merge interation // over storage of a given account type CursorItem struct { t CursorType // Whether this item represents state file or DB record, or tree endBlock uint64 key, val []byte dg *compress.Getter tree *btree.BTree } type CursorHeap []*CursorItem func (ch CursorHeap) Len() int { return len(ch) } func (ch CursorHeap) Less(i, j int) bool { cmp := bytes.Compare(ch[i].key, ch[j].key) if cmp == 0 { // when keys match, the items with later blocks are preferred return ch[i].endBlock > ch[j].endBlock } return cmp < 0 } func (ch *CursorHeap) Swap(i, j int) { (*ch)[i], (*ch)[j] = (*ch)[j], (*ch)[i] } func (ch *CursorHeap) Push(x interface{}) { *ch = append(*ch, x.(*CursorItem)) } func (ch *CursorHeap) Pop() interface{} { old := *ch n := len(old) x := old[n-1] *ch = old[0 : n-1] return x } func (w *Writer) deleteAccount(addr []byte, trace bool) bool { var prevV *AggregateItem w.search.k = addr if prevVI := w.a.trees[Account].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace) if original == nil { return false } } else { original = prevV.v } if prevV == nil { w.a.trees[Account].ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1}) } else { prevV.v = nil prevV.count++ } w.changes[Account].delete(addr, original) return true } func (w *Writer) deleteCode(addr []byte, trace bool) { var prevV *AggregateItem w.search.k = addr if prevVI := w.a.trees[Code].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace) if original == nil { // Nothing to do return } } else { original = prevV.v } if prevV == nil { w.a.trees[Code].ReplaceOrInsert(&AggregateItem{k: addr, v: nil, count: 1}) } else { prevV.v = nil prevV.count++ } w.changes[Code].delete(addr, original) } func (w *Writer) DeleteAccount(addr []byte, trace bool) { if deleted := w.deleteAccount(addr, trace); !deleted { return } w.a.fileLocks[Storage].RLock() defer w.a.fileLocks[Storage].RUnlock() w.deleteCode(addr, trace) // Find all storage items for this address var cp CursorHeap heap.Init(&cp) w.search.k = addr foundInTree := false var k, v []byte w.a.trees[Storage].AscendGreaterOrEqual(&w.search, func(i btree.Item) bool { item := i.(*AggregateItem) if bytes.HasPrefix(item.k, addr) { foundInTree = true k = item.k v = item.v } return false }) if foundInTree { heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: common.Copy(k), val: common.Copy(v), tree: w.a.trees[Storage], endBlock: w.blockNum}) } w.a.files[Storage].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.tree != nil { item.tree.AscendGreaterOrEqual(&AggregateItem{k: addr}, func(ai btree.Item) bool { aitem := ai.(*AggregateItem) if !bytes.HasPrefix(aitem.k, addr) { return false } if len(aitem.k) == len(addr) { return true } heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: aitem.k, val: aitem.v, tree: item.tree, endBlock: item.endBlock}) return false }) return true } if item.index.Empty() { return true } offset := item.indexReader.Lookup(addr) g := item.getter g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(addr); !keyMatch { //fmt.Printf("DeleteAccount %x - not found anchor in file [%d-%d]\n", addr, item.startBlock, item.endBlock) return true } g.Skip() } if g.HasNext() { key, _ := g.Next(nil) if bytes.HasPrefix(key, addr) { val, _ := g.Next(nil) heap.Push(&cp, &CursorItem{t: FILE_CURSOR, key: key, val: val, dg: g, endBlock: item.endBlock}) } } return true }) for cp.Len() > 0 { lastKey := common.Copy(cp[0].key) lastVal := common.Copy(cp[0].val) // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { ci1 := cp[0] switch ci1.t { case FILE_CURSOR: if ci1.dg.HasNext() { ci1.key, _ = ci1.dg.Next(ci1.key[:0]) if bytes.HasPrefix(ci1.key, addr) { ci1.val, _ = ci1.dg.Next(ci1.val[:0]) heap.Fix(&cp, 0) } else { heap.Pop(&cp) } } else { heap.Pop(&cp) } case TREE_CURSOR: skip := true var aitem *AggregateItem ci1.tree.AscendGreaterOrEqual(&AggregateItem{k: ci1.key}, func(ai btree.Item) bool { if skip { skip = false return true } aitem = ai.(*AggregateItem) return false }) if aitem != nil && bytes.HasPrefix(aitem.k, addr) { ci1.key = aitem.k ci1.val = aitem.v heap.Fix(&cp, 0) } else { heap.Pop(&cp) } } } var prevV *AggregateItem w.search.k = lastKey if prevVI := w.a.trees[Storage].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } if prevV == nil { w.a.trees[Storage].ReplaceOrInsert(&AggregateItem{k: lastKey, v: nil, count: 1}) } else { prevV.v = nil prevV.count++ } w.changes[Storage].delete(lastKey, lastVal) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } } func (w *Writer) WriteAccountStorage(addr []byte, loc []byte, value *uint256.Int, trace bool) { dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) w.search.k = dbkey var prevV *AggregateItem if prevVI := w.a.trees[Storage].Get(&w.search); prevVI != nil { prevV = prevVI.(*AggregateItem) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, dbkey, trace) } else { original = prevV.v } vLen := value.ByteLen() v := make([]byte, vLen) value.WriteToSlice(v) if bytes.Equal(v, original) { // No change return } if prevV == nil { w.a.trees[Storage].ReplaceOrInsert(&AggregateItem{k: dbkey, v: v, count: 1}) } else { prevV.v = v prevV.count++ } if prevV == nil && len(original) == 0 { w.changes[Storage].insert(dbkey, v) } else { w.changes[Storage].update(dbkey, original, v) } if trace { w.a.trace = true w.a.tracedKeys[string(dbkey)] = struct{}{} } } // findLargestMerge looks through the state files of the speficied type and determines the largest merge that can be undertaken // a state file block [a; b] is valid if its length is a divisor of its starting block, or `(b-a+1) = 0 mod a` func (a *Aggregator) findLargestMerge(fType FileType, maxTo uint64, maxSpan uint64) (toAggregate []*byEndBlockItem, pre []*byEndBlockItem, post []*byEndBlockItem, aggFrom uint64, aggTo uint64) { a.fileLocks[fType].RLock() defer a.fileLocks[fType].RUnlock() var maxEndBlock uint64 a.files[fType].DescendLessOrEqual(&byEndBlockItem{endBlock: maxTo}, func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor == nil { return true } maxEndBlock = item.endBlock return false }) if maxEndBlock == 0 { return } a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor == nil { return true // Skip B-tree based items } pre = append(pre, item) if aggTo == 0 { var doubleEnd uint64 nextDouble := item.endBlock for nextDouble <= maxEndBlock && nextDouble-item.startBlock < maxSpan { doubleEnd = nextDouble nextDouble = doubleEnd + (doubleEnd - item.startBlock) + 1 } if doubleEnd != item.endBlock { aggFrom = item.startBlock aggTo = doubleEnd } else { post = append(post, item) return true } } toAggregate = append(toAggregate, item) return item.endBlock < aggTo }) return } func (a *Aggregator) computeAggregation(fType FileType, toAggregate []*byEndBlockItem, aggFrom uint64, aggTo uint64, valTransform func(val []byte, transValBuf []byte) ([]byte, error), mergeFunc func(preval, val, buf []byte) ([]byte, error), valCompressed bool, withIndex bool, prefixLen int) (*byEndBlockItem, error) { var item2 = &byEndBlockItem{startBlock: aggFrom, endBlock: aggTo} var cp CursorHeap heap.Init(&cp) for _, ag := range toAggregate { g := ag.decompressor.MakeGetter() g.Reset(0) if g.HasNext() { key, _ := g.Next(nil) val, _ := g.Next(nil) heap.Push(&cp, &CursorItem{t: FILE_CURSOR, dg: g, key: key, val: val, endBlock: ag.endBlock}) } } var err error var count int if item2.decompressor, count, err = a.mergeIntoStateFile(&cp, prefixLen, fType, aggFrom, aggTo, a.diffDir, valTransform, mergeFunc, valCompressed); err != nil { return nil, fmt.Errorf("mergeIntoStateFile %s [%d-%d]: %w", fType.String(), aggFrom, aggTo, err) } item2.getter = item2.decompressor.MakeGetter() item2.getterMerge = item2.decompressor.MakeGetter() if withIndex { idxPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), aggFrom, aggTo)) if item2.index, err = buildIndex(item2.decompressor, idxPath, a.diffDir, count); err != nil { return nil, fmt.Errorf("mergeIntoStateFile buildIndex %s [%d-%d]: %w", fType.String(), aggFrom, aggTo, err) } item2.indexReader = recsplit.NewIndexReader(item2.index) item2.readerMerge = recsplit.NewIndexReader(item2.index) } return item2, nil } func createDatAndIndex(treeName string, diffDir string, bt *btree.BTree, blockFrom uint64, blockTo uint64) (*compress.Decompressor, *recsplit.Index, error) { datPath := filepath.Join(diffDir, fmt.Sprintf("%s.%d-%d.dat", treeName, blockFrom, blockTo)) idxPath := filepath.Join(diffDir, fmt.Sprintf("%s.%d-%d.idx", treeName, blockFrom, blockTo)) count, err := btreeToFile(bt, datPath, diffDir, false /* trace */, 1 /* workers */) if err != nil { return nil, nil, fmt.Errorf("createDatAndIndex %s build btree: %w", treeName, err) } var d *compress.Decompressor if d, err = compress.NewDecompressor(datPath); err != nil { return nil, nil, fmt.Errorf("createDatAndIndex %s decompressor: %w", treeName, err) } var index *recsplit.Index if index, err = buildIndex(d, idxPath, diffDir, count); err != nil { return nil, nil, fmt.Errorf("createDatAndIndex %s buildIndex: %w", treeName, err) } return d, index, nil } func (a *Aggregator) addLocked(fType FileType, item *byEndBlockItem) { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() a.files[fType].ReplaceOrInsert(item) } func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error { // React on any previous error of aggregation or merge select { case err := <-w.a.aggError: return err case err := <-w.a.mergeError: return err case err := <-w.a.historyError: return err default: } typesLimit := Commitment if w.a.commitments { typesLimit = AccountHistory } t := time.Now() i := w.a.changesBtree.Get(&ChangesItem{startBlock: blockFrom, endBlock: blockTo}) if i == nil { return fmt.Errorf("did not find change files for [%d-%d], w.a.changesBtree.Len() = %d", blockFrom, blockTo, w.a.changesBtree.Len()) } item := i.(*ChangesItem) if item.startBlock != blockFrom { return fmt.Errorf("expected change files[%d-%d], got [%d-%d]", blockFrom, blockTo, item.startBlock, item.endBlock) } w.a.changesBtree.Delete(i) var aggTask AggregationTask for fType := FirstType; fType < typesLimit; fType++ { aggTask.changes[fType].Init(fType.String(), w.a.aggregationStep, w.a.diffDir, w.a.changesets && fType != Commitment) } var err error for fType := FirstType; fType < typesLimit; fType++ { var prefixLen int if fType == Storage { prefixLen = length.Addr } if aggTask.bt[fType], err = aggTask.changes[fType].aggregate(blockFrom, blockTo, prefixLen, w.a.trees[fType], fType == Commitment); err != nil { return fmt.Errorf("aggregate %sChanges: %w", fType.String(), err) } } aggTask.blockFrom = blockFrom aggTask.blockTo = blockTo aggTime := time.Since(t) t = time.Now() // At this point, all the changes are gathered in 4 B-trees (accounts, code, storage and commitment) and removed from the database // What follows can be done in the 1st background goroutine w.a.aggChannel <- &aggTask <-w.a.aggBackCh // Waiting for the B-tree based items have been added handoverTime := time.Since(t) if handoverTime > time.Second { log.Info("Long handover to background aggregation", "from", blockFrom, "to", blockTo, "composition", aggTime, "handover", time.Since(t)) } return nil } // mergeIntoStateFile assumes that all entries in the cp heap have type FILE_CURSOR func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, fType FileType, startBlock, endBlock uint64, dir string, valTransform func(val []byte, transValBuf []byte) ([]byte, error), mergeFunc func(preval, val, buf []byte) ([]byte, error), valCompressed bool, ) (*compress.Decompressor, int, error) { datPath := filepath.Join(dir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), startBlock, endBlock)) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 1) if err != nil { return nil, 0, fmt.Errorf("compressor %s: %w", datPath, err) } defer comp.Close() count := 0 // In the loop below, the pair `keyBuf=>valBuf` is always 1 item behind `lastKey=>lastVal`. // `lastKey` and `lastVal` are taken from the top of the multi-way merge (assisted by the CursorHeap cp), but not processed right away // instead, the pair from the previous iteration is processed first - `keyBuf=>valBuf`. After that, `keyBuf` and `valBuf` are assigned // to `lastKey` and `lastVal` correspondingly, and the next step of multi-way merge happens. Therefore, after the multi-way merge loop // (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind var keyBuf, valBuf, transValBuf []byte for cp.Len() > 0 { lastKey := common.Copy((*cp)[0].key) lastVal := common.Copy((*cp)[0].val) var mergedOnce bool if a.trace { if _, ok := a.tracedKeys[string(lastKey)]; ok { fmt.Printf("looking at key %x val [%x] endBlock %d to merge into [%d-%d]\n", lastKey, lastVal, (*cp)[0].endBlock, startBlock, endBlock) } } // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal((*cp)[0].key, lastKey) { ci1 := (*cp)[0] if a.trace { if _, ok := a.tracedKeys[string(ci1.key)]; ok { fmt.Printf("skipping same key %x val [%x] endBlock %d to merge into [%d-%d]\n", ci1.key, ci1.val, ci1.endBlock, startBlock, endBlock) } } if ci1.t != FILE_CURSOR { return nil, 0, fmt.Errorf("mergeIntoStateFile: cursor of unexpected type: %d", ci1.t) } if mergedOnce { //fmt.Printf("mergeIntoStateFile pre-merge prefix [%x], [%x]+[%x]\n", commitment.CompactToHex(lastKey), ci1.val, lastVal) if lastVal, err = mergeFunc(ci1.val, lastVal, nil); err != nil { return nil, 0, fmt.Errorf("mergeIntoStateFile: merge values: %w", err) } //fmt.Printf("mergeIntoStateFile post-merge prefix [%x], [%x]\n", commitment.CompactToHex(lastKey), lastVal) } else { mergedOnce = true } if ci1.dg.HasNext() { ci1.key, _ = ci1.dg.Next(ci1.key[:0]) if valCompressed { ci1.val, _ = ci1.dg.Next(ci1.val[:0]) } else { ci1.val, _ = ci1.dg.NextUncompressed() } heap.Fix(cp, 0) } else { heap.Pop(cp) } } var skip bool switch fType { case Storage: // Inside storage files, there is a special item with empty value, and the key equal to the contract's address // This special item is inserted before the contract storage items, in order to find them using un-ordered index // (for the purposes of SELF-DESTRUCT and some RPC methods that require enumeration of contract storage) // We will only skip this special item if there are no more corresponding storage items left // (this is checked further down with `bytes.HasPrefix(lastKey, keyBuf)`) skip = startBlock == 0 && len(lastVal) == 0 && len(lastKey) != prefixLen case Commitment: // For commitments, the 3rd and 4th bytes of the value (zero-based 2 and 3) contain so-called `afterMap` // Its bit are set for children that are present in the tree, and unset for those that are not (deleted, for example) // If all bits are zero (check below), this branch can be skipped, since it is empty skip = startBlock == 0 && len(lastVal) >= 4 && lastVal[2] == 0 && lastVal[3] == 0 case AccountHistory, StorageHistory, CodeHistory: skip = false default: // For the rest of types, empty value means deletion skip = startBlock == 0 && len(lastVal) == 0 } if skip { // Deleted marker can be skipped if we merge into the first file, except for the storage addr marker if _, ok := a.tracedKeys[string(keyBuf)]; ok { fmt.Printf("skipped key %x for [%d-%d]\n", keyBuf, startBlock, endBlock) } } else { // The check `bytes.HasPrefix(lastKey, keyBuf)` is checking whether the `lastKey` is the first item // of some contract's storage, and `keyBuf` (the item just before that) is the special item with the // key being contract's address. If so, the special item (keyBuf => []) needs to be preserved if keyBuf != nil && (prefixLen == 0 || len(keyBuf) != prefixLen || bytes.HasPrefix(lastKey, keyBuf)) { if err = comp.AddWord(keyBuf); err != nil { return nil, 0, err } if a.trace { if _, ok := a.tracedKeys[string(keyBuf)]; ok { fmt.Printf("merge key %x val [%x] into [%d-%d]\n", keyBuf, valBuf, startBlock, endBlock) } } count++ // Only counting keys, not values if valTransform != nil { if transValBuf, err = valTransform(valBuf, transValBuf[:0]); err != nil { return nil, 0, fmt.Errorf("mergeIntoStateFile valTransform [%x]: %w", valBuf, err) } if err = comp.AddWord(transValBuf); err != nil { return nil, 0, err } } else if valCompressed { if err = comp.AddWord(valBuf); err != nil { return nil, 0, err } } else { if err = comp.AddUncompressedWord(valBuf); err != nil { return nil, 0, err } } //if fType == AccountHistory { // fmt.Printf("merge %s.%d-%d [%x]=>[%x]\n", fType.String(), startBlock, endBlock, keyBuf, valBuf) //} } keyBuf = append(keyBuf[:0], lastKey...) valBuf = append(valBuf[:0], lastVal...) } } if keyBuf != nil { if err = comp.AddWord(keyBuf); err != nil { return nil, 0, err } if a.trace { if _, ok := a.tracedKeys[string(keyBuf)]; ok { fmt.Printf("merge key %x val [%x] into [%d-%d]\n", keyBuf, valBuf, startBlock, endBlock) } } count++ // Only counting keys, not values if valTransform != nil { if transValBuf, err = valTransform(valBuf, transValBuf[:0]); err != nil { return nil, 0, fmt.Errorf("mergeIntoStateFile valTransform [%x]: %w", valBuf, err) } if err = comp.AddWord(transValBuf); err != nil { return nil, 0, err } } else if valCompressed { if err = comp.AddWord(valBuf); err != nil { return nil, 0, err } } else { if err = comp.AddUncompressedWord(valBuf); err != nil { return nil, 0, err } } //if fType == AccountHistory { // fmt.Printf("merge %s.%d-%d [%x]=>[%x]\n", fType.String(), startBlock, endBlock, keyBuf, valBuf) //} } if err = comp.Compress(); err != nil { return nil, 0, err } var d *compress.Decompressor if d, err = compress.NewDecompressor(datPath); err != nil { return nil, 0, fmt.Errorf("decompressor: %w", err) } return d, count, nil } func (a *Aggregator) stats(fType FileType) (count int, datSize, idxSize int64) { a.fileLocks[fType].RLock() defer a.fileLocks[fType].RUnlock() count = 0 datSize = 0 idxSize = 0 a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor != nil { count++ datSize += item.decompressor.Size() count++ idxSize += item.index.Size() } return true }) return } type FilesStats struct { AccountsCount int AccountsDatSize int64 AccountsIdxSize int64 CodeCount int CodeDatSize int64 CodeIdxSize int64 StorageCount int StorageDatSize int64 StorageIdxSize int64 CommitmentCount int CommitmentDatSize int64 CommitmentIdxSize int64 Hits uint64 Misses uint64 } func (a *Aggregator) Stats() FilesStats { var fs FilesStats fs.AccountsCount, fs.AccountsDatSize, fs.AccountsIdxSize = a.stats(Account) fs.CodeCount, fs.CodeDatSize, fs.CodeIdxSize = a.stats(Code) fs.StorageCount, fs.StorageDatSize, fs.StorageIdxSize = a.stats(Storage) fs.CommitmentCount, fs.CommitmentDatSize, fs.CommitmentIdxSize = a.stats(Commitment) fs.Hits = atomic.LoadUint64(&a.fileHits) fs.Misses = atomic.LoadUint64(&a.fileMisses) return fs }