erigon-pulse/state/domain_committed.go

644 lines
19 KiB
Go
Raw Normal View History

package state
import (
"bytes"
"container/heap"
"context"
"encoding/binary"
"fmt"
"hash"
"path/filepath"
"github.com/google/btree"
"github.com/ledgerwatch/log/v3"
"golang.org/x/crypto/sha3"
"github.com/ledgerwatch/erigon-lib/commitment"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/recsplit"
)
// CommitmentMode selects how commitments are evaluated by DomainCommitted.
type CommitmentMode uint

// Supported commitment modes (numeric values 0, 1 and 2):
//   - CommitmentModeDisabled: TouchPlainKey ignores every key.
//   - CommitmentModeDirect: ComputeCommitment reviews the touched keys only.
//   - CommitmentModeUpdate: ComputeCommitment additionally feeds the collected updates to the trie.
const (
	CommitmentModeDisabled CommitmentMode = iota
	CommitmentModeDirect
	CommitmentModeUpdate
)
// ValueMerger is the signature of a function that receives a previous and a
// current value and returns the merged value, or an error.
type ValueMerger func(prev, current []byte) (merged []byte, err error)
// DomainCommitted embeds a Domain and carries the extra state used to evaluate
// commitments: the set of touched keys, a reusable keccak hasher and a hex
// patricia trie.
type DomainCommitted struct {
*Domain
mode CommitmentMode // selects the evaluation path taken by TouchPlainKey and ComputeCommitment
trace bool // when set, the file-merge helpers print diagnostics with fmt.Printf
commTree *btree.BTreeG[*CommitmentItem] // touched keys, ordered by hashedKey (see commitmentItemLess)
keccak hash.Hash // shared hasher; it is Reset before every use
patriciaTrie *commitment.HexPatriciaHashed
keyReplaceFn ValueMerger // defines logic performed with stored values during files merge
branchMerger *commitment.BranchMerger
}
// NewCommittedDomain wraps d into a DomainCommitted working in the given mode.
// It allocates a fresh hex patricia trie, an empty touched-keys tree ordered by
// hashed key, a legacy keccak-256 hasher and a branch merger.
func NewCommittedDomain(d *Domain, mode CommitmentMode) *DomainCommitted {
	dc := &DomainCommitted{Domain: d, mode: mode}
	dc.patriciaTrie = commitment.NewHexPatriciaHashed(length.Addr, nil, nil, nil)
	dc.commTree = btree.NewG[*CommitmentItem](32, commitmentItemLess)
	dc.keccak = sha3.NewLegacyKeccak256()
	dc.branchMerger = commitment.NewHexBranchMerger(8192)
	return dc
}
// SetKeyReplacer stores vm as the domain's keyReplaceFn.
func (d *DomainCommitted) SetKeyReplacer(vm ValueMerger) {
	d.keyReplaceFn = vm
}
// SetCommitmentMode switches the domain to commitment mode m.
func (d *DomainCommitted) SetCommitmentMode(m CommitmentMode) {
	d.mode = m
}
// TouchPlainKey records key as touched. It does nothing when commitments are
// disabled. Otherwise a CommitmentItem holding a copy of the plain key and its
// hashed, nibblized form is put into the touched-keys tree, replacing any item
// with the same hashed key.
//
// fn is invoked on the new item only in modes above CommitmentModeDirect; it is
// where the per-type handling (account, storage or code) fills in the update.
func (d *DomainCommitted) TouchPlainKey(key, val []byte, fn func(c *CommitmentItem, val []byte)) {
	if d.mode == CommitmentModeDisabled {
		return
	}

	touched := &CommitmentItem{
		plainKey:  common.Copy(key),
		hashedKey: d.hashAndNibblizeKey(key),
	}
	if d.mode > CommitmentModeDirect {
		fn(touched, val)
	}
	d.commTree.ReplaceOrInsert(touched)
}
// TouchPlainKeyAccount fills c.update for an account modification.
// An empty val marks the account as deleted. Otherwise val is decoded into the
// update, which is flagged as a balance and nonce update; if the touched-keys
// tree already holds an item for the same hashed key with a code update, that
// code hash is carried over as well.
func (d *DomainCommitted) TouchPlainKeyAccount(c *CommitmentItem, val []byte) {
	if len(val) == 0 {
		c.update.Flags = commitment.DELETE_UPDATE
		return
	}

	c.update.DecodeForStorage(val)
	c.update.Flags = commitment.BALANCE_UPDATE | commitment.NONCE_UPDATE

	probe := &CommitmentItem{hashedKey: c.hashedKey}
	if prev, ok := d.commTree.Get(probe); ok && prev.update.Flags&commitment.CODE_UPDATE != 0 {
		// Keep the code update that was registered earlier for this key.
		c.update.Flags |= commitment.CODE_UPDATE
		copy(c.update.CodeHashOrStorage[:], prev.update.CodeHashOrStorage[:])
	}
}
// TouchPlainKeyStorage fills c.update for a storage modification: the value
// length is always recorded, an empty val marks a deletion, and a non-empty val
// is copied into the update and flagged as a storage update.
func (d *DomainCommitted) TouchPlainKeyStorage(c *CommitmentItem, val []byte) {
	c.update.ValLength = len(val)
	if len(val) == 0 {
		c.update.Flags = commitment.DELETE_UPDATE
		return
	}
	c.update.Flags = commitment.STORAGE_UPDATE
	copy(c.update.CodeHashOrStorage[:], val)
}
// TouchPlainKeyCode fills c.update for a code modification.
// The update is flagged as a code update. If the touched-keys tree already holds
// an item for the same key, its balance and nonce updates are carried over; when
// that item is a pure deletion and val is empty the update stays a deletion.
// In every other case the keccak hash of val is stored as the code hash.
func (d *DomainCommitted) TouchPlainKeyCode(c *CommitmentItem, val []byte) {
	c.update.Flags = commitment.CODE_UPDATE

	if prev, ok := d.commTree.Get(c); ok {
		prevFlags := prev.update.Flags
		if prevFlags&commitment.BALANCE_UPDATE != 0 {
			c.update.Flags |= commitment.BALANCE_UPDATE
			c.update.Balance.Set(&prev.update.Balance)
		}
		if prevFlags&commitment.NONCE_UPDATE != 0 {
			c.update.Flags |= commitment.NONCE_UPDATE
			c.update.Nonce = prev.update.Nonce
		}
		if prevFlags == commitment.DELETE_UPDATE && len(val) == 0 {
			// Deleted before and no code now: keep it a deletion, no hash needed.
			c.update.Flags = commitment.DELETE_UPDATE
			return
		}
	}

	d.keccak.Reset()
	d.keccak.Write(val)
	copy(c.update.CodeHashOrStorage[:], d.keccak.Sum(nil))
}
// CommitmentItem is one entry of the touched-keys tree: a plain key, its hashed
// and nibblized form (the tree's sort key) and the update collected for it.
type CommitmentItem struct {
plainKey []byte
hashedKey []byte
update commitment.Update
}
// commitmentItemLess orders commitment items by their hashed key, bytewise.
func commitmentItemLess(i, j *CommitmentItem) bool {
	return bytes.Compare(j.hashedKey, i.hashedKey) > 0
}
// TouchedKeyList drains the touched-keys tree. It returns, in ascending hashed-key
// order, the plain keys, the hashed keys and the updates of all items, three
// slices of equal length, and leaves the tree empty.
func (d *DomainCommitted) TouchedKeyList() ([][]byte, [][]byte, []commitment.Update) {
	total := d.commTree.Len()
	var (
		plain  = make([][]byte, 0, total)
		hashed = make([][]byte, 0, total)
		upds   = make([]commitment.Update, 0, total)
	)

	d.commTree.Ascend(func(it *CommitmentItem) bool {
		plain = append(plain, it.plainKey)
		hashed = append(hashed, it.hashedKey)
		upds = append(upds, it.update)
		return true
	})

	d.commTree.Clear(true)
	return plain, hashed, upds
}
// hashAndNibblizeKey hashes key with the domain's keccak hasher and expands the
// digest into nibbles (one output byte per 4 bits, high nibble first).
// The first length.Addr bytes are hashed on their own; any remaining bytes are
// hashed separately and that digest is appended before nibblizing.
// key must be at least length.Addr bytes long.
//
// TODO(awskii): let trie define hashing function
func (d *DomainCommitted) hashAndNibblizeKey(key []byte) []byte {
	// Room for two digests; the second half is only used when a key suffix follows.
	digest := make([]byte, length.Hash, 2*length.Hash)
	d.keccak.Reset()
	d.keccak.Write(key[:length.Addr])
	copy(digest, d.keccak.Sum(nil))

	if tail := key[length.Addr:]; len(tail) > 0 {
		digest = digest[:2*length.Hash]
		d.keccak.Reset()
		d.keccak.Write(tail)
		copy(digest[length.Hash:], d.keccak.Sum(nil))
	}

	nibbles := make([]byte, 0, 2*len(digest))
	for _, b := range digest {
		nibbles = append(nibbles, b>>4, b&0xf)
	}
	return nibbles
}
// storeCommitmentState encodes the trie's current state together with blockNum
// and txNum and puts it into the domain under keyCommitmentState, suffixed with
// the big-endian 16-bit aggregation step of txNum.
func (d *DomainCommitted) storeCommitmentState(blockNum, txNum uint64) error {
	trieState, err := d.patriciaTrie.EncodeCurrentState(nil)
	if err != nil {
		return err
	}

	snapshot := commitmentState{txNum: txNum, blockNum: blockNum, trieState: trieState}
	payload, err := snapshot.Encode()
	if err != nil {
		return err
	}

	var stepKey [2]byte
	binary.BigEndian.PutUint16(stepKey[:], uint16(txNum/d.aggregationStep))
	return d.Domain.Put(keyCommitmentState, stepKey[:], payload)
}
// replaceKeyWithReference looks fullKey up in each file of list, in order, via
// the file's index. For the first file whose record at the looked-up offset
// matches fullKey it builds the short reference (2-byte big-endian step of the
// file's endTxNum followed by the minimal big-endian offset) and reports true.
// It reports false when no file contains the key.
//
// NOTE(review): the reference is assigned to the shortKey parameter only, so the
// caller's slice is not updated by this call; confirm whether that is intended.
//
// nolint
func (d *DomainCommitted) replaceKeyWithReference(fullKey, shortKey []byte, typeAS string, list ...*filesItem) bool {
	var stepBuf [2]byte
	for _, file := range list {
		getter := file.decompressor.MakeGetter()
		reader := recsplit.NewIndexReader(file.index)
		offset := reader.Lookup(fullKey)
		getter.Reset(offset)
		if !getter.HasNext() {
			continue
		}
		if matched, _ := getter.Match(fullKey); !matched {
			continue
		}

		step := uint16(file.endTxNum / d.aggregationStep)
		binary.BigEndian.PutUint16(stepBuf[:], step)
		shortKey = encodeU64(offset, stepBuf[:])
		if d.trace {
			fmt.Printf("replacing %s [%x] => {%x} [step=%d, offset=%d, file=%s.%d-%d]\n", typeAS, fullKey, shortKey, step, offset, typeAS, file.startTxNum, file.endTxNum)
		}
		return true
	}
	return false
}
// lookupShortenedKey resolves a short key reference (file step plus offset) by
// reading the word stored at that offset in the first file of list whose
// [startTxNum, endTxNum] range contains step*aggregationStep and which is large
// enough to hold a key of the given type at that offset. typAS must be "account"
// or "storage"; any other value reports false. It reports true when a word was read.
//
// NOTE(review): the word is assigned to the fullKey parameter only, so the
// caller's slice header is not updated by this call; confirm whether that is intended.
func (d *DomainCommitted) lookupShortenedKey(shortKey, fullKey []byte, typAS string, list []*filesItem) bool {
	fileStep, offset := shortenedKey(shortKey)
	expected := uint64(fileStep) * d.aggregationStep

	var size uint64
	if typAS == "account" {
		size = length.Addr
	} else if typAS == "storage" {
		size = length.Addr + length.Hash
	} else {
		return false
	}

	for _, file := range list {
		if expected < file.startTxNum || expected > file.endTxNum {
			continue
		}
		getter := file.decompressor.MakeGetter()
		if offset+size >= uint64(getter.Size()) {
			continue
		}
		getter.Reset(offset)
		fullKey, _ = getter.Next(fullKey[:0])
		if d.trace {
			fmt.Printf("offsetToKey %s [%x]=>{%x} step=%d offset=%d, file=%s.%d-%d.kv\n", typAS, fullKey, shortKey, fileStep, offset, typAS, file.startTxNum, file.endTxNum)
		}
		return true
	}
	return false
}
// commitmentValTransform parses the value of the commitment record to extract references
// to accounts and storage items, then looks them up in the new, merged files, and replaces them with
// the updated references.
//
// An empty val yields (nil, nil). Errors from extracting or replacing the plain
// keys are returned as is. A short key that cannot be resolved in files is only
// reported on stdout ("lost key"/"lost skey"); processing continues.
//
// NOTE(review): lookupShortenedKey and replaceKeyWithReference reassign their
// slice parameters locally, so those reassignments are not observable through
// apkBuf/spkBuf or the plain-key variables used here; the keys appended to the
// trans* lists are therefore the ones extracted from val. Confirm this is intended.
func (d *DomainCommitted) commitmentValTransform(files *SelectedStaticFiles, merged *MergedFiles, val commitment.BranchData) ([]byte, error) {
	if len(val) == 0 {
		return nil, nil
	}
	accountPlainKeys, storagePlainKeys, err := val.ExtractPlainKeys()
	if err != nil {
		return nil, err
	}
	transAccountPks := make([][]byte, 0, len(accountPlainKeys))
	var apkBuf, spkBuf []byte
	for _, accountPlainKey := range accountPlainKeys {
		if len(accountPlainKey) == length.Addr {
			// Non-optimised key originating from a database record
			apkBuf = append(apkBuf[:0], accountPlainKey...)
		} else {
			// Optimised key referencing a state file record (file number and offset within the file)
			f := d.lookupShortenedKey(accountPlainKey, apkBuf, "account", files.accounts)
			if !f {
				// Report the key being processed, not the whole key list.
				fmt.Printf("lost key %x\n", accountPlainKey)
			}
		}
		d.replaceKeyWithReference(apkBuf, accountPlainKey, "account", merged.accounts)
		transAccountPks = append(transAccountPks, accountPlainKey)
	}
	transStoragePks := make([][]byte, 0, len(storagePlainKeys))
	for _, storagePlainKey := range storagePlainKeys {
		if len(storagePlainKey) == length.Addr+length.Hash {
			// Non-optimised key originating from a database record
			spkBuf = append(spkBuf[:0], storagePlainKey...)
		} else {
			// Optimised key referencing a state file record (file number and offset within the file)
			f := d.lookupShortenedKey(storagePlainKey, spkBuf, "storage", files.storage)
			if !f {
				fmt.Printf("lost skey %x\n", storagePlainKey)
			}
		}
		d.replaceKeyWithReference(spkBuf, storagePlainKey, "storage", merged.storage)
		transStoragePks = append(transStoragePks, storagePlainKey)
	}
	transValBuf, err := val.ReplacePlainKeys(transAccountPks, transStoragePks, nil)
	if err != nil {
		return nil, err
	}
	return transValBuf, nil
}
// mergeFiles merges the commitment files selected in oldFiles over the ranges r.
//
// It returns immediately (all results nil) when r selects nothing. Otherwise the
// history and index files are merged through d.History.mergeFiles and, when
// r.values is set, the values files are merged with a multi-way merge:
// for every key the value from the file with the topmost cursor is taken, passed
// through commitmentValTransform and written to a new .kv file, for which a .kvi
// index is then built. On success the merge counters are incremented and the new
// items are returned; ownership of them passes to the caller.
//
// On any error the compressor and every item opened so far are closed and all
// item results are nil.
func (d *DomainCommitted) mergeFiles(ctx context.Context, oldFiles SelectedStaticFiles, mergedFiles MergedFiles, r DomainRanges, workers int) (valuesIn, indexIn, historyIn *filesItem, err error) {
	if !r.any() {
		return
	}
	valuesFiles := oldFiles.commitment
	indexFiles := oldFiles.commitmentIdx
	historyFiles := oldFiles.commitmentHist

	var comp *compress.Compressor
	// opened tracks every item produced so far. The error paths return explicit
	// nils, which overwrite the named results before the deferred function runs,
	// so the cleanup must not rely on valuesIn/indexIn/historyIn.
	var opened []*filesItem
	closeItem := true
	defer func() {
		if !closeItem {
			return
		}
		if comp != nil {
			comp.Close()
		}
		for _, item := range opened {
			if item == nil {
				continue
			}
			if item.decompressor != nil {
				item.decompressor.Close()
			}
			if item.index != nil {
				item.index.Close()
			}
		}
	}()

	if indexIn, historyIn, err = d.History.mergeFiles(ctx, indexFiles, historyFiles,
		HistoryRanges{
			historyStartTxNum: r.historyStartTxNum,
			historyEndTxNum:   r.historyEndTxNum,
			history:           r.history,
			indexStartTxNum:   r.indexStartTxNum,
			indexEndTxNum:     r.indexEndTxNum,
			index:             r.index}, workers); err != nil {
		return nil, nil, nil, err
	}
	opened = append(opened, indexIn, historyIn)

	if r.values {
		datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, r.valuesStartTxNum/d.aggregationStep, r.valuesEndTxNum/d.aggregationStep))
		if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.dir, compress.MinPatternScore, workers, log.LvlDebug); err != nil {
			return nil, nil, nil, fmt.Errorf("merge %s values compressor: %w", d.filenameBase, err)
		}
		var cp CursorHeap
		heap.Init(&cp)
		for _, item := range valuesFiles {
			g := item.decompressor.MakeGetter()
			g.Reset(0)
			if g.HasNext() {
				key, _ := g.NextUncompressed()
				var val []byte
				if d.compressVals {
					val, _ = g.Next(nil)
				} else {
					val, _ = g.NextUncompressed()
				}
				if d.trace {
					fmt.Printf("merge: read value '%x'\n", key)
				}
				heap.Push(&cp, &CursorItem{
					t:        FILE_CURSOR,
					dg:       g,
					key:      key,
					val:      val,
					endTxNum: item.endTxNum,
					reverse:  true,
				})
			}
		}
		keyCount := 0
		// In the loop below, the pair `keyBuf=>valBuf` is always 1 item behind `lastKey=>lastVal`.
		// `lastKey` and `lastVal` are taken from the top of the multi-way merge (assisted by the CursorHeap cp), but not processed right away
		// instead, the pair from the previous iteration is processed first - `keyBuf=>valBuf`. After that, `keyBuf` and `valBuf` are assigned
		// to `lastKey` and `lastVal` correspondingly, and the next step of multi-way merge happens. Therefore, after the multi-way merge loop
		// (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind
		var keyBuf, valBuf []byte
		for cp.Len() > 0 {
			lastKey := common.Copy(cp[0].key)
			lastVal := common.Copy(cp[0].val)
			// Advance all the items that have this key (including the top)
			for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) {
				ci1 := cp[0]
				if ci1.dg.HasNext() {
					ci1.key, _ = ci1.dg.NextUncompressed()
					if d.compressVals {
						ci1.val, _ = ci1.dg.Next(ci1.val[:0])
					} else {
						ci1.val, _ = ci1.dg.NextUncompressed()
					}
					heap.Fix(&cp, 0)
				} else {
					heap.Pop(&cp)
				}
			}
			var skip bool
			if d.prefixLen > 0 {
				skip = r.valuesStartTxNum == 0 && len(lastVal) == 0 && len(lastKey) != d.prefixLen
			} else {
				// For the rest of types, empty value means deletion
				skip = r.valuesStartTxNum == 0 && len(lastVal) == 0
			}
			if !skip {
				if keyBuf != nil && (d.prefixLen == 0 || len(keyBuf) != d.prefixLen || bytes.HasPrefix(lastKey, keyBuf)) {
					if err = comp.AddUncompressedWord(keyBuf); err != nil {
						return nil, nil, nil, err
					}
					keyCount++ // Only counting keys, not values
					if d.trace {
						fmt.Printf("merge: multi-way key %x, total keys %d\n", keyBuf, keyCount)
					}
					// Keep valBuf intact until the transform succeeds so the error can show it.
					transformed, terr := d.commitmentValTransform(&oldFiles, &mergedFiles, valBuf)
					if terr != nil {
						return nil, nil, nil, fmt.Errorf("merge: valTransform [%x] %w", valBuf, terr)
					}
					valBuf = transformed
					if d.compressVals {
						if err = comp.AddWord(valBuf); err != nil {
							return nil, nil, nil, err
						}
					} else {
						if err = comp.AddUncompressedWord(valBuf); err != nil {
							return nil, nil, nil, err
						}
					}
				}
				keyBuf = append(keyBuf[:0], lastKey...)
				valBuf = append(valBuf[:0], lastVal...)
			}
		}
		if keyBuf != nil {
			if err = comp.AddUncompressedWord(keyBuf); err != nil {
				return nil, nil, nil, err
			}
			keyCount++ // Only counting keys, not values
			//fmt.Printf("last heap key %x\n", keyBuf)
			transformed, terr := d.commitmentValTransform(&oldFiles, &mergedFiles, valBuf)
			if terr != nil {
				return nil, nil, nil, fmt.Errorf("merge: 2valTransform [%x] %w", valBuf, terr)
			}
			valBuf = transformed
			if d.compressVals {
				if err = comp.AddWord(valBuf); err != nil {
					return nil, nil, nil, err
				}
			} else {
				if err = comp.AddUncompressedWord(valBuf); err != nil {
					return nil, nil, nil, err
				}
			}
		}
		if err = comp.Compress(); err != nil {
			return nil, nil, nil, err
		}
		comp.Close()
		comp = nil
		idxPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, r.valuesStartTxNum/d.aggregationStep, r.valuesEndTxNum/d.aggregationStep))
		valuesIn = &filesItem{startTxNum: r.valuesStartTxNum, endTxNum: r.valuesEndTxNum}
		// Registered before it is populated: the cleanup sees whatever fields are set by then.
		opened = append(opened, valuesIn)
		if valuesIn.decompressor, err = compress.NewDecompressor(datPath); err != nil {
			return nil, nil, nil, fmt.Errorf("merge %s decompressor [%d-%d]: %w", d.filenameBase, r.valuesStartTxNum, r.valuesEndTxNum, err)
		}
		if valuesIn.index, err = buildIndex(ctx, valuesIn.decompressor, idxPath, d.dir, keyCount, false /* values */); err != nil {
			return nil, nil, nil, fmt.Errorf("merge %s buildIndex [%d-%d]: %w", d.filenameBase, r.valuesStartTxNum, r.valuesEndTxNum, err)
		}
	}
	closeItem = false
	d.stats.MergesCount++
	d.mergesCount++
	return
}
// ComputeCommitment evaluates the commitment over the keys touched since the
// previous call and returns the resulting root hash together with the branch
// node updates produced by the trie. The touched-keys set is drained in the process.
//
// When nothing was touched, the trie's current root hash is returned with nil
// updates. trace switches the trie's tracing on or off for this evaluation.
// A mode other than CommitmentModeDirect or CommitmentModeUpdate is an error.
func (d *DomainCommitted) ComputeCommitment(trace bool) (rootHash []byte, branchNodeUpdates map[string]commitment.BranchData, err error) {
	plainKeys, hashedKeys, updates := d.TouchedKeyList()
	if len(plainKeys) == 0 {
		rootHash, err = d.patriciaTrie.RootHash()
		return rootHash, nil, err
	}

	// data accessing functions should be set once before
	d.patriciaTrie.Reset()
	d.patriciaTrie.SetTrace(trace)

	switch d.mode {
	case CommitmentModeDirect:
		rootHash, branchNodeUpdates, err = d.patriciaTrie.ReviewKeys(plainKeys, hashedKeys)
	case CommitmentModeUpdate:
		rootHash, branchNodeUpdates, err = d.patriciaTrie.ProcessUpdates(plainKeys, hashedKeys, updates)
	default:
		return nil, nil, fmt.Errorf("invalid commitment mode: %d", d.mode)
	}
	if err != nil {
		return nil, nil, err
	}
	return rootHash, branchNodeUpdates, nil
}
// keyCommitmentState is the domain key under which the encoded commitment state
// is stored and looked up; a 2-byte big-endian step is passed alongside it.
var keyCommitmentState = []byte("state")
// SeekCommitment searches for the last encoded state stored by DomainCommitted
// and, if one is found, restores the trie from it and returns the txNum it was
// stored at.
//
// The search starts at the step preceding sinceTx and follows stored states
// forward: each record found moves the lookup one aggregation step past the
// txNum it carries, until a record is missing (shorter than 8 bytes) or the
// same txNum is seen again.
//
// NOTE(review): the initial step and txNum are computed with unsigned
// subtraction and wrap around when sinceTx < aggStep or sinceTx == 0.
// NOTE(review): a state that fails to decode (including "nothing found") is
// reported as (0, nil), not as an error; presumably this means "start from
// scratch", confirm.
func (d *DomainCommitted) SeekCommitment(aggStep, sinceTx uint64) (uint64, error) {
	var (
		stateBuf []byte
		stepKey  [2]byte
	)
	curStep := uint16(sinceTx/aggStep) - 1
	curTxNum := sinceTx - 1

	d.SetTxNum(curTxNum)
	dc := d.MakeContext()
	for {
		binary.BigEndian.PutUint16(stepKey[:], curStep)
		rec, err := dc.Get(keyCommitmentState, stepKey[:], d.tx)
		if err != nil {
			return 0, err
		}
		if len(rec) < 8 {
			break
		}
		recTxNum := binary.BigEndian.Uint64(rec)
		if recTxNum == curTxNum && len(stateBuf) != 0 {
			break
		}
		curTxNum, stateBuf = recTxNum, rec
		curStep = uint16(curTxNum/aggStep) + 1
		d.SetTxNum(curTxNum + aggStep)
	}

	var restored commitmentState
	if err := restored.Decode(stateBuf); err != nil {
		return 0, nil
	}
	if err := d.patriciaTrie.SetState(restored.trieState); err != nil {
		return 0, err
	}
	return restored.txNum, nil
}
// commitmentStateHeaderSize is the size of the fixed part of an encoded
// commitmentState: txNum (8 bytes), blockNum (8 bytes), trie state length (2 bytes).
const commitmentStateHeaderSize = 8 + 8 + 2

// maxTrieStateSize is the largest trie state that fits the 2-byte length field.
const maxTrieStateSize = 1<<16 - 1

// commitmentState is the persisted form of the commitment: the transaction and
// block numbers it was taken at and the encoded trie state.
//
// Encoded layout, all integers big-endian:
//
//	txNum uint64 | blockNum uint64 | len(trieState) uint16 | trieState
type commitmentState struct {
	txNum     uint64
	blockNum  uint64
	trieState []byte
}

// Decode parses buf into cs. It returns an error, leaving cs untouched, when buf
// is shorter than the fixed header or shorter than the trie state length it
// declares. Bytes following the declared trie state are ignored. The decoded
// trie state is a copy and does not alias buf.
func (cs *commitmentState) Decode(buf []byte) error {
	if len(buf) < commitmentStateHeaderSize {
		return fmt.Errorf("invalid commitment state buffer size %d, expected at least %d", len(buf), commitmentStateHeaderSize)
	}
	stateLen := int(binary.BigEndian.Uint16(buf[16:18]))
	payload := buf[commitmentStateHeaderSize:]
	if len(payload) < stateLen {
		return fmt.Errorf("invalid commitment state: trie state truncated, have %d bytes, declared %d", len(payload), stateLen)
	}

	cs.txNum = binary.BigEndian.Uint64(buf[0:8])
	cs.blockNum = binary.BigEndian.Uint64(buf[8:16])
	cs.trieState = make([]byte, stateLen)
	copy(cs.trieState, payload[:stateLen])
	return nil
}

// Encode serializes cs into a new buffer. It returns an error when the trie
// state does not fit the 2-byte length field, since a truncated length would
// produce a record that decodes to the wrong state.
func (cs *commitmentState) Encode() ([]byte, error) {
	if len(cs.trieState) > maxTrieStateSize {
		return nil, fmt.Errorf("commitment state: trie state too large: %d bytes, max %d", len(cs.trieState), maxTrieStateSize)
	}
	buf := make([]byte, commitmentStateHeaderSize, commitmentStateHeaderSize+len(cs.trieState))
	binary.BigEndian.PutUint64(buf[0:8], cs.txNum)
	binary.BigEndian.PutUint64(buf[8:16], cs.blockNum)
	binary.BigEndian.PutUint16(buf[16:18], uint16(len(cs.trieState)))
	return append(buf, cs.trieState...), nil
}
// decodeU64 interprets from as a big-endian unsigned integer of arbitrary
// length. An empty slice decodes to 0; with more than 8 bytes the leading bytes
// are shifted out and only the last 8 contribute.
func decodeU64(from []byte) uint64 {
	var acc uint64
	for idx := 0; idx < len(from); idx++ {
		acc = acc<<8 + uint64(from[idx])
	}
	return acc
}
// encodeU64 appends i to to in big-endian byte order, using the least number of
// bytes needed to represent i (at least one, at most eight), and returns the
// extended slice.
func encodeU64(i uint64, to []byte) []byte {
	// Count the significant bytes; zero still takes one byte.
	width := 1
	for rest := i >> 8; rest != 0; rest >>= 8 {
		width++
	}
	for shift := 8 * (width - 1); shift >= 0; shift -= 8 {
		to = append(to, byte(i>>uint(shift)))
	}
	return to
}
// shortenedKey decodes an optimised key referencing a state file record
// (file number and offset within the file).
//
// The layout mirrors what replaceKeyWithReference builds: a 2-byte big-endian
// step followed by the offset in minimal big-endian form. The offset therefore
// starts at byte 2; decoding from byte 1 would mix the low byte of the step
// into the offset. apk must be at least 2 bytes long.
func shortenedKey(apk []byte) (step uint16, offset uint64) {
	step = binary.BigEndian.Uint16(apk[:2])
	return step, decodeU64(apk[2:])
}