/* Copyright 2022 Erigon contributors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package aggregator import ( "bufio" "bytes" "container/heap" "context" "encoding/binary" "errors" "fmt" "hash" "io" "io/fs" "math" "os" "path" "path/filepath" "regexp" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/log/v3" "github.com/spaolacci/murmur3" "golang.org/x/crypto/sha3" "golang.org/x/exp/slices" "github.com/ledgerwatch/erigon-lib/commitment" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" ) // Aggregator of multiple state files to support state reader and state writer // The convension for the file names are as follows // State is composed of three types of files: // 1. Accounts. keys are addresses (20 bytes), values are encoding of accounts // 2. Contract storage. Keys are concatenation of addresses (20 bytes) and storage locations (32 bytes), values have their leading zeroes removed // 3. Contract codes. Keys are addresses (20 bytes), values are bycodes // Within each type, any file can cover an interval of block numbers, for example, `accounts.1-16` represents changes in accounts // that were effected by the blocks from 1 to 16, inclusively. The second component of the interval will be called "end block" for the file. // Finally, for each type and interval, there are two files - one with the compressed data (extension `dat`), // and another with the index (extension `idx`) consisting of the minimal perfect hash table mapping keys to the offsets of corresponding keys // in the data file // Aggregator consists (apart from the file it is aggregating) of the 4 parts: // 1. Persistent table of expiration time for each of the files. Key - name of the file, value - timestamp, at which the file can be removed // 2. Transient (in-memory) mapping the "end block" of each file to the objects required for accessing the file (compress.Decompressor and resplit.Index) // 3. Persistent tables (one for accounts, one for contract storage, and one for contract code) summarising all the 1-block state diff files // that were not yet merged together to form larger files. In these tables, keys are the same as keys in the state diff files, but values are also // augemented by the number of state diff files this key is present. This number gets decremented every time when a 1-block state diff files is removed // from the summary table (due to being merged). And when this number gets to 0, the record is deleted from the summary table. // This number is encoded into first 4 bytes of the value // 4. Aggregating persistent hash table. Maps state keys to the block numbers for the use in the part 2 (which is not necessarily the block number where // the item last changed, but it is guaranteed to find correct element in the Transient mapping of part 2 type FileType int const ( Account FileType = iota Storage Code Commitment AccountHistory StorageHistory CodeHistory AccountBitmap StorageBitmap CodeBitmap NumberOfTypes ) const ( FirstType = Account NumberOfAccountStorageTypes = Code NumberOfStateTypes = AccountHistory ) func (ft FileType) String() string { switch ft { case Account: return "account" case Storage: return "storage" case Code: return "code" case Commitment: return "commitment" case AccountHistory: return "ahistory" case CodeHistory: return "chistory" case StorageHistory: return "shistory" case AccountBitmap: return "abitmap" case CodeBitmap: return "cbitmap" case StorageBitmap: return "sbitmap" default: panic(fmt.Sprintf("unknown file type: %d", ft)) } } func (ft FileType) Table() string { switch ft { case Account: return kv.StateAccounts case Storage: return kv.StateStorage case Code: return kv.StateCode case Commitment: return kv.StateCommitment default: panic(fmt.Sprintf("unknown file type: %d", ft)) } } func ParseFileType(s string) (FileType, bool) { switch s { case "account": return Account, true case "storage": return Storage, true case "code": return Code, true case "commitment": return Commitment, true case "ahistory": return AccountHistory, true case "chistory": return CodeHistory, true case "shistory": return StorageHistory, true case "abitmap": return AccountBitmap, true case "cbitmap": return CodeBitmap, true case "sbitmap": return StorageBitmap, true default: return NumberOfTypes, false } } type Aggregator struct { files [NumberOfTypes]*btree.BTree hph commitment.Trie //*commitment.HexPatriciaHashed archHasher murmur3.Hash128 keccak hash.Hash historyChannel chan struct{} mergeChannel chan struct{} tracedKeys map[string]struct{} // Set of keys being traced during aggregations changesBtree *btree.BTree // btree of ChangesItem historyError chan error mergeError chan error aggChannel chan *AggregationTask aggError chan error diffDir string // Directory where the state diff files are stored arches [NumberOfStateTypes][]uint32 // Over-arching hash tables containing the block number of last aggregation historyWg sync.WaitGroup aggWg sync.WaitGroup mergeWg sync.WaitGroup unwindLimit uint64 // How far the chain may unwind aggregationStep uint64 // How many items (block, but later perhaps txs or changes) are required to form one state diff file fileHits uint64 // Counter for state file hit ratio fileMisses uint64 // Counter for state file hit ratio fileLocks [NumberOfTypes]sync.RWMutex commitments bool // Whether to calculate commitments changesets bool // Whether to generate changesets (off by default) trace bool // Turns on tracing for specific accounts and locations } type ChangeFile struct { r *bufio.Reader rTx *bufio.Reader w *bufio.Writer fileTx *os.File wTx *bufio.Writer file *os.File pathTx string path string dir string namebase string words []byte // Words pending for the next block record, in the same slice wordOffsets []int // Offsets of words in the `words` slice step uint64 txNum uint64 // Currently read transaction number txRemaining uint64 // Remaining number of bytes to read in the current transaction } func (cf *ChangeFile) closeFile() error { if len(cf.wordOffsets) > 0 { return fmt.Errorf("closeFile without finish") } if cf.w != nil { if err := cf.w.Flush(); err != nil { return err } cf.w = nil } if cf.file != nil { if err := cf.file.Close(); err != nil { return err } cf.file = nil } if cf.wTx != nil { if err := cf.wTx.Flush(); err != nil { return err } cf.wTx = nil } if cf.fileTx != nil { if err := cf.fileTx.Close(); err != nil { return err } cf.fileTx = nil } return nil } func (cf *ChangeFile) openFile(blockNum uint64, write bool) error { if len(cf.wordOffsets) > 0 { return fmt.Errorf("openFile without finish") } rem := blockNum % cf.step startBlock := blockNum - rem endBlock := startBlock + cf.step - 1 if cf.w == nil { cf.path = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.chg", cf.namebase, startBlock, endBlock)) cf.pathTx = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.ctx", cf.namebase, startBlock, endBlock)) var err error if write { if cf.file, err = os.OpenFile(cf.path, os.O_RDWR|os.O_CREATE, 0755); err != nil { return err } if cf.fileTx, err = os.OpenFile(cf.pathTx, os.O_RDWR|os.O_CREATE, 0755); err != nil { return err } if _, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil { return err } if _, err = cf.fileTx.Seek(0, 2 /* relative to the end of the file */); err != nil { return err } } else { if cf.file, err = os.Open(cf.path); err != nil { return err } if cf.fileTx, err = os.Open(cf.pathTx); err != nil { return err } } if write { cf.w = bufio.NewWriter(cf.file) cf.wTx = bufio.NewWriter(cf.fileTx) } cf.r = bufio.NewReader(cf.file) cf.rTx = bufio.NewReader(cf.fileTx) } return nil } func (cf *ChangeFile) rewind() error { var err error if _, err = cf.file.Seek(0, 0); err != nil { return err } cf.r = bufio.NewReader(cf.file) if _, err = cf.fileTx.Seek(0, 0); err != nil { return err } cf.rTx = bufio.NewReader(cf.fileTx) return nil } func (cf *ChangeFile) add(word []byte) { cf.words = append(cf.words, word...) cf.wordOffsets = append(cf.wordOffsets, len(cf.words)) } func (cf *ChangeFile) finish(txNum uint64) error { var numBuf [10]byte // Write out words lastOffset := 0 var size uint64 for _, offset := range cf.wordOffsets { word := cf.words[lastOffset:offset] n := binary.PutUvarint(numBuf[:], uint64(len(word))) if _, err := cf.w.Write(numBuf[:n]); err != nil { return err } if len(word) > 0 { if _, err := cf.w.Write(word); err != nil { return err } } size += uint64(n + len(word)) lastOffset = offset } cf.words = cf.words[:0] cf.wordOffsets = cf.wordOffsets[:0] n := binary.PutUvarint(numBuf[:], txNum) if _, err := cf.wTx.Write(numBuf[:n]); err != nil { return err } n = binary.PutUvarint(numBuf[:], size) if _, err := cf.wTx.Write(numBuf[:n]); err != nil { return err } return nil } // prevTx positions the reader to the beginning // of the transaction func (cf *ChangeFile) nextTx() (bool, error) { var err error if cf.txNum, err = binary.ReadUvarint(cf.rTx); err != nil { if errors.Is(err, io.EOF) { return false, nil } return false, err } if cf.txRemaining, err = binary.ReadUvarint(cf.rTx); err != nil { return false, err } return true, nil } func (cf *ChangeFile) nextWord(wordBuf []byte) ([]byte, bool, error) { if cf.txRemaining == 0 { return wordBuf, false, nil } ws, err := binary.ReadUvarint(cf.r) if err != nil { return wordBuf, false, fmt.Errorf("word size: %w", err) } var buf []byte if total := len(wordBuf) + int(ws); cap(wordBuf) >= total { buf = wordBuf[:total] // Reuse the space in wordBuf, is it has enough capacity } else { buf = make([]byte, total) copy(buf, wordBuf) } if _, err = io.ReadFull(cf.r, buf[len(wordBuf):]); err != nil { return wordBuf, false, fmt.Errorf("read word (%d %d): %w", ws, len(buf[len(wordBuf):]), err) } var numBuf [10]byte n := binary.PutUvarint(numBuf[:], ws) cf.txRemaining -= uint64(n) + ws return buf, true, nil } func (cf *ChangeFile) deleteFile() error { if err := os.Remove(cf.path); err != nil { return err } if err := os.Remove(cf.pathTx); err != nil { return err } return nil } type Changes struct { namebase string dir string keys ChangeFile before ChangeFile after ChangeFile step uint64 beforeOn bool } func (c *Changes) Init(namebase string, step uint64, dir string, beforeOn bool) { c.namebase = namebase c.step = step c.dir = dir c.keys.namebase = namebase + ".keys" c.keys.dir = dir c.keys.step = step c.before.namebase = namebase + ".before" c.before.dir = dir c.before.step = step c.after.namebase = namebase + ".after" c.after.dir = dir c.after.step = step c.beforeOn = beforeOn } func (c *Changes) closeFiles() error { if err := c.keys.closeFile(); err != nil { return err } if c.beforeOn { if err := c.before.closeFile(); err != nil { return err } } if err := c.after.closeFile(); err != nil { return err } return nil } func (c *Changes) openFiles(blockNum uint64, write bool) error { if err := c.keys.openFile(blockNum, write); err != nil { return err } if c.beforeOn { if err := c.before.openFile(blockNum, write); err != nil { return err } } if err := c.after.openFile(blockNum, write); err != nil { return err } return nil } func (c *Changes) insert(key, after []byte) { c.keys.add(key) if c.beforeOn { c.before.add(nil) } c.after.add(after) } func (c *Changes) update(key, before, after []byte) { c.keys.add(key) if c.beforeOn { c.before.add(before) } c.after.add(after) } func (c *Changes) delete(key, before []byte) { c.keys.add(key) if c.beforeOn { c.before.add(before) } c.after.add(nil) } func (c *Changes) finish(txNum uint64) error { if err := c.keys.finish(txNum); err != nil { return err } if c.beforeOn { if err := c.before.finish(txNum); err != nil { return err } } if err := c.after.finish(txNum); err != nil { return err } return nil } func (c *Changes) nextTx() (bool, uint64, error) { bkeys, err := c.keys.nextTx() if err != nil { return false, 0, err } var bbefore, bafter bool if c.beforeOn { if bbefore, err = c.before.nextTx(); err != nil { return false, 0, err } } if bafter, err = c.after.nextTx(); err != nil { return false, 0, err } if c.beforeOn && bkeys != bbefore { return false, 0, fmt.Errorf("inconsistent tx iteration") } if bkeys != bafter { return false, 0, fmt.Errorf("inconsistent tx iteration") } txNum := c.keys.txNum if c.beforeOn { if txNum != c.before.txNum { return false, 0, fmt.Errorf("inconsistent txNum, keys: %d, before: %d", txNum, c.before.txNum) } } if txNum != c.after.txNum { return false, 0, fmt.Errorf("inconsistent txNum, keys: %d, after: %d", txNum, c.after.txNum) } return bkeys, txNum, nil } func (c *Changes) rewind() error { if err := c.keys.rewind(); err != nil { return err } if c.beforeOn { if err := c.before.rewind(); err != nil { return err } } if err := c.after.rewind(); err != nil { return err } return nil } func (c *Changes) nextTriple(keyBuf, beforeBuf, afterBuf []byte) ([]byte, []byte, []byte, bool, error) { key, bkeys, err := c.keys.nextWord(keyBuf) if err != nil { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next key: %w", err) } var before, after []byte var bbefore, bafter bool if c.beforeOn { if before, bbefore, err = c.before.nextWord(beforeBuf); err != nil { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next before: %w", err) } } if c.beforeOn && bkeys != bbefore { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("inconsistent word iteration") } if after, bafter, err = c.after.nextWord(afterBuf); err != nil { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next after: %w", err) } if bkeys != bafter { return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("inconsistent word iteration") } return key, before, after, bkeys, nil } func (c *Changes) deleteFiles() error { if err := c.keys.deleteFile(); err != nil { return err } if c.beforeOn { if err := c.before.deleteFile(); err != nil { return err } } if err := c.after.deleteFile(); err != nil { return err } return nil } func buildIndex(d *compress.Decompressor, idxPath, tmpDir string, count int) (*recsplit.Index, error) { var rs *recsplit.RecSplit var err error if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: count, Enums: false, BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, IndexFile: idxPath, EtlBufLimit: etl.BufferOptimalSize / 2, }); err != nil { return nil, err } defer rs.Close() word := make([]byte, 0, 256) var pos uint64 g := d.MakeGetter() for { g.Reset(0) for g.HasNext() { word, _ = g.Next(word[:0]) if err = rs.AddKey(word, pos); err != nil { return nil, err } // Skip value pos = g.Skip() } if err = rs.Build(); err != nil { if rs.Collision() { log.Info("Building recsplit. Collision happened. It's ok. Restarting...") rs.ResetNextSalt() } else { return nil, err } } else { break } } var idx *recsplit.Index if idx, err = recsplit.OpenIndex(idxPath); err != nil { return nil, err } return idx, nil } // aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database // This function is time-critical because it needs to be run in the same go-routine (thread) as the general // execution (due to read-write tx). After that, we can optimistically execute the rest in the background func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx, table string, commitMerger commitmentMerger) (*btree.BTreeG[*AggregateItem], error) { if err := c.openFiles(blockTo, false /* write */); err != nil { return nil, fmt.Errorf("open files: %w", err) } bt := btree.NewG[*AggregateItem](32, AggregateItemLess) err := c.aggregateToBtree(bt, prefixLen, commitMerger) if err != nil { return nil, fmt.Errorf("aggregateToBtree: %w", err) } // Clean up the DB table var e error bt.Ascend(func(item *AggregateItem) bool { if item.count == 0 { return true } dbPrefix := item.k prevV, err := tx.GetOne(table, dbPrefix) if err != nil { e = err return false } if prevV == nil { e = fmt.Errorf("record not found in db for %s key %x", table, dbPrefix) return false } prevNum := binary.BigEndian.Uint32(prevV[:4]) if prevNum < item.count { e = fmt.Errorf("record count too low for %s key %s count %d, subtracting %d", table, dbPrefix, prevNum, item.count) return false } if prevNum == item.count { if e = tx.Delete(table, dbPrefix); e != nil { return false } } else { v := make([]byte, len(prevV)) binary.BigEndian.PutUint32(v[:4], prevNum-item.count) copy(v[4:], prevV[4:]) if e = tx.Put(table, dbPrefix, v); e != nil { return false } } return true }) if e != nil { return nil, fmt.Errorf("clean up table %s after aggregation: %w", table, e) } return bt, nil } func (a *Aggregator) updateArch(bt *btree.BTreeG[*AggregateItem], fType FileType, blockNum32 uint32) { arch := a.arches[fType] h := a.archHasher n := uint64(len(arch)) if n == 0 { return } bt.Ascend(func(item *AggregateItem) bool { if item.count == 0 { return true } h.Reset() h.Write(item.k) //nolint:errcheck p, _ := h.Sum128() p = p % n v := atomic.LoadUint32(&arch[p]) if v < blockNum32 { //fmt.Printf("Updated %s arch [%x]=%d %d\n", fType.String(), item.k, p, blockNum32) atomic.StoreUint32(&arch[p], blockNum32) } return true }) } type AggregateItem struct { k, v []byte count uint32 } func AggregateItemLess(a, than *AggregateItem) bool { return bytes.Compare(a.k, than.k) < 0 } func (i *AggregateItem) Less(than btree.Item) bool { return bytes.Compare(i.k, than.(*AggregateItem).k) < 0 } func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitmapType FileType) (*compress.Decompressor, *recsplit.Index, *compress.Decompressor, *recsplit.Index, error) { chsetDatPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", historyType.String(), blockFrom, blockTo)) chsetIdxPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", historyType.String(), blockFrom, blockTo)) bitmapDatPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", bitmapType.String(), blockFrom, blockTo)) bitmapIdxPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", bitmapType.String(), blockFrom, blockTo)) var blockSuffix [8]byte binary.BigEndian.PutUint64(blockSuffix[:], blockTo) bitmaps := map[string]*roaring64.Bitmap{} comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, chsetDatPath, c.dir, compress.MinPatternScore, 1, log.LvlDebug) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets NewCompressor: %w", err) } defer func() { if comp != nil { comp.Close() } }() var totalRecords int var b bool var e error var txNum uint64 var key, before, after []byte if err = c.rewind(); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets rewind: %w", err) } var txKey = make([]byte, 8, 60) for b, txNum, e = c.nextTx(); b && e == nil; b, txNum, e = c.nextTx() { binary.BigEndian.PutUint64(txKey[:8], txNum) for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { totalRecords++ txKey = append(txKey[:8], key...) // In the inital files and most merged file, the txKey is added to the file, but it gets removed in the final merge if err = comp.AddUncompressedWord(txKey); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord key: %w", err) } if err = comp.AddUncompressedWord(before); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord before: %w", err) } //if historyType == AccountHistory { // fmt.Printf("produce %s.%d-%d [%x]=>[%x]\n", historyType.String(), blockFrom, blockTo, txKey, before) //} var bitmap *roaring64.Bitmap var ok bool if bitmap, ok = bitmaps[string(key)]; !ok { bitmap = roaring64.New() bitmaps[string(key)] = bitmap } bitmap.Add(txNum) } if e != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets nextTriple: %w", e) } } if e != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets prevTx: %w", e) } if err = comp.Compress(); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets Compress: %w", err) } comp.Close() comp = nil var d *compress.Decompressor var index *recsplit.Index if d, err = compress.NewDecompressor(chsetDatPath); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets changeset decompressor: %w", err) } if index, err = buildIndex(d, chsetIdxPath, c.dir, totalRecords); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets changeset buildIndex: %w", err) } // Create bitmap files bitmapC, err := compress.NewCompressor(context.Background(), AggregatorPrefix, bitmapDatPath, c.dir, compress.MinPatternScore, 1, log.LvlDebug) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap NewCompressor: %w", err) } defer func() { if bitmapC != nil { bitmapC.Close() } }() idxKeys := make([]string, len(bitmaps)) i := 0 var buf []byte for key := range bitmaps { idxKeys[i] = key i++ } slices.Sort(idxKeys) for _, key := range idxKeys { if err = bitmapC.AddUncompressedWord([]byte(key)); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add key: %w", err) } bitmap := bitmaps[key] ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum()) it := bitmap.Iterator() for it.HasNext() { v := it.Next() ef.AddOffset(v) } ef.Build() buf = ef.AppendBytes(buf[:0]) if err = bitmapC.AddUncompressedWord(buf); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add val: %w", err) } } if err = bitmapC.Compress(); err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap Compress: %w", err) } bitmapC.Close() bitmapC = nil bitmapD, err := compress.NewDecompressor(bitmapDatPath) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap decompressor: %w", err) } bitmapI, err := buildIndex(bitmapD, bitmapIdxPath, c.dir, len(idxKeys)) if err != nil { return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap buildIndex: %w", err) } return d, index, bitmapD, bitmapI, nil } // aggregateToBtree iterates over all available changes in the change files covered by this instance `c` // (there are 3 of them, one for "keys", one for values "before" every change, and one for values "after" every change) // and create a B-tree where each key is only represented once, with the value corresponding to the "after" value // of the latest change. func (c *Changes) aggregateToBtree(bt *btree.BTreeG[*AggregateItem], prefixLen int, commitMerge commitmentMerger) error { var b bool var e error var key, before, after []byte var ai AggregateItem var prefix []byte // Note that the following loop iterates over transactions forwards, therefore it replace entries in the B-tree for b, _, e = c.nextTx(); b && e == nil; b, _, e = c.nextTx() { // Within each transaction, keys are unique, but they can appear in any order for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) { if prefixLen > 0 && !bytes.Equal(prefix, key[:prefixLen]) { prefix = common.Copy(key[:prefixLen]) item := &AggregateItem{k: prefix, count: 0} bt.ReplaceOrInsert(item) } ai.k = key i, ok := bt.Get(&ai) if !ok || i == nil { item := &AggregateItem{k: common.Copy(key), v: common.Copy(after), count: 1} bt.ReplaceOrInsert(item) continue } item := i if commitMerge != nil { mergedVal, err := commitMerge(item.v, after, nil) if err != nil { return fmt.Errorf("merge branches (%T) : %w", commitMerge, err) } //fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal) item.v = mergedVal } else { item.v = common.Copy(after) } item.count++ } if e != nil { return fmt.Errorf("aggregateToBtree nextTriple: %w", e) } } if e != nil { return fmt.Errorf("aggregateToBtree prevTx: %w", e) } return nil } const AggregatorPrefix = "aggregator" func btreeToFile(bt *btree.BTreeG[*AggregateItem], datPath, tmpdir string, trace bool, workers int) (int, error) { comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, tmpdir, compress.MinPatternScore, workers, log.LvlDebug) if err != nil { return 0, err } defer comp.Close() comp.SetTrace(trace) count := 0 bt.Ascend(func(item *AggregateItem) bool { //fmt.Printf("btreeToFile %s [%x]=>[%x]\n", datPath, item.k, item.v) if err = comp.AddUncompressedWord(item.k); err != nil { return false } count++ // Only counting keys, not values if err = comp.AddUncompressedWord(item.v); err != nil { return false } return true }) if err != nil { return 0, err } if err = comp.Compress(); err != nil { return 0, err } return count, nil } type ChangesItem struct { endBlock uint64 startBlock uint64 fileCount int } func (i *ChangesItem) Less(than btree.Item) bool { if i.endBlock == than.(*ChangesItem).endBlock { // Larger intevals will come last return i.startBlock > than.(*ChangesItem).startBlock } return i.endBlock < than.(*ChangesItem).endBlock } type byEndBlockItem struct { decompressor *compress.Decompressor getter *compress.Getter // reader for the decompressor getterMerge *compress.Getter // reader for the decompressor used in the background merge thread index *recsplit.Index indexReader *recsplit.IndexReader // reader for the index readerMerge *recsplit.IndexReader // index reader for the background merge thread tree *btree.BTreeG[*AggregateItem] // Substitute for decompressor+index combination startBlock uint64 endBlock uint64 } func ByEndBlockItemLess(i, than *byEndBlockItem) bool { if i.endBlock == than.endBlock { return i.startBlock > than.startBlock } return i.endBlock < than.endBlock } func (i *byEndBlockItem) Less(than btree.Item) bool { if i.endBlock == than.(*byEndBlockItem).endBlock { return i.startBlock > than.(*byEndBlockItem).startBlock } return i.endBlock < than.(*byEndBlockItem).endBlock } func (a *Aggregator) scanStateFiles(files []fs.DirEntry) { typeStrings := make([]string, NumberOfTypes) for fType := FileType(0); fType < NumberOfTypes; fType++ { typeStrings[fType] = fType.String() } re := regexp.MustCompile("^(" + strings.Join(typeStrings, "|") + ").([0-9]+)-([0-9]+).(dat|idx)$") var err error for _, f := range files { name := f.Name() subs := re.FindStringSubmatch(name) if len(subs) != 5 { if len(subs) != 0 { log.Warn("File ignored by aggregator, more than 4 submatches", "name", name, "submatches", len(subs)) } continue } var startBlock, endBlock uint64 if startBlock, err = strconv.ParseUint(subs[2], 10, 64); err != nil { log.Warn("File ignored by aggregator, parsing startBlock", "error", err, "name", name) continue } if endBlock, err = strconv.ParseUint(subs[3], 10, 64); err != nil { log.Warn("File ignored by aggregator, parsing endBlock", "error", err, "name", name) continue } if startBlock > endBlock { log.Warn("File ignored by aggregator, startBlock > endBlock", "name", name) continue } fType, ok := ParseFileType(subs[1]) if !ok { log.Warn("File ignored by aggregator, type unknown", "type", subs[1]) } var item = &byEndBlockItem{startBlock: startBlock, endBlock: endBlock} var foundI *byEndBlockItem a.files[fType].AscendGreaterOrEqual(&byEndBlockItem{startBlock: endBlock, endBlock: endBlock}, func(i btree.Item) bool { it := i.(*byEndBlockItem) if it.endBlock == endBlock { foundI = it } return false }) if foundI == nil || foundI.startBlock > startBlock { log.Info("Load state file", "name", name, "type", fType.String(), "startBlock", startBlock, "endBlock", endBlock) a.files[fType].ReplaceOrInsert(item) } } } func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64, trie commitment.Trie, tx kv.RwTx) (*Aggregator, error) { a := &Aggregator{ diffDir: diffDir, unwindLimit: unwindLimit, aggregationStep: aggregationStep, tracedKeys: map[string]struct{}{}, keccak: sha3.NewLegacyKeccak256(), hph: trie, aggChannel: make(chan *AggregationTask, 1024), aggError: make(chan error, 1), mergeChannel: make(chan struct{}, 1), mergeError: make(chan error, 1), historyChannel: make(chan struct{}, 1), historyError: make(chan error, 1), changesets: changesets, commitments: commitments, archHasher: murmur3.New128WithSeed(0), // TODO: Randomise salt } for fType := FirstType; fType < NumberOfTypes; fType++ { a.files[fType] = btree.New(32) } var closeStateFiles = true // It will be set to false in case of success at the end of the function defer func() { // Clean up all decompressor and indices upon error if closeStateFiles { a.Close() } }() // Scan the diff directory and create the mapping of end blocks to files files, err := os.ReadDir(diffDir) if err != nil { return nil, err } a.scanStateFiles(files) // Check for overlaps and holes for fType := FirstType; fType < NumberOfTypes; fType++ { if err := checkOverlaps(fType.String(), a.files[fType]); err != nil { return nil, err } } // Open decompressor and index files for all items in state trees for fType := FirstType; fType < NumberOfTypes; fType++ { if err := a.openFiles(fType, minArch); err != nil { return nil, fmt.Errorf("opening %s state files: %w", fType.String(), err) } } a.changesBtree = btree.New(32) re := regexp.MustCompile(`^(account|storage|code|commitment).(keys|before|after).([0-9]+)-([0-9]+).chg$`) for _, f := range files { name := f.Name() subs := re.FindStringSubmatch(name) if len(subs) != 5 { if len(subs) != 0 { log.Warn("File ignored by changes scan, more than 4 submatches", "name", name, "submatches", len(subs)) } continue } var startBlock, endBlock uint64 if startBlock, err = strconv.ParseUint(subs[3], 10, 64); err != nil { log.Warn("File ignored by changes scan, parsing startBlock", "error", err, "name", name) continue } if endBlock, err = strconv.ParseUint(subs[4], 10, 64); err != nil { log.Warn("File ignored by changes scan, parsing endBlock", "error", err, "name", name) continue } if startBlock > endBlock { log.Warn("File ignored by changes scan, startBlock > endBlock", "name", name) continue } if endBlock != startBlock+aggregationStep-1 { log.Warn("File ignored by changes scan, endBlock != startBlock+aggregationStep-1", "name", name) continue } var item = &ChangesItem{fileCount: 1, startBlock: startBlock, endBlock: endBlock} i := a.changesBtree.Get(item) if i == nil { a.changesBtree.ReplaceOrInsert(item) } else { item = i.(*ChangesItem) if item.startBlock == startBlock { item.fileCount++ } else { return nil, fmt.Errorf("change files overlap [%d-%d] with [%d-%d]", item.startBlock, item.endBlock, startBlock, endBlock) } } } // Check for holes in change files minStart := uint64(math.MaxUint64) a.changesBtree.Descend(func(i btree.Item) bool { item := i.(*ChangesItem) if item.startBlock < minStart { if item.endBlock >= minStart { err = fmt.Errorf("overlap of change files [%d-%d] with %d", item.startBlock, item.endBlock, minStart) return false } if minStart != math.MaxUint64 && item.endBlock+1 != minStart { err = fmt.Errorf("whole in change files [%d-%d]", item.endBlock, minStart) return false } minStart = item.startBlock } else { err = fmt.Errorf("overlap of change files [%d-%d] with %d", item.startBlock, item.endBlock, minStart) return false } return true }) if err != nil { return nil, err } for fType := FirstType; fType < NumberOfStateTypes; fType++ { if err = checkOverlapWithMinStart(fType.String(), a.files[fType], minStart); err != nil { return nil, err } } if err = a.rebuildRecentState(tx); err != nil { return nil, fmt.Errorf("rebuilding recent state from change files: %w", err) } closeStateFiles = false a.aggWg.Add(1) go a.backgroundAggregation() a.mergeWg.Add(1) go a.backgroundMerge() if a.changesets { a.historyWg.Add(1) go a.backgroundHistoryMerge() } return a, nil } // rebuildRecentState reads change files and reconstructs the recent state func (a *Aggregator) rebuildRecentState(tx kv.RwTx) error { t := time.Now() var err error trees := map[FileType]*btree.BTreeG[*AggregateItem]{} a.changesBtree.Ascend(func(i btree.Item) bool { item := i.(*ChangesItem) for fType := FirstType; fType < NumberOfStateTypes; fType++ { tree, ok := trees[fType] if !ok { tree = btree.NewG[*AggregateItem](32, AggregateItemLess) trees[fType] = tree } var changes Changes changes.Init(fType.String(), a.aggregationStep, a.diffDir, false /* beforeOn */) if err = changes.openFiles(item.startBlock, false /* write */); err != nil { return false } var prefixLen int if fType == Storage { prefixLen = length.Addr } var commitMerger commitmentMerger if fType == Commitment { commitMerger = mergeCommitments } if err = changes.aggregateToBtree(tree, prefixLen, commitMerger); err != nil { return false } if err = changes.closeFiles(); err != nil { return false } } return true }) if err != nil { return err } for fType, tree := range trees { table := fType.Table() tree.Ascend(func(item *AggregateItem) bool { if len(item.v) == 0 { return true } var v []byte if v, err = tx.GetOne(table, item.k); err != nil { return false } if item.count != binary.BigEndian.Uint32(v[:4]) { err = fmt.Errorf("mismatched count for %x: change file %d, db: %d", item.k, item.count, binary.BigEndian.Uint32(v[:4])) return false } if !bytes.Equal(item.v, v[4:]) { err = fmt.Errorf("mismatched v for %x: change file [%x], db: [%x]", item.k, item.v, v[4:]) return false } return true }) } if err != nil { return err } log.Info("reconstructed recent state", "in", time.Since(t)) return nil } type AggregationTask struct { bt [NumberOfStateTypes]*btree.BTreeG[*AggregateItem] changes [NumberOfStateTypes]Changes blockFrom uint64 blockTo uint64 } func (a *Aggregator) removeLocked(fType FileType, toRemove []*byEndBlockItem, item *byEndBlockItem) { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() if len(toRemove) > 1 { for _, ag := range toRemove { a.files[fType].Delete(ag) } a.files[fType].ReplaceOrInsert(item) } } func (a *Aggregator) removeLockedState( accountsToRemove []*byEndBlockItem, accountsItem *byEndBlockItem, codeToRemove []*byEndBlockItem, codeItem *byEndBlockItem, storageToRemove []*byEndBlockItem, storageItem *byEndBlockItem, commitmentToRemove []*byEndBlockItem, commitmentItem *byEndBlockItem, ) { for fType := FirstType; fType < NumberOfStateTypes; fType++ { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() } if len(accountsToRemove) > 1 { for _, ag := range accountsToRemove { a.files[Account].Delete(ag) } a.files[Account].ReplaceOrInsert(accountsItem) } if len(codeToRemove) > 1 { for _, ag := range codeToRemove { a.files[Code].Delete(ag) } a.files[Code].ReplaceOrInsert(codeItem) } if len(storageToRemove) > 1 { for _, ag := range storageToRemove { a.files[Storage].Delete(ag) } a.files[Storage].ReplaceOrInsert(storageItem) } if len(commitmentToRemove) > 1 { for _, ag := range commitmentToRemove { a.files[Commitment].Delete(ag) } a.files[Commitment].ReplaceOrInsert(commitmentItem) } } func removeFiles(fType FileType, diffDir string, toRemove []*byEndBlockItem) error { // Close all the memory maps etc for _, ag := range toRemove { if err := ag.index.Close(); err != nil { return fmt.Errorf("close index: %w", err) } if err := ag.decompressor.Close(); err != nil { return fmt.Errorf("close decompressor: %w", err) } } // Delete files // TODO: in a non-test version, this is delayed to allow other participants to roll over to the next file for _, ag := range toRemove { if err := os.Remove(path.Join(diffDir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), ag.startBlock, ag.endBlock))); err != nil { return fmt.Errorf("remove decompressor file %s.%d-%d.dat: %w", fType.String(), ag.startBlock, ag.endBlock, err) } if err := os.Remove(path.Join(diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), ag.startBlock, ag.endBlock))); err != nil { return fmt.Errorf("remove index file %s.%d-%d.idx: %w", fType.String(), ag.startBlock, ag.endBlock, err) } } return nil } // backgroundAggregation is the functin that runs in a background go-routine and performs creation of initial state files // allowing the main goroutine to proceed func (a *Aggregator) backgroundAggregation() { defer a.aggWg.Done() for aggTask := range a.aggChannel { if a.changesets { if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Account].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, AccountHistory, AccountBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} historyItem.decompressor = historyD historyItem.index = historyI historyItem.getter = historyItem.decompressor.MakeGetter() historyItem.getterMerge = historyItem.decompressor.MakeGetter() historyItem.indexReader = recsplit.NewIndexReader(historyItem.index) historyItem.readerMerge = recsplit.NewIndexReader(historyItem.index) a.addLocked(AccountHistory, historyItem) var bitmapItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} bitmapItem.decompressor = bitmapD bitmapItem.index = bitmapI bitmapItem.getter = bitmapItem.decompressor.MakeGetter() bitmapItem.getterMerge = bitmapItem.decompressor.MakeGetter() bitmapItem.indexReader = recsplit.NewIndexReader(bitmapItem.index) bitmapItem.readerMerge = recsplit.NewIndexReader(bitmapItem.index) a.addLocked(AccountBitmap, bitmapItem) } else { a.aggError <- fmt.Errorf("produceChangeSets %s: %w", Account.String(), err) return } if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Storage].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, StorageHistory, StorageBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} historyItem.decompressor = historyD historyItem.index = historyI historyItem.getter = historyItem.decompressor.MakeGetter() historyItem.getterMerge = historyItem.decompressor.MakeGetter() historyItem.indexReader = recsplit.NewIndexReader(historyItem.index) historyItem.readerMerge = recsplit.NewIndexReader(historyItem.index) a.addLocked(StorageHistory, historyItem) var bitmapItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} bitmapItem.decompressor = bitmapD bitmapItem.index = bitmapI bitmapItem.getter = bitmapItem.decompressor.MakeGetter() bitmapItem.getterMerge = bitmapItem.decompressor.MakeGetter() bitmapItem.indexReader = recsplit.NewIndexReader(bitmapItem.index) bitmapItem.readerMerge = recsplit.NewIndexReader(bitmapItem.index) a.addLocked(StorageBitmap, bitmapItem) } else { a.aggError <- fmt.Errorf("produceChangeSets %s: %w", Storage.String(), err) return } if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Code].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, CodeHistory, CodeBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} historyItem.decompressor = historyD historyItem.index = historyI historyItem.getter = historyItem.decompressor.MakeGetter() historyItem.getterMerge = historyItem.decompressor.MakeGetter() historyItem.indexReader = recsplit.NewIndexReader(historyItem.index) historyItem.readerMerge = recsplit.NewIndexReader(historyItem.index) a.addLocked(CodeHistory, historyItem) var bitmapItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} bitmapItem.decompressor = bitmapD bitmapItem.index = bitmapI bitmapItem.getter = bitmapItem.decompressor.MakeGetter() bitmapItem.getterMerge = bitmapItem.decompressor.MakeGetter() bitmapItem.indexReader = recsplit.NewIndexReader(bitmapItem.index) bitmapItem.readerMerge = recsplit.NewIndexReader(bitmapItem.index) a.addLocked(CodeBitmap, bitmapItem) } else { a.aggError <- fmt.Errorf("produceChangeSets %s: %w", Code.String(), err) return } } typesLimit := Commitment if a.commitments { typesLimit = AccountHistory } for fType := FirstType; fType < typesLimit; fType++ { var err error if err = aggTask.changes[fType].closeFiles(); err != nil { a.aggError <- fmt.Errorf("close %sChanges: %w", fType.String(), err) return } var item = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} if item.decompressor, item.index, err = createDatAndIndex(fType.String(), a.diffDir, aggTask.bt[fType], aggTask.blockFrom, aggTask.blockTo); err != nil { a.aggError <- fmt.Errorf("createDatAndIndex %s: %w", fType.String(), err) return } item.getter = item.decompressor.MakeGetter() item.getterMerge = item.decompressor.MakeGetter() item.indexReader = recsplit.NewIndexReader(item.index) item.readerMerge = recsplit.NewIndexReader(item.index) if err = aggTask.changes[fType].deleteFiles(); err != nil { a.aggError <- fmt.Errorf("delete %sChanges: %w", fType.String(), err) return } a.addLocked(fType, item) } // At this point, 3 new state files (containing latest changes) has been created for accounts, code, and storage // Corresponding items has been added to the registy of state files, and B-tree are not necessary anymore, change files can be removed // What follows can be performed by the 2nd background goroutine select { case a.mergeChannel <- struct{}{}: default: } select { case a.historyChannel <- struct{}{}: default: } } } type CommitmentValTransform struct { pre [NumberOfAccountStorageTypes][]*byEndBlockItem // List of state files before the merge post [NumberOfAccountStorageTypes][]*byEndBlockItem // List of state files after the merge } func decodeU64(from []byte) uint64 { var i uint64 for _, b := range from { i = (i << 8) | uint64(b) } return i } func encodeU64(i uint64, to []byte) []byte { // writes i to b in big endian byte order, using the least number of bytes needed to represent i. switch { case i < (1 << 8): return append(to, byte(i)) case i < (1 << 16): return append(to, byte(i>>8), byte(i)) case i < (1 << 24): return append(to, byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 32): return append(to, byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 40): return append(to, byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 48): return append(to, byte(i>>40), byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) case i < (1 << 56): return append(to, byte(i>>48), byte(i>>40), byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) default: return append(to, byte(i>>56), byte(i>>48), byte(i>>40), byte(i>>32), byte(i>>24), byte(i>>16), byte(i>>8), byte(i)) } } // commitmentValTransform parses the value of the commitment record to extract references // to accounts and storage items, then looks them up in the new, merged files, and replaces them with // the updated references func (cvt *CommitmentValTransform) commitmentValTransform(val, transValBuf commitment.BranchData) ([]byte, error) { if len(val) == 0 { return transValBuf, nil } accountPlainKeys, storagePlainKeys, err := val.ExtractPlainKeys() if err != nil { return nil, err } transAccountPks := make([][]byte, 0, len(accountPlainKeys)) var apkBuf, spkBuf []byte for _, accountPlainKey := range accountPlainKeys { if len(accountPlainKey) == length.Addr { // Non-optimised key originating from a database record apkBuf = append(apkBuf[:0], accountPlainKey...) } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(accountPlainKey[0]) offset := decodeU64(accountPlainKey[1:]) g := cvt.pre[Account][fileI].getterMerge g.Reset(offset) apkBuf, _ = g.Next(apkBuf[:0]) //fmt.Printf("replacing account [%x] from [%x]\n", apkBuf, accountPlainKey) } // Look up apkBuf in the post account files for j := len(cvt.post[Account]); j > 0; j-- { item := cvt.post[Account][j-1] if item.index.Empty() { continue } offset := item.readerMerge.Lookup(apkBuf) g := item.getterMerge g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(apkBuf); keyMatch { accountPlainKey = encodeU64(offset, []byte{byte(j - 1)}) //fmt.Printf("replaced account [%x]=>[%x] for file [%d-%d]\n", apkBuf, accountPlainKey, item.startBlock, item.endBlock) break } else if j == 0 { fmt.Printf("could not find replacement key [%x], file=%s.%d-%d]\n\n", apkBuf, Account.String(), item.startBlock, item.endBlock) } } } transAccountPks = append(transAccountPks, accountPlainKey) } transStoragePks := make([][]byte, 0, len(storagePlainKeys)) for _, storagePlainKey := range storagePlainKeys { if len(storagePlainKey) == length.Addr+length.Hash { // Non-optimised key originating from a database record spkBuf = append(spkBuf[:0], storagePlainKey...) } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(storagePlainKey[0]) offset := decodeU64(storagePlainKey[1:]) g := cvt.pre[Storage][fileI].getterMerge g.Reset(offset) //fmt.Printf("offsetToKey storage [%x] offset=%d, file=%d-%d\n", storagePlainKey, offset, cvt.pre[Storage][fileI].startBlock, cvt.pre[Storage][fileI].endBlock) spkBuf, _ = g.Next(spkBuf[:0]) } // Lookup spkBuf in the post storage files for j := len(cvt.post[Storage]); j > 0; j-- { item := cvt.post[Storage][j-1] if item.index.Empty() { continue } offset := item.readerMerge.Lookup(spkBuf) g := item.getterMerge g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(spkBuf); keyMatch { storagePlainKey = encodeU64(offset, []byte{byte(j - 1)}) //fmt.Printf("replacing storage [%x] => [fileI=%d, offset=%d, file=%s.%d-%d]\n", spkBuf, j-1, offset, Storage.String(), item.startBlock, item.endBlock) break } else if j == 0 { fmt.Printf("could not find replacement key [%x], file=%s.%d-%d]\n\n", spkBuf, Storage.String(), item.startBlock, item.endBlock) } } } transStoragePks = append(transStoragePks, storagePlainKey) } if transValBuf, err = val.ReplacePlainKeys(transAccountPks, transStoragePks, transValBuf); err != nil { return nil, err } return transValBuf, nil } func (a *Aggregator) backgroundMerge() { defer a.mergeWg.Done() for range a.mergeChannel { t := time.Now() var err error var cvt CommitmentValTransform var toRemove [NumberOfStateTypes][]*byEndBlockItem var newItems [NumberOfStateTypes]*byEndBlockItem var blockFrom, blockTo uint64 lastType := Code typesLimit := Commitment if a.commitments { lastType = Commitment typesLimit = AccountHistory } // Lock the set of commitment (or code if commitments are off) files - those are the smallest, because account, storage and code files may be added by the aggregation thread first toRemove[lastType], _, _, blockFrom, blockTo = a.findLargestMerge(lastType, uint64(math.MaxUint64) /* maxBlockTo */, uint64(math.MaxUint64) /* maxSpan */) for fType := FirstType; fType < typesLimit; fType++ { var pre, post []*byEndBlockItem var from, to uint64 if fType == lastType { from = blockFrom to = blockTo } else { toRemove[fType], pre, post, from, to = a.findLargestMerge(fType, blockTo, uint64(math.MaxUint64) /* maxSpan */) if from != blockFrom { a.mergeError <- fmt.Errorf("%sFrom %d != blockFrom %d", fType.String(), from, blockFrom) return } if to != blockTo { a.mergeError <- fmt.Errorf("%sTo %d != blockTo %d", fType.String(), to, blockTo) return } } if len(toRemove[fType]) > 1 { var valTransform func(commitment.BranchData, commitment.BranchData) ([]byte, error) var mergeFunc commitmentMerger if fType == Commitment { valTransform = cvt.commitmentValTransform mergeFunc = mergeCommitments } else { mergeFunc = mergeReplace } var prefixLen int if fType == Storage { prefixLen = length.Addr } if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, valTransform, mergeFunc, true /* valCompressed */, true /* withIndex */, prefixLen); err != nil { a.mergeError <- fmt.Errorf("computeAggreation %s: %w", fType.String(), err) return } post = append(post, newItems[fType]) } if fType < NumberOfAccountStorageTypes { cvt.pre[fType] = pre cvt.post[fType] = post } } // Switch aggregator to new state files, close and remove old files a.removeLockedState(toRemove[Account], newItems[Account], toRemove[Code], newItems[Code], toRemove[Storage], newItems[Storage], toRemove[Commitment], newItems[Commitment]) removed := 0 for fType := FirstType; fType < typesLimit; fType++ { if len(toRemove[fType]) > 1 { removeFiles(fType, a.diffDir, toRemove[fType]) removed += len(toRemove[fType]) - 1 } } mergeTime := time.Since(t) if mergeTime > time.Minute { log.Info("Long merge", "from", blockFrom, "to", blockTo, "files", removed, "time", time.Since(t)) } } } func (a *Aggregator) reduceHistoryFiles(fType FileType, item *byEndBlockItem) error { datTmpPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.dat.tmp", fType.String(), item.startBlock, item.endBlock)) datPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), item.startBlock, item.endBlock)) idxPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), item.startBlock, item.endBlock)) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datTmpPath, a.diffDir, compress.MinPatternScore, 1, log.LvlDebug) if err != nil { return fmt.Errorf("reduceHistoryFiles create compressor %s: %w", datPath, err) } defer comp.Close() g := item.getter var val []byte var count int g.Reset(0) var key []byte for g.HasNext() { g.Skip() // Skip key on on the first pass val, _ = g.Next(val[:0]) //fmt.Printf("reduce1 [%s.%d-%d] [%x]=>[%x]\n", fType.String(), item.startBlock, item.endBlock, key, val) if err = comp.AddWord(val); err != nil { return fmt.Errorf("reduceHistoryFiles AddWord: %w", err) } count++ } if err = comp.Compress(); err != nil { return fmt.Errorf("reduceHistoryFiles compress: %w", err) } var d *compress.Decompressor if d, err = compress.NewDecompressor(datTmpPath); err != nil { return fmt.Errorf("reduceHistoryFiles create decompressor: %w", err) } var rs *recsplit.RecSplit if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: count, Enums: false, BucketSize: 2000, LeafSize: 8, TmpDir: a.diffDir, IndexFile: idxPath, }); err != nil { return fmt.Errorf("reduceHistoryFiles NewRecSplit: %w", err) } g1 := d.MakeGetter() for { g.Reset(0) g1.Reset(0) var lastOffset uint64 for g.HasNext() { key, _ = g.Next(key[:0]) g.Skip() // Skip value _, pos := g1.Next(nil) //fmt.Printf("reduce2 [%s.%d-%d] [%x]==>%d\n", fType.String(), item.startBlock, item.endBlock, key, lastOffset) if err = rs.AddKey(key, lastOffset); err != nil { return fmt.Errorf("reduceHistoryFiles %p AddKey: %w", rs, err) } lastOffset = pos } if err = rs.Build(); err != nil { if rs.Collision() { log.Info("Building reduceHistoryFiles. Collision happened. It's ok. Restarting...") rs.ResetNextSalt() } else { return fmt.Errorf("reduceHistoryFiles Build: %w", err) } } else { break } } if err = item.decompressor.Close(); err != nil { return fmt.Errorf("reduceHistoryFiles close decompressor: %w", err) } if err = os.Remove(datPath); err != nil { return fmt.Errorf("reduceHistoryFiles remove: %w", err) } if err = os.Rename(datTmpPath, datPath); err != nil { return fmt.Errorf("reduceHistoryFiles rename: %w", err) } if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { return fmt.Errorf("reduceHistoryFiles create new decompressor: %w", err) } item.getter = item.decompressor.MakeGetter() item.getterMerge = item.decompressor.MakeGetter() if item.index, err = recsplit.OpenIndex(idxPath); err != nil { return fmt.Errorf("reduceHistoryFiles open index: %w", err) } item.indexReader = recsplit.NewIndexReader(item.index) item.readerMerge = recsplit.NewIndexReader(item.index) return nil } type commitmentMerger func(prev, current, target commitment.BranchData) (commitment.BranchData, error) func mergeReplace(preval, val, buf commitment.BranchData) (commitment.BranchData, error) { return append(buf, val...), nil } func mergeBitmaps(preval, val, buf commitment.BranchData) (commitment.BranchData, error) { preef, _ := eliasfano32.ReadEliasFano(preval) ef, _ := eliasfano32.ReadEliasFano(val) //fmt.Printf("mergeBitmaps [%x] (count=%d,max=%d) + [%x] (count=%d,max=%d)\n", preval, preef.Count(), preef.Max(), val, ef.Count(), ef.Max()) preIt := preef.Iterator() efIt := ef.Iterator() newEf := eliasfano32.NewEliasFano(preef.Count()+ef.Count(), ef.Max()) for preIt.HasNext() { newEf.AddOffset(preIt.Next()) } for efIt.HasNext() { newEf.AddOffset(efIt.Next()) } newEf.Build() return newEf.AppendBytes(buf), nil } func mergeCommitments(preval, val, buf commitment.BranchData) (commitment.BranchData, error) { return preval.MergeHexBranches(val, buf) } func (a *Aggregator) backgroundHistoryMerge() { defer a.historyWg.Done() for range a.historyChannel { t := time.Now() var err error var toRemove [NumberOfTypes][]*byEndBlockItem var newItems [NumberOfTypes]*byEndBlockItem var blockFrom, blockTo uint64 // Lock the set of commitment files - those are the smallest, because account, storage and code files may be added by the aggregation thread first toRemove[CodeBitmap], _, _, blockFrom, blockTo = a.findLargestMerge(CodeBitmap, uint64(math.MaxUint64) /* maxBlockTo */, 500_000 /* maxSpan */) finalMerge := blockTo-blockFrom+1 == 500_000 for fType := AccountHistory; fType < NumberOfTypes; fType++ { var from, to uint64 if fType == CodeBitmap { from = blockFrom to = blockTo } else { toRemove[fType], _, _, from, to = a.findLargestMerge(fType, blockTo, 500_000 /* maxSpan */) if from != blockFrom { a.historyError <- fmt.Errorf("%sFrom %d != blockFrom %d", fType.String(), from, blockFrom) return } if to != blockTo { a.historyError <- fmt.Errorf("%sTo %d != blockTo %d", fType.String(), to, blockTo) return } } if len(toRemove[fType]) > 1 { isBitmap := fType == AccountBitmap || fType == StorageBitmap || fType == CodeBitmap var mergeFunc commitmentMerger switch { case isBitmap: mergeFunc = mergeBitmaps case fType == Commitment: mergeFunc = mergeCommitments default: mergeFunc = mergeReplace } if newItems[fType], err = a.computeAggregation(fType, toRemove[fType], from, to, nil /* valTransform */, mergeFunc, !isBitmap /* valCompressed */, !finalMerge || isBitmap /* withIndex */, 0 /* prefixLen */); err != nil { a.historyError <- fmt.Errorf("computeAggreation %s: %w", fType.String(), err) return } } } if finalMerge { // Special aggregation for blockTo - blockFrom + 1 == 500_000 // Remove keys from the .dat files assuming that they will only be used after querying the bitmap index // and therefore, there is no situation where non-existent key is queried. if err = a.reduceHistoryFiles(AccountHistory, newItems[AccountHistory]); err != nil { a.historyError <- fmt.Errorf("reduceHistoryFiles %s: %w", AccountHistory.String(), err) return } if err = a.reduceHistoryFiles(StorageHistory, newItems[StorageHistory]); err != nil { a.historyError <- fmt.Errorf("reduceHistoryFiles %s: %w", StorageHistory.String(), err) return } if err = a.reduceHistoryFiles(CodeHistory, newItems[CodeHistory]); err != nil { a.historyError <- fmt.Errorf("reduceHistoryFiles %s: %w", CodeHistory.String(), err) return } } for fType := AccountHistory; fType < NumberOfTypes; fType++ { a.removeLocked(fType, toRemove[fType], newItems[fType]) } removed := 0 for fType := AccountHistory; fType < NumberOfTypes; fType++ { if len(toRemove[fType]) > 1 { removeFiles(fType, a.diffDir, toRemove[fType]) removed += len(toRemove[fType]) - 1 } } mergeTime := time.Since(t) if mergeTime > time.Minute { log.Info("Long history merge", "from", blockFrom, "to", blockTo, "files", removed, "time", time.Since(t)) } } } // checkOverlaps does not lock tree, because it is only called from the constructor of aggregator func checkOverlaps(treeName string, tree *btree.BTree) error { var minStart uint64 = math.MaxUint64 var err error tree.Descend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.startBlock < minStart { if item.endBlock >= minStart { err = fmt.Errorf("overlap of %s state files [%d-%d] with %d", treeName, item.startBlock, item.endBlock, minStart) return false } if minStart != math.MaxUint64 && item.endBlock+1 != minStart { err = fmt.Errorf("hole in %s state files [%d-%d]", treeName, item.endBlock, minStart) return false } minStart = item.startBlock } return true }) return err } func (a *Aggregator) openFiles(fType FileType, minArch uint64) error { var err error var totalKeys uint64 a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor, err = compress.NewDecompressor(path.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), item.startBlock, item.endBlock))); err != nil { return false } if item.index, err = recsplit.OpenIndex(path.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), item.startBlock, item.endBlock))); err != nil { return false } totalKeys += item.index.KeyCount() item.getter = item.decompressor.MakeGetter() item.getterMerge = item.decompressor.MakeGetter() item.indexReader = recsplit.NewIndexReader(item.index) item.readerMerge = recsplit.NewIndexReader(item.index) return true }) if fType >= NumberOfStateTypes { return nil } log.Info("Creating arch...", "type", fType.String(), "total keys in all state files", totalKeys) // Allocate arch of double of total keys n := totalKeys * 2 if n < minArch { n = minArch } a.arches[fType] = make([]uint32, n) arch := a.arches[fType] var key []byte h := a.archHasher collisions := 0 a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) g := item.getter g.Reset(0) blockNum := uint32(item.endBlock) for g.HasNext() { key, _ = g.Next(key[:0]) h.Reset() h.Write(key) //nolint:errcheck p, _ := h.Sum128() p = p % n if arch[p] != 0 { collisions++ } arch[p] = blockNum g.Skip() } return true }) log.Info("Created arch", "type", fType.String(), "collisions", collisions) return err } func (a *Aggregator) closeFiles(fType FileType) { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor != nil { item.decompressor.Close() } if item.index != nil { item.index.Close() } return true }) } func (a *Aggregator) Close() { close(a.aggChannel) a.aggWg.Wait() // Need to wait for the background aggregation to finish because it sends to merge channels // Drain channel before closing select { case <-a.mergeChannel: default: } close(a.mergeChannel) if a.changesets { // Drain channel before closing select { case <-a.historyChannel: default: } close(a.historyChannel) a.historyWg.Wait() } a.mergeWg.Wait() // Closing state files only after background aggregation goroutine is finished for fType := FirstType; fType < NumberOfTypes; fType++ { a.closeFiles(fType) } } // checkOverlapWithMinStart does not need to lock tree lock, because it is only used in the constructor of Aggregator func checkOverlapWithMinStart(treeName string, tree *btree.BTree, minStart uint64) error { if lastStateI := tree.Max(); lastStateI != nil { item := lastStateI.(*byEndBlockItem) if minStart != math.MaxUint64 && item.endBlock+1 != minStart { return fmt.Errorf("hole or overlap between %s state files and change files [%d-%d]", treeName, item.endBlock, minStart) } } return nil } func (a *Aggregator) readFromFiles(fType FileType, lock bool, blockNum uint64, filekey []byte, trace bool) ([]byte, uint64) { if lock { if fType == Commitment { for lockFType := FirstType; lockFType < NumberOfStateTypes; lockFType++ { a.fileLocks[lockFType].RLock() defer a.fileLocks[lockFType].RUnlock() } } else { a.fileLocks[fType].RLock() defer a.fileLocks[fType].RUnlock() } } h := a.archHasher arch := a.arches[fType] n := uint64(len(arch)) if n > 0 { h.Reset() h.Write(filekey) //nolint:errcheck p, _ := h.Sum128() p = p % n v := uint64(atomic.LoadUint32(&arch[p])) //fmt.Printf("Reading from %s arch key [%x]=%d, %d\n", fType.String(), filekey, p, arch[p]) if v == 0 { return nil, 0 } a.files[fType].AscendGreaterOrEqual(&byEndBlockItem{startBlock: v, endBlock: v}, func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.endBlock < blockNum { blockNum = item.endBlock } return false }) } var val []byte var startBlock uint64 a.files[fType].DescendLessOrEqual(&byEndBlockItem{endBlock: blockNum}, func(i btree.Item) bool { item := i.(*byEndBlockItem) if trace { fmt.Printf("read %s %x: search in file [%d-%d]\n", fType.String(), filekey, item.startBlock, item.endBlock) } if item.tree != nil { ai, ok := item.tree.Get(&AggregateItem{k: filekey}) if !ok { return true } if ai == nil { return true } val = ai.v startBlock = item.startBlock return false } if item.index.Empty() { return true } offset := item.indexReader.Lookup(filekey) g := item.getter g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(filekey); keyMatch { val, _ = g.Next(nil) if trace { fmt.Printf("read %s %x: found [%x] in file [%d-%d]\n", fType.String(), filekey, val, item.startBlock, item.endBlock) } startBlock = item.startBlock atomic.AddUint64(&a.fileHits, 1) return false } } atomic.AddUint64(&a.fileMisses, 1) return true }) if fType == Commitment { // Transform references if len(val) > 0 { accountPlainKeys, storagePlainKeys, err := commitment.BranchData(val).ExtractPlainKeys() if err != nil { panic(fmt.Errorf("value %x: %w", val, err)) } var transAccountPks [][]byte var transStoragePks [][]byte for _, accountPlainKey := range accountPlainKeys { var apkBuf []byte if len(accountPlainKey) == length.Addr { // Non-optimised key originating from a database record apkBuf = accountPlainKey } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(accountPlainKey[0]) offset := decodeU64(accountPlainKey[1:]) apkBuf, _ = a.readByOffset(Account, fileI, offset) } transAccountPks = append(transAccountPks, apkBuf) } for _, storagePlainKey := range storagePlainKeys { var spkBuf []byte if len(storagePlainKey) == length.Addr+length.Hash { // Non-optimised key originating from a database record spkBuf = storagePlainKey } else { // Optimised key referencing a state file record (file number and offset within the file) fileI := int(storagePlainKey[0]) offset := decodeU64(storagePlainKey[1:]) //fmt.Printf("readbyOffset(comm file %d-%d) file=%d offset=%d\n", ii.startBlock, ii.endBlock, fileI, offset) spkBuf, _ = a.readByOffset(Storage, fileI, offset) } transStoragePks = append(transStoragePks, spkBuf) } if val, err = commitment.BranchData(val).ReplacePlainKeys(transAccountPks, transStoragePks, nil); err != nil { panic(err) } } } return val, startBlock } // readByOffset is assumed to be invoked under a read lock func (a *Aggregator) readByOffset(fType FileType, fileI int, offset uint64) ([]byte, []byte) { var key, val []byte fi := 0 a.files[fType].Ascend(func(i btree.Item) bool { if fi < fileI { fi++ return true } item := i.(*byEndBlockItem) //fmt.Printf("fileI=%d, file=%s.%d-%d\n", fileI, fType.String(), item.startBlock, item.endBlock) g := item.getter g.Reset(offset) key, _ = g.Next(nil) val, _ = g.Next(nil) return false }) return key, val } func (a *Aggregator) MakeStateReader(blockNum uint64, tx kv.Tx) *Reader { r := &Reader{ a: a, blockNum: blockNum, tx: tx, } return r } type Reader struct { a *Aggregator tx kv.Getter blockNum uint64 } func (r *Reader) ReadAccountData(addr []byte, trace bool) ([]byte, error) { v, err := r.tx.GetOne(kv.StateAccounts, addr) if err != nil { return nil, err } if v != nil { return v[4:], nil } v, _ = r.a.readFromFiles(Account, true /* lock */, r.blockNum, addr, trace) return v, nil } func (r *Reader) ReadAccountStorage(addr []byte, loc []byte, trace bool) ([]byte, error) { // Look in the summary table first dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) v, err := r.tx.GetOne(kv.StateStorage, dbkey) if err != nil { return nil, err } if v != nil { if len(v) == 4 { return nil, nil } return v[4:], nil } v, _ = r.a.readFromFiles(Storage, true /* lock */, r.blockNum, dbkey, trace) return v, nil } func (r *Reader) ReadAccountCode(addr []byte, trace bool) ([]byte, error) { // Look in the summary table first v, err := r.tx.GetOne(kv.StateCode, addr) if err != nil { return nil, err } if v != nil { if len(v) == 4 { return nil, nil } return v[4:], nil } // Look in the files v, _ = r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) return v, nil } func (r *Reader) ReadAccountCodeSize(addr []byte, trace bool) (int, error) { // Look in the summary table first v, err := r.tx.GetOne(kv.StateCode, addr) if err != nil { return 0, err } if v != nil { return len(v) - 4, nil } // Look in the files. TODO - use specialised function to only lookup size v, _ = r.a.readFromFiles(Code, true /* lock */, r.blockNum, addr, trace) return len(v), nil } type Writer struct { tx kv.RwTx a *Aggregator commTree *btree.BTreeG[*CommitmentItem] // BTree used for gathering commitment data changes [NumberOfStateTypes]Changes blockNum uint64 changeFileNum uint64 // Block number associated with the current change files. It is the last block number whose changes will go into that file } func (a *Aggregator) MakeStateWriter(beforeOn bool) *Writer { w := &Writer{ a: a, commTree: btree.NewG[*CommitmentItem](32, commitmentItemLess), } for fType := FirstType; fType < NumberOfStateTypes; fType++ { w.changes[fType].Init(fType.String(), a.aggregationStep, a.diffDir, w.a.changesets && fType != Commitment /* we do not unwind commitment ? */) } return w } func (w *Writer) Close() { typesLimit := Commitment if w.a.commitments { typesLimit = AccountHistory } for fType := FirstType; fType < typesLimit; fType++ { w.changes[fType].closeFiles() } } func (w *Writer) Reset(blockNum uint64, tx kv.RwTx) error { w.tx = tx w.blockNum = blockNum typesLimit := Commitment if w.a.commitments { typesLimit = AccountHistory } if blockNum > w.changeFileNum { for fType := FirstType; fType < typesLimit; fType++ { if err := w.changes[fType].closeFiles(); err != nil { return err } } if w.changeFileNum != 0 { w.a.changesBtree.ReplaceOrInsert(&ChangesItem{startBlock: w.changeFileNum + 1 - w.a.aggregationStep, endBlock: w.changeFileNum, fileCount: 12}) } } if w.changeFileNum == 0 || blockNum > w.changeFileNum { for fType := FirstType; fType < typesLimit; fType++ { if err := w.changes[fType].openFiles(blockNum, true /* write */); err != nil { return err } } w.changeFileNum = blockNum - (blockNum % w.a.aggregationStep) + w.a.aggregationStep - 1 } return nil } type CommitmentItem struct { plainKey []byte hashedKey []byte u commitment.Update } func commitmentItemLess(i, j *CommitmentItem) bool { return bytes.Compare(i.hashedKey, j.hashedKey) < 0 } func (i *CommitmentItem) Less(than btree.Item) bool { return bytes.Compare(i.hashedKey, than.(*CommitmentItem).hashedKey) < 0 } func (w *Writer) branchFn(prefix []byte) ([]byte, error) { for lockFType := FirstType; lockFType < NumberOfStateTypes; lockFType++ { w.a.fileLocks[lockFType].RLock() defer w.a.fileLocks[lockFType].RUnlock() } // Look in the summary table first mergedVal, err := w.tx.GetOne(kv.StateCommitment, prefix) if err != nil { return nil, err } if mergedVal != nil { mergedVal = mergedVal[4:] } // Look in the files and merge, while it becomes complete var startBlock = w.blockNum + 1 for mergedVal == nil || !commitment.BranchData(mergedVal).IsComplete() { if startBlock == 0 { panic(fmt.Sprintf("Incomplete branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactedKeyToHex(prefix), mergedVal, startBlock)) } var val commitment.BranchData val, startBlock = w.a.readFromFiles(Commitment, false /* lock */, startBlock-1, prefix, false /* trace */) if val == nil { if mergedVal == nil { return nil, nil } panic(fmt.Sprintf("Incomplete branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactedKeyToHex(prefix), mergedVal, startBlock)) } var err error //fmt.Printf("Pre-merge prefix [%x] [%x]+[%x], startBlock %d\n", commitment.CompactToHex(prefix), val, mergedVal, startBlock) if mergedVal == nil { mergedVal = val } else if mergedVal, err = val.MergeHexBranches(mergedVal, nil); err != nil { return nil, err } //fmt.Printf("Post-merge prefix [%x] [%x], startBlock %d\n", commitment.CompactToHex(prefix), mergedVal, startBlock) } if mergedVal == nil { return nil, nil } //fmt.Printf("Returning branch data prefix [%x], mergeVal=[%x], startBlock=%d\n", commitment.CompactToHex(prefix), mergedVal, startBlock) return mergedVal[2:], nil // Skip touchMap but keep afterMap } func bytesToUint64(buf []byte) (x uint64) { for i, b := range buf { x = x<<8 + uint64(b) if i == 7 { return } } return } func (w *Writer) accountFn(plainKey []byte, cell *commitment.Cell) error { // Look in the summary table first enc, err := w.tx.GetOne(kv.StateAccounts, plainKey) if err != nil { return err } if enc != nil { enc = enc[4:] } else { // Look in the files enc, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, plainKey, false /* trace */) } cell.Nonce = 0 cell.Balance.Clear() copy(cell.CodeHash[:], commitment.EmptyCodeHash) if len(enc) > 0 { pos := 0 nonceBytes := int(enc[pos]) pos++ if nonceBytes > 0 { cell.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) pos += nonceBytes } balanceBytes := int(enc[pos]) pos++ if balanceBytes > 0 { cell.Balance.SetBytes(enc[pos : pos+balanceBytes]) } } enc, err = w.tx.GetOne(kv.StateCode, plainKey) if err != nil { return err } if enc != nil { enc = enc[4:] } else { // Look in the files enc, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, plainKey, false /* trace */) } if len(enc) > 0 { w.a.keccak.Reset() w.a.keccak.Write(enc) w.a.keccak.(io.Reader).Read(cell.CodeHash[:]) } return nil } func (w *Writer) storageFn(plainKey []byte, cell *commitment.Cell) error { // Look in the summary table first enc, err := w.tx.GetOne(kv.StateStorage, plainKey) if err != nil { return err } if enc != nil { enc = enc[4:] } else { // Look in the files enc, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, plainKey, false /* trace */) } cell.StorageLen = len(enc) copy(cell.Storage[:], enc) return nil } func (w *Writer) captureCommitmentType(fType FileType, trace bool, f func(commTree *btree.BTreeG[*CommitmentItem], h hash.Hash, key, val []byte)) { lastOffsetKey := 0 lastOffsetVal := 0 for i, offsetKey := range w.changes[fType].keys.wordOffsets { offsetVal := w.changes[fType].after.wordOffsets[i] key := w.changes[fType].keys.words[lastOffsetKey:offsetKey] val := w.changes[fType].after.words[lastOffsetVal:offsetVal] if trace { fmt.Printf("captureCommitmentData %s [%x]=>[%x]\n", fType.String(), key, val) } f(w.commTree, w.a.keccak, key, val) lastOffsetKey = offsetKey lastOffsetVal = offsetVal } } func (w *Writer) captureCommitmentData(trace bool) { if trace { fmt.Printf("captureCommitmentData start w.commTree.Len()=%d\n", w.commTree.Len()) } w.captureCommitmentType(Code, trace, func(commTree *btree.BTreeG[*CommitmentItem], h hash.Hash, key, val []byte) { h.Reset() h.Write(key) hashedKey := h.Sum(nil) var c = &CommitmentItem{plainKey: common.Copy(key), hashedKey: make([]byte, len(hashedKey)*2)} for i, b := range hashedKey { c.hashedKey[i*2] = (b >> 4) & 0xf c.hashedKey[i*2+1] = b & 0xf } c.u.Flags = commitment.CODE_UPDATE item, found := commTree.Get(&CommitmentItem{hashedKey: c.hashedKey}) if found && item != nil { if item.u.Flags&commitment.BALANCE_UPDATE != 0 { c.u.Flags |= commitment.BALANCE_UPDATE c.u.Balance.Set(&item.u.Balance) } if item.u.Flags&commitment.NONCE_UPDATE != 0 { c.u.Flags |= commitment.NONCE_UPDATE c.u.Nonce = item.u.Nonce } if item.u.Flags == commitment.DELETE_UPDATE && len(val) == 0 { c.u.Flags = commitment.DELETE_UPDATE } else { h.Reset() h.Write(val) h.(io.Reader).Read(c.u.CodeHashOrStorage[:]) } } else { h.Reset() h.Write(val) h.(io.Reader).Read(c.u.CodeHashOrStorage[:]) } commTree.ReplaceOrInsert(c) }) w.captureCommitmentType(Account, trace, func(commTree *btree.BTreeG[*CommitmentItem], h hash.Hash, key, val []byte) { h.Reset() h.Write(key) hashedKey := h.Sum(nil) var c = &CommitmentItem{plainKey: common.Copy(key), hashedKey: make([]byte, len(hashedKey)*2)} for i, b := range hashedKey { c.hashedKey[i*2] = (b >> 4) & 0xf c.hashedKey[i*2+1] = b & 0xf } if len(val) == 0 { c.u.Flags = commitment.DELETE_UPDATE } else { c.u.DecodeForStorage(val) c.u.Flags = commitment.BALANCE_UPDATE | commitment.NONCE_UPDATE item, found := commTree.Get(&CommitmentItem{hashedKey: c.hashedKey}) if found && item != nil { if item.u.Flags&commitment.CODE_UPDATE != 0 { c.u.Flags |= commitment.CODE_UPDATE copy(c.u.CodeHashOrStorage[:], item.u.CodeHashOrStorage[:]) } } } commTree.ReplaceOrInsert(c) }) w.captureCommitmentType(Storage, trace, func(commTree *btree.BTreeG[*CommitmentItem], h hash.Hash, key, val []byte) { hashedKey := make([]byte, 2*length.Hash) h.Reset() h.Write(key[:length.Addr]) h.(io.Reader).Read(hashedKey[:length.Hash]) h.Reset() h.Write(key[length.Addr:]) h.(io.Reader).Read(hashedKey[length.Hash:]) var c = &CommitmentItem{plainKey: common.Copy(key), hashedKey: make([]byte, len(hashedKey)*2)} for i, b := range hashedKey { c.hashedKey[i*2] = (b >> 4) & 0xf c.hashedKey[i*2+1] = b & 0xf } c.u.ValLength = len(val) if len(val) > 0 { copy(c.u.CodeHashOrStorage[:], val) } if len(val) == 0 { c.u.Flags = commitment.DELETE_UPDATE } else { c.u.Flags = commitment.STORAGE_UPDATE } commTree.ReplaceOrInsert(c) }) if trace { fmt.Printf("captureCommitmentData end w.commTree.Len()=%d\n", w.commTree.Len()) } } // computeCommitment is computing the commitment to the state after // the change would have been applied. // It assumes that the state accessible via the aggregator has already been // modified with the new values // At the moment, it is specific version for hex merkle patricia tree commitment // but it will be extended to support other types of commitments func (w *Writer) computeCommitment(trace bool) ([]byte, error) { if trace { fmt.Printf("computeCommitment w.commTree.Len()=%d\n", w.commTree.Len()) } plainKeys := make([][]byte, w.commTree.Len()) hashedKeys := make([][]byte, w.commTree.Len()) updates := make([]commitment.Update, w.commTree.Len()) j := 0 w.commTree.Ascend(func(item *CommitmentItem) bool { plainKeys[j] = item.plainKey hashedKeys[j] = item.hashedKey updates[j] = item.u j++ return true }) if len(plainKeys) == 0 { return w.a.hph.RootHash() } w.a.hph.Reset() w.a.hph.ResetFns(w.branchFn, w.accountFn, w.storageFn) w.a.hph.SetTrace(trace) rootHash, branchNodeUpdates, err := w.a.hph.ProcessUpdates(plainKeys, hashedKeys, updates) if err != nil { return nil, err } for prefixStr, branchNodeUpdate := range branchNodeUpdates { if branchNodeUpdate == nil { continue } prefix := []byte(prefixStr) var prevV []byte var prevNum uint32 if prevV, err = w.tx.GetOne(kv.StateCommitment, prefix); err != nil { return nil, err } if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original commitment.BranchData if prevV == nil { original, _ = w.a.readFromFiles(Commitment, true /* lock */, w.blockNum, prefix, false) } else { original = prevV[4:] } if original != nil { // try to merge previous (original) and current (branchNodeUpdate) into one update mergedVal, err := original.MergeHexBranches(branchNodeUpdate, nil) if err != nil { return nil, err } if w.a.trace { fmt.Printf("computeCommitment merge [%x] [%x]+[%x]=>[%x]\n", commitment.CompactedKeyToHex(prefix), original, branchNodeUpdate, mergedVal) } branchNodeUpdate = mergedVal } //fmt.Printf("computeCommitment set [%x] [%x]\n", commitment.CompactToHex(prefix), branchNodeUpdate) v := make([]byte, 4+len(branchNodeUpdate)) binary.BigEndian.PutUint32(v[:4], prevNum+1) copy(v[4:], branchNodeUpdate) if err = w.tx.Put(kv.StateCommitment, prefix, v); err != nil { return nil, err } if len(branchNodeUpdate) == 0 { w.changes[Commitment].delete(prefix, original) } else { if prevV == nil && len(original) == 0 { w.changes[Commitment].insert(prefix, branchNodeUpdate) } else { w.changes[Commitment].update(prefix, original, branchNodeUpdate) } } } return rootHash, nil } func (w *Writer) FinishTx(txNum uint64, trace bool) error { if w.a.commitments { w.captureCommitmentData(trace) } var err error for fType := FirstType; fType < Commitment; fType++ { if err = w.changes[fType].finish(txNum); err != nil { return fmt.Errorf("finish %sChanges: %w", fType.String(), err) } } return nil } func (w *Writer) ComputeCommitment(trace bool) ([]byte, error) { if !w.a.commitments { return nil, fmt.Errorf("commitments turned off") } comm, err := w.computeCommitment(trace) if err != nil { return nil, fmt.Errorf("compute commitment: %w", err) } w.commTree.Clear(true) if err = w.changes[Commitment].finish(w.blockNum); err != nil { return nil, fmt.Errorf("finish commChanges: %w", err) } return comm, nil } // Aggegate should be called to check if the aggregation is required, and // if it is required, perform it func (w *Writer) Aggregate(trace bool) error { if w.blockNum < w.a.unwindLimit+w.a.aggregationStep-1 { return nil } diff := w.blockNum - w.a.unwindLimit if (diff+1)%w.a.aggregationStep != 0 { return nil } if err := w.aggregateUpto(diff+1-w.a.aggregationStep, diff); err != nil { return fmt.Errorf("aggregateUpto(%d, %d): %w", diff+1-w.a.aggregationStep, diff, err) } return nil } func (w *Writer) UpdateAccountData(addr []byte, account []byte, trace bool) error { var prevNum uint32 prevV, err := w.tx.GetOne(kv.StateAccounts, addr) if err != nil { return err } if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace) } else { original = prevV[4:] } if bytes.Equal(account, original) { // No change return nil } v := make([]byte, 4+len(account)) binary.BigEndian.PutUint32(v[:4], prevNum+1) copy(v[4:], account) if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil { return err } if prevV == nil && len(original) == 0 { w.changes[Account].insert(addr, account) } else { w.changes[Account].update(addr, original, account) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } return nil } func (w *Writer) UpdateAccountCode(addr []byte, code []byte, trace bool) error { var prevNum uint32 prevV, err := w.tx.GetOne(kv.StateCode, addr) if err != nil { return err } if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace) } else { original = prevV[4:] } v := make([]byte, 4+len(code)) binary.BigEndian.PutUint32(v[:4], prevNum+1) copy(v[4:], code) if err = w.tx.Put(kv.StateCode, addr, v); err != nil { return err } if prevV == nil && len(original) == 0 { w.changes[Code].insert(addr, code) } else { w.changes[Code].update(addr, original, code) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } return nil } type CursorType uint8 const ( FILE_CURSOR CursorType = iota DB_CURSOR TREE_CURSOR ) // CursorItem is the item in the priority queue used to do merge interation // over storage of a given account type CursorItem struct { c kv.Cursor dg *compress.Getter tree *btree.BTreeG[*AggregateItem] key []byte val []byte endBlock uint64 t CursorType // Whether this item represents state file or DB record, or tree } type CursorHeap []*CursorItem func (ch CursorHeap) Len() int { return len(ch) } func (ch CursorHeap) Less(i, j int) bool { cmp := bytes.Compare(ch[i].key, ch[j].key) if cmp == 0 { // when keys match, the items with later blocks are preferred return ch[i].endBlock > ch[j].endBlock } return cmp < 0 } func (ch *CursorHeap) Swap(i, j int) { (*ch)[i], (*ch)[j] = (*ch)[j], (*ch)[i] } func (ch *CursorHeap) Push(x interface{}) { *ch = append(*ch, x.(*CursorItem)) } func (ch *CursorHeap) Pop() interface{} { old := *ch n := len(old) x := old[n-1] old[n-1] = nil *ch = old[0 : n-1] return x } func (w *Writer) deleteAccount(addr []byte, trace bool) (bool, error) { prevV, err := w.tx.GetOne(kv.StateAccounts, addr) if err != nil { return false, err } var prevNum uint32 if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Account, true /* lock */, w.blockNum, addr, trace) if original == nil { return false, nil } } else { original = prevV[4:] } v := make([]byte, 4) binary.BigEndian.PutUint32(v[:4], prevNum+1) if err = w.tx.Put(kv.StateAccounts, addr, v); err != nil { return false, err } w.changes[Account].delete(addr, original) return true, nil } func (w *Writer) deleteCode(addr []byte, trace bool) error { prevV, err := w.tx.GetOne(kv.StateCode, addr) if err != nil { return err } var prevNum uint32 if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Code, true /* lock */, w.blockNum, addr, trace) if original == nil { // Nothing to do return nil } } else { original = prevV[4:] } v := make([]byte, 4) binary.BigEndian.PutUint32(v[:4], prevNum+1) if err = w.tx.Put(kv.StateCode, addr, v); err != nil { return err } w.changes[Code].delete(addr, original) return nil } func (w *Writer) DeleteAccount(addr []byte, trace bool) error { deleted, err := w.deleteAccount(addr, trace) if err != nil { return err } if !deleted { return nil } w.a.fileLocks[Storage].RLock() defer w.a.fileLocks[Storage].RUnlock() w.deleteCode(addr, trace) // Find all storage items for this address var cp CursorHeap heap.Init(&cp) var c kv.Cursor if c, err = w.tx.Cursor(kv.StateStorage); err != nil { return err } defer c.Close() var k, v []byte if k, v, err = c.Seek(addr); err != nil { return err } if k != nil && bytes.HasPrefix(k, addr) { heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: c, endBlock: w.blockNum}) } w.a.files[Storage].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.tree != nil { item.tree.AscendGreaterOrEqual(&AggregateItem{k: addr}, func(aitem *AggregateItem) bool { if !bytes.HasPrefix(aitem.k, addr) { return false } if len(aitem.k) == len(addr) { return true } heap.Push(&cp, &CursorItem{t: TREE_CURSOR, key: aitem.k, val: aitem.v, tree: item.tree, endBlock: item.endBlock}) return false }) return true } if item.index.Empty() { return true } offset := item.indexReader.Lookup(addr) g := item.getter g.Reset(offset) if g.HasNext() { if keyMatch, _ := g.Match(addr); !keyMatch { //fmt.Printf("DeleteAccount %x - not found anchor in file [%d-%d]\n", addr, item.startBlock, item.endBlock) return true } g.Skip() } if g.HasNext() { key, _ := g.Next(nil) if bytes.HasPrefix(key, addr) { val, _ := g.Next(nil) heap.Push(&cp, &CursorItem{t: FILE_CURSOR, key: key, val: val, dg: g, endBlock: item.endBlock}) } } return true }) for cp.Len() > 0 { lastKey := common.Copy(cp[0].key) lastVal := common.Copy(cp[0].val) // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { ci1 := cp[0] switch ci1.t { case FILE_CURSOR: if ci1.dg.HasNext() { ci1.key, _ = ci1.dg.Next(ci1.key[:0]) if bytes.HasPrefix(ci1.key, addr) { ci1.val, _ = ci1.dg.Next(ci1.val[:0]) heap.Fix(&cp, 0) } else { heap.Pop(&cp) } } else { heap.Pop(&cp) } case DB_CURSOR: k, v, err = ci1.c.Next() if err != nil { return err } if k != nil && bytes.HasPrefix(k, addr) { ci1.key = common.Copy(k) ci1.val = common.Copy(v) heap.Fix(&cp, 0) } else { heap.Pop(&cp) } case TREE_CURSOR: skip := true var aitem *AggregateItem ci1.tree.AscendGreaterOrEqual(&AggregateItem{k: ci1.key}, func(ai *AggregateItem) bool { if skip { skip = false return true } aitem = ai return false }) if aitem != nil && bytes.HasPrefix(aitem.k, addr) { ci1.key = aitem.k ci1.val = aitem.v heap.Fix(&cp, 0) } else { heap.Pop(&cp) } } } var prevV []byte prevV, err = w.tx.GetOne(kv.StateStorage, lastKey) if err != nil { return err } var prevNum uint32 if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } v = make([]byte, 4) binary.BigEndian.PutUint32(v[:4], prevNum+1) if err = w.tx.Put(kv.StateStorage, lastKey, v); err != nil { return err } w.changes[Storage].delete(lastKey, lastVal) } if trace { w.a.trace = true w.a.tracedKeys[string(addr)] = struct{}{} } return nil } func (w *Writer) WriteAccountStorage(addr, loc []byte, value []byte, trace bool) error { dbkey := make([]byte, len(addr)+len(loc)) copy(dbkey[0:], addr) copy(dbkey[len(addr):], loc) prevV, err := w.tx.GetOne(kv.StateStorage, dbkey) if err != nil { return err } var prevNum uint32 if prevV != nil { prevNum = binary.BigEndian.Uint32(prevV[:4]) } var original []byte if prevV == nil { original, _ = w.a.readFromFiles(Storage, true /* lock */, w.blockNum, dbkey, trace) } else { original = prevV[4:] } if bytes.Equal(value, original) { // No change return nil } v := make([]byte, 4+len(value)) binary.BigEndian.PutUint32(v[:4], prevNum+1) copy(v[4:], value) if err = w.tx.Put(kv.StateStorage, dbkey, v); err != nil { return err } if prevV == nil && len(original) == 0 { w.changes[Storage].insert(dbkey, value) } else { w.changes[Storage].update(dbkey, original, value) } if trace { w.a.trace = true w.a.tracedKeys[string(dbkey)] = struct{}{} } return nil } // findLargestMerge looks through the state files of the speficied type and determines the largest merge that can be undertaken // a state file block [a; b] is valid if its length is a divisor of its starting block, or `(b-a+1) = 0 mod a` func (a *Aggregator) findLargestMerge(fType FileType, maxTo uint64, maxSpan uint64) (toAggregate []*byEndBlockItem, pre []*byEndBlockItem, post []*byEndBlockItem, aggFrom uint64, aggTo uint64) { a.fileLocks[fType].RLock() defer a.fileLocks[fType].RUnlock() var maxEndBlock uint64 a.files[fType].DescendLessOrEqual(&byEndBlockItem{endBlock: maxTo}, func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor == nil { return true } maxEndBlock = item.endBlock return false }) if maxEndBlock == 0 { return } a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor == nil { return true // Skip B-tree based items } pre = append(pre, item) if aggTo == 0 { var doubleEnd uint64 nextDouble := item.endBlock for nextDouble <= maxEndBlock && nextDouble-item.startBlock < maxSpan { doubleEnd = nextDouble nextDouble = doubleEnd + (doubleEnd - item.startBlock) + 1 } if doubleEnd != item.endBlock { aggFrom = item.startBlock aggTo = doubleEnd } else { post = append(post, item) return true } } toAggregate = append(toAggregate, item) return item.endBlock < aggTo }) return } func (a *Aggregator) computeAggregation(fType FileType, toAggregate []*byEndBlockItem, aggFrom uint64, aggTo uint64, valTransform func(val, transValBuf commitment.BranchData) ([]byte, error), mergeFunc commitmentMerger, valCompressed bool, withIndex bool, prefixLen int) (*byEndBlockItem, error) { var item2 = &byEndBlockItem{startBlock: aggFrom, endBlock: aggTo} var cp CursorHeap heap.Init(&cp) for _, ag := range toAggregate { g := ag.decompressor.MakeGetter() g.Reset(0) if g.HasNext() { key, _ := g.Next(nil) val, _ := g.Next(nil) heap.Push(&cp, &CursorItem{t: FILE_CURSOR, dg: g, key: key, val: val, endBlock: ag.endBlock}) } } var err error var count int if item2.decompressor, count, err = a.mergeIntoStateFile(&cp, prefixLen, fType, aggFrom, aggTo, a.diffDir, valTransform, mergeFunc, valCompressed); err != nil { return nil, fmt.Errorf("mergeIntoStateFile %s [%d-%d]: %w", fType.String(), aggFrom, aggTo, err) } item2.getter = item2.decompressor.MakeGetter() item2.getterMerge = item2.decompressor.MakeGetter() if withIndex { idxPath := filepath.Join(a.diffDir, fmt.Sprintf("%s.%d-%d.idx", fType.String(), aggFrom, aggTo)) if item2.index, err = buildIndex(item2.decompressor, idxPath, a.diffDir, count); err != nil { return nil, fmt.Errorf("mergeIntoStateFile buildIndex %s [%d-%d]: %w", fType.String(), aggFrom, aggTo, err) } item2.indexReader = recsplit.NewIndexReader(item2.index) item2.readerMerge = recsplit.NewIndexReader(item2.index) } return item2, nil } func createDatAndIndex(treeName string, diffDir string, bt *btree.BTreeG[*AggregateItem], blockFrom uint64, blockTo uint64) (*compress.Decompressor, *recsplit.Index, error) { datPath := filepath.Join(diffDir, fmt.Sprintf("%s.%d-%d.dat", treeName, blockFrom, blockTo)) idxPath := filepath.Join(diffDir, fmt.Sprintf("%s.%d-%d.idx", treeName, blockFrom, blockTo)) count, err := btreeToFile(bt, datPath, diffDir, false /* trace */, 1 /* workers */) if err != nil { return nil, nil, fmt.Errorf("createDatAndIndex %s build btree: %w", treeName, err) } var d *compress.Decompressor if d, err = compress.NewDecompressor(datPath); err != nil { return nil, nil, fmt.Errorf("createDatAndIndex %s decompressor: %w", treeName, err) } var index *recsplit.Index if index, err = buildIndex(d, idxPath, diffDir, count); err != nil { return nil, nil, fmt.Errorf("createDatAndIndex %s buildIndex: %w", treeName, err) } return d, index, nil } func (a *Aggregator) addLocked(fType FileType, item *byEndBlockItem) { a.fileLocks[fType].Lock() defer a.fileLocks[fType].Unlock() a.files[fType].ReplaceOrInsert(item) } func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error { // React on any previous error of aggregation or merge select { case err := <-w.a.aggError: return err case err := <-w.a.mergeError: return err case err := <-w.a.historyError: return err default: } typesLimit := Commitment if w.a.commitments { typesLimit = AccountHistory } t0 := time.Now() t := time.Now() i := w.a.changesBtree.Get(&ChangesItem{startBlock: blockFrom, endBlock: blockTo}) if i == nil { return fmt.Errorf("did not find change files for [%d-%d], w.a.changesBtree.Len() = %d", blockFrom, blockTo, w.a.changesBtree.Len()) } item := i.(*ChangesItem) if item.startBlock != blockFrom { return fmt.Errorf("expected change files[%d-%d], got [%d-%d]", blockFrom, blockTo, item.startBlock, item.endBlock) } w.a.changesBtree.Delete(i) var aggTask AggregationTask for fType := FirstType; fType < typesLimit; fType++ { aggTask.changes[fType].Init(fType.String(), w.a.aggregationStep, w.a.diffDir, w.a.changesets && fType != Commitment) } var err error for fType := FirstType; fType < typesLimit; fType++ { var prefixLen int if fType == Storage { prefixLen = length.Addr } var commitMerger commitmentMerger if fType == Commitment { commitMerger = mergeCommitments } if aggTask.bt[fType], err = aggTask.changes[fType].aggregate(blockFrom, blockTo, prefixLen, w.tx, fType.Table(), commitMerger); err != nil { return fmt.Errorf("aggregate %sChanges: %w", fType.String(), err) } } aggTask.blockFrom = blockFrom aggTask.blockTo = blockTo aggTime := time.Since(t) t = time.Now() // At this point, all the changes are gathered in 4 B-trees (accounts, code, storage and commitment) and removed from the database // What follows can be done in the 1st background goroutine for fType := FirstType; fType < typesLimit; fType++ { if fType < NumberOfStateTypes { w.a.updateArch(aggTask.bt[fType], fType, uint32(aggTask.blockTo)) } } updateArchTime := time.Since(t) t = time.Now() for fType := FirstType; fType < typesLimit; fType++ { w.a.addLocked(fType, &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo, tree: aggTask.bt[fType]}) } switchTime := time.Since(t) w.a.aggChannel <- &aggTask handoverTime := time.Since(t0) if handoverTime > time.Second { log.Info("Long handover to background aggregation", "from", blockFrom, "to", blockTo, "composition", aggTime, "arch update", updateArchTime, "switch", switchTime) } return nil } // mergeIntoStateFile assumes that all entries in the cp heap have type FILE_CURSOR func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, fType FileType, startBlock, endBlock uint64, dir string, valTransform func(val, transValBuf commitment.BranchData) ([]byte, error), mergeFunc commitmentMerger, valCompressed bool, ) (*compress.Decompressor, int, error) { datPath := filepath.Join(dir, fmt.Sprintf("%s.%d-%d.dat", fType.String(), startBlock, endBlock)) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 1, log.LvlDebug) if err != nil { return nil, 0, fmt.Errorf("compressor %s: %w", datPath, err) } defer comp.Close() count := 0 // In the loop below, the pair `keyBuf=>valBuf` is always 1 item behind `lastKey=>lastVal`. // `lastKey` and `lastVal` are taken from the top of the multi-way merge (assisted by the CursorHeap cp), but not processed right away // instead, the pair from the previous iteration is processed first - `keyBuf=>valBuf`. After that, `keyBuf` and `valBuf` are assigned // to `lastKey` and `lastVal` correspondingly, and the next step of multi-way merge happens. Therefore, after the multi-way merge loop // (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind var keyBuf, valBuf, transValBuf []byte for cp.Len() > 0 { lastKey := common.Copy((*cp)[0].key) lastVal := common.Copy((*cp)[0].val) var mergedOnce bool if a.trace { if _, ok := a.tracedKeys[string(lastKey)]; ok { fmt.Printf("looking at key %x val [%x] endBlock %d to merge into [%d-%d]\n", lastKey, lastVal, (*cp)[0].endBlock, startBlock, endBlock) } } // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal((*cp)[0].key, lastKey) { ci1 := (*cp)[0] if a.trace { if _, ok := a.tracedKeys[string(ci1.key)]; ok { fmt.Printf("skipping same key %x val [%x] endBlock %d to merge into [%d-%d]\n", ci1.key, ci1.val, ci1.endBlock, startBlock, endBlock) } } if ci1.t != FILE_CURSOR { return nil, 0, fmt.Errorf("mergeIntoStateFile: cursor of unexpected type: %d", ci1.t) } if mergedOnce { //fmt.Printf("mergeIntoStateFile pre-merge prefix [%x], [%x]+[%x]\n", commitment.CompactToHex(lastKey), ci1.val, lastVal) if lastVal, err = mergeFunc(ci1.val, lastVal, nil); err != nil { return nil, 0, fmt.Errorf("mergeIntoStateFile: merge values: %w", err) } //fmt.Printf("mergeIntoStateFile post-merge prefix [%x], [%x]\n", commitment.CompactToHex(lastKey), lastVal) } else { mergedOnce = true } if ci1.dg.HasNext() { ci1.key, _ = ci1.dg.Next(ci1.key[:0]) if valCompressed { ci1.val, _ = ci1.dg.Next(ci1.val[:0]) } else { ci1.val, _ = ci1.dg.NextUncompressed() } heap.Fix(cp, 0) } else { heap.Pop(cp) } } var skip bool switch fType { case Storage: // Inside storage files, there is a special item with empty value, and the key equal to the contract's address // This special item is inserted before the contract storage items, in order to find them using un-ordered index // (for the purposes of SELF-DESTRUCT and some RPC methods that require enumeration of contract storage) // We will only skip this special item if there are no more corresponding storage items left // (this is checked further down with `bytes.HasPrefix(lastKey, keyBuf)`) skip = startBlock == 0 && len(lastVal) == 0 && len(lastKey) != prefixLen case Commitment: // For commitments, the 3rd and 4th bytes of the value (zero-based 2 and 3) contain so-called `afterMap` // Its bit are set for children that are present in the tree, and unset for those that are not (deleted, for example) // If all bits are zero (check below), this branch can be skipped, since it is empty skip = startBlock == 0 && len(lastVal) >= 4 && lastVal[2] == 0 && lastVal[3] == 0 case AccountHistory, StorageHistory, CodeHistory: skip = false default: // For the rest of types, empty value means deletion skip = startBlock == 0 && len(lastVal) == 0 } if skip { // Deleted marker can be skipped if we merge into the first file, except for the storage addr marker if _, ok := a.tracedKeys[string(keyBuf)]; ok { fmt.Printf("skipped key %x for [%d-%d]\n", keyBuf, startBlock, endBlock) } } else { // The check `bytes.HasPrefix(lastKey, keyBuf)` is checking whether the `lastKey` is the first item // of some contract's storage, and `keyBuf` (the item just before that) is the special item with the // key being contract's address. If so, the special item (keyBuf => []) needs to be preserved if keyBuf != nil && (prefixLen == 0 || len(keyBuf) != prefixLen || bytes.HasPrefix(lastKey, keyBuf)) { if err = comp.AddWord(keyBuf); err != nil { return nil, 0, err } if a.trace { if _, ok := a.tracedKeys[string(keyBuf)]; ok { fmt.Printf("merge key %x val [%x] into [%d-%d]\n", keyBuf, valBuf, startBlock, endBlock) } } count++ // Only counting keys, not values if valTransform != nil { if transValBuf, err = valTransform(valBuf, transValBuf[:0]); err != nil { return nil, 0, fmt.Errorf("mergeIntoStateFile -valTransform [%x]: %w", valBuf, err) } if err = comp.AddWord(transValBuf); err != nil { return nil, 0, err } } else if valCompressed { if err = comp.AddWord(valBuf); err != nil { return nil, 0, err } } else { if err = comp.AddUncompressedWord(valBuf); err != nil { return nil, 0, err } } //if fType == Storage { // fmt.Printf("merge %s.%d-%d [%x]=>[%x]\n", fType.String(), startBlock, endBlock, keyBuf, valBuf) //} } keyBuf = append(keyBuf[:0], lastKey...) valBuf = append(valBuf[:0], lastVal...) } } if keyBuf != nil { if err = comp.AddWord(keyBuf); err != nil { return nil, 0, err } if a.trace { if _, ok := a.tracedKeys[string(keyBuf)]; ok { fmt.Printf("merge key %x val [%x] into [%d-%d]\n", keyBuf, valBuf, startBlock, endBlock) } } count++ // Only counting keys, not values if valTransform != nil { if transValBuf, err = valTransform(valBuf, transValBuf[:0]); err != nil { return nil, 0, fmt.Errorf("mergeIntoStateFile valTransform [%x]: %w", valBuf, err) } if err = comp.AddWord(transValBuf); err != nil { return nil, 0, err } } else if valCompressed { if err = comp.AddWord(valBuf); err != nil { return nil, 0, err } } else { if err = comp.AddUncompressedWord(valBuf); err != nil { return nil, 0, err } } //if fType == Storage { // fmt.Printf("merge %s.%d-%d [%x]=>[%x]\n", fType.String(), startBlock, endBlock, keyBuf, valBuf) //} } if err = comp.Compress(); err != nil { return nil, 0, err } var d *compress.Decompressor if d, err = compress.NewDecompressor(datPath); err != nil { return nil, 0, fmt.Errorf("decompressor: %w", err) } return d, count, nil } func (a *Aggregator) stats(fType FileType) (count int, datSize, idxSize int64) { a.fileLocks[fType].RLock() defer a.fileLocks[fType].RUnlock() count = 0 datSize = 0 idxSize = 0 a.files[fType].Ascend(func(i btree.Item) bool { item := i.(*byEndBlockItem) if item.decompressor != nil { count++ datSize += item.decompressor.Size() count++ idxSize += item.index.Size() } return true }) return } type FilesStats struct { AccountsCount int AccountsDatSize int64 AccountsIdxSize int64 CodeCount int CodeDatSize int64 CodeIdxSize int64 StorageCount int StorageDatSize int64 StorageIdxSize int64 CommitmentCount int CommitmentDatSize int64 CommitmentIdxSize int64 Hits uint64 Misses uint64 } func (a *Aggregator) Stats() FilesStats { var fs FilesStats fs.AccountsCount, fs.AccountsDatSize, fs.AccountsIdxSize = a.stats(Account) fs.CodeCount, fs.CodeDatSize, fs.CodeIdxSize = a.stats(Code) fs.StorageCount, fs.StorageDatSize, fs.StorageIdxSize = a.stats(Storage) fs.CommitmentCount, fs.CommitmentDatSize, fs.CommitmentIdxSize = a.stats(Commitment) fs.Hits = atomic.LoadUint64(&a.fileHits) fs.Misses = atomic.LoadUint64(&a.fileMisses) return fs }