From 746b31def24e660964c2990fdf101881e4c838db Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 5 Oct 2022 13:17:23 +0700 Subject: [PATCH] agg22 madv helpers (#668) --- commitment/hex_patricia_hashed.go | 2 +- etl/collector.go | 2 +- etl/dataprovider.go | 5 +- state/aggregator22.go | 55 +++++++++++++++---- state/history.go | 87 +++++++++++++++++++++++++------ state/inverted_index.go | 41 +++++++++++++++ 6 files changed, 161 insertions(+), 31 deletions(-) diff --git a/commitment/hex_patricia_hashed.go b/commitment/hex_patricia_hashed.go index 2f5f452eb..918e2c627 100644 --- a/commitment/hex_patricia_hashed.go +++ b/commitment/hex_patricia_hashed.go @@ -1763,7 +1763,7 @@ func (hph *HexPatriciaHashed) ProcessUpdates(plainKeys, hashedKeys [][]byte, upd return rootHash, branchNodeUpdates, nil } -//nolint +// nolint func (hph *HexPatriciaHashed) hashAndNibblizeKey(key []byte) []byte { hashedKey := make([]byte, length.Hash) diff --git a/etl/collector.go b/etl/collector.go index 0a01cbc56..06d2dc935 100644 --- a/etl/collector.go +++ b/etl/collector.go @@ -101,7 +101,7 @@ func NewCollector(logPrefix, tmpdir string, sortableBuffer Buffer) *Collector { c.allFlushed = true } else { doFsync := !c.autoClean /* is critical collector */ - provider, err = FlushToDisk(sortableBuffer, tmpdir, doFsync, c.logLvl) + provider, err = FlushToDisk(logPrefix, sortableBuffer, tmpdir, doFsync, c.logLvl) } if err != nil { return err diff --git a/etl/dataprovider.go b/etl/dataprovider.go index 87c5706f3..11189c4ff 100644 --- a/etl/dataprovider.go +++ b/etl/dataprovider.go @@ -40,7 +40,7 @@ type fileDataProvider struct { } // FlushToDisk - `doFsync` is true only for 'critical' collectors (which should not loose). -func FlushToDisk(b Buffer, tmpdir string, doFsync bool, lvl log.Lvl) (dataProvider, error) { +func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl log.Lvl) (dataProvider, error) { if b.Len() == 0 { return nil, nil } @@ -69,8 +69,7 @@ func FlushToDisk(b Buffer, tmpdir string, doFsync bool, lvl log.Lvl) (dataProvid if lvl >= log.LvlInfo { common.ReadMemStats(&m) } - log.Log(lvl, - "Flushed buffer file", + log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer file", logPrefix), "name", bufferFile.Name(), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys)) }() diff --git a/state/aggregator22.go b/state/aggregator22.go index c3b5abe7c..21621e0b0 100644 --- a/state/aggregator22.go +++ b/state/aggregator22.go @@ -354,19 +354,13 @@ func (a *Aggregator22) Unwind(ctx context.Context, txUnwindTo uint64, stateLoad stateChanges := etl.NewCollector(a.logPrefix, "", etl.NewOldestEntryBuffer(etl.BufferOptimalSize)) defer stateChanges.Close() - if err := a.accounts.pruneF(txUnwindTo, math2.MaxUint64, func(txNum uint64, k, v []byte) error { - if err := stateChanges.Collect(k, v); err != nil { - return err - } - return nil + if err := a.accounts.pruneF(txUnwindTo, math2.MaxUint64, func(_ uint64, k, v []byte) error { + return stateChanges.Collect(k, v) }); err != nil { return err } - if err := a.storage.pruneF(txUnwindTo, math2.MaxUint64, func(txNu uint64, k, v []byte) error { - if err := stateChanges.Collect(k, v); err != nil { - return err - } - return nil + if err := a.storage.pruneF(txUnwindTo, math2.MaxUint64, func(_ uint64, k, v []byte) error { + return stateChanges.Collect(k, v) }); err != nil { return err } @@ -800,6 +794,47 @@ func (a *Aggregator22) AddLogTopic(topic []byte) error { return a.logTopics.Add(topic) } +// DisableReadAhead - usage: `defer d.EnableReadAhead().DisableReadAhead()`. Please don't use this funcs without `defer` to avoid leak. +func (a *Aggregator22) DisableReadAhead() { + a.accounts.DisableReadAhead() + a.storage.DisableReadAhead() + a.code.DisableReadAhead() + a.logAddrs.DisableReadAhead() + a.logTopics.DisableReadAhead() + a.tracesFrom.DisableReadAhead() + a.tracesTo.DisableReadAhead() +} +func (a *Aggregator22) EnableReadAhead() *Aggregator22 { + a.accounts.EnableReadAhead() + a.storage.EnableReadAhead() + a.code.EnableReadAhead() + a.logAddrs.EnableReadAhead() + a.logTopics.EnableReadAhead() + a.tracesFrom.EnableReadAhead() + a.tracesTo.EnableReadAhead() + return a +} +func (a *Aggregator22) EnableMadvWillNeed() *Aggregator22 { + a.accounts.EnableMadvWillNeed() + a.storage.EnableMadvWillNeed() + a.code.EnableMadvWillNeed() + a.logAddrs.EnableMadvWillNeed() + a.logTopics.EnableMadvWillNeed() + a.tracesFrom.EnableMadvWillNeed() + a.tracesTo.EnableMadvWillNeed() + return a +} +func (a *Aggregator22) EnableMadvNormal() *Aggregator22 { + a.accounts.EnableMadvNormalReadAhead() + a.storage.EnableMadvNormalReadAhead() + a.code.EnableMadvNormalReadAhead() + a.logAddrs.EnableMadvNormalReadAhead() + a.logTopics.EnableMadvNormalReadAhead() + a.tracesFrom.EnableMadvNormalReadAhead() + a.tracesTo.EnableMadvNormalReadAhead() + return a +} + func (ac *Aggregator22Context) LogAddrIterator(addr []byte, startTxNum, endTxNum uint64, roTx kv.Tx) InvertedIterator { return ac.logAddrs.IterateRange(addr, startTxNum, endTxNum, roTx) } diff --git a/state/history.go b/state/history.go index 6ecbb248d..15dfb221a 100644 --- a/state/history.go +++ b/state/history.go @@ -447,20 +447,21 @@ func (h *History) collate(step, txFrom, txTo uint64, roTx kv.Tx) (HistoryCollati if err != nil { return HistoryCollation{}, err } - if bytes.HasPrefix(v, []byte(key)) { - valNum := binary.BigEndian.Uint64(v[len(v)-8:]) - if valNum == 0 { - val = nil - } else { - if val, err = roTx.GetOne(h.historyValsTable, v[len(v)-8:]); err != nil { - return HistoryCollation{}, fmt.Errorf("get %s history val [%x]=>%d: %w", h.filenameBase, k, valNum, err) - } - } - if err = historyComp.AddUncompressedWord(val); err != nil { - return HistoryCollation{}, fmt.Errorf("add %s history val [%x]=>[%x]: %w", h.filenameBase, k, val, err) - } - historyCount++ + if !bytes.HasPrefix(v, []byte(key)) { + continue } + valNum := binary.BigEndian.Uint64(v[len(v)-8:]) + if valNum == 0 { + val = nil + } else { + if val, err = roTx.GetOne(h.historyValsTable, v[len(v)-8:]); err != nil { + return HistoryCollation{}, fmt.Errorf("get %s history val [%x]=>%d: %w", h.filenameBase, k, valNum, err) + } + } + if err = historyComp.AddUncompressedWord(val); err != nil { + return HistoryCollation{}, fmt.Errorf("add %s history val [%x]=>[%x]: %w", h.filenameBase, k, val, err) + } + historyCount++ } } closeComp = false @@ -796,11 +797,13 @@ func (hc *HistoryContext) GetNoState(key []byte, txNum uint64) ([]byte, bool, er offset := item.reader.Lookup(key) g := item.getter g.Reset(offset) - if k, _ := g.NextUncompressed(); bytes.Equal(k, key) { + k, _ := g.NextUncompressed() + if bytes.Equal(k, key) { //fmt.Printf("Found key=%x\n", k) eliasVal, _ := g.NextUncompressed() ef, _ := eliasfano32.ReadEliasFano(eliasVal) - if n, ok := ef.Search(txNum); ok { + n, ok := ef.Search(txNum) + if ok { foundTxNum = n foundEndTxNum = item.endTxNum foundStartTxNum = item.startTxNum @@ -851,7 +854,14 @@ func (hc *HistoryContext) GetNoStateWithRecent(key []byte, txNum uint64, roTx kv if roTx == nil { return nil, false, fmt.Errorf("roTx is nil") } - return hc.getNoStateFromDB(key, txNum, roTx) + v, ok, err = hc.getNoStateFromDB(key, txNum, roTx) + if err != nil { + return nil, ok, err + } + if ok { + return v, true, nil + } + return nil, false, err } func (hc *HistoryContext) getNoStateFromDB(key []byte, txNum uint64, tx kv.Tx) ([]byte, bool, error) { @@ -1115,3 +1125,48 @@ func (hi *HistoryIterator1) Next(keyBuf, valBuf []byte) ([]byte, []byte) { hi.advance() return k, v } + +func (h *History) DisableReadAhead() { + h.InvertedIndex.DisableReadAhead() + h.files.Ascend(func(item *filesItem) bool { + item.decompressor.DisableReadAhead() + if item.index != nil { + item.index.DisableReadAhead() + } + return true + }) +} + +func (h *History) EnableReadAhead() *History { + h.InvertedIndex.EnableReadAhead() + h.files.Ascend(func(item *filesItem) bool { + item.decompressor.EnableReadAhead() + if item.index != nil { + item.index.EnableReadAhead() + } + return true + }) + return h +} +func (h *History) EnableMadvWillNeed() *History { + h.InvertedIndex.EnableMadvWillNeed() + h.files.Ascend(func(item *filesItem) bool { + item.decompressor.EnableWillNeed() + if item.index != nil { + item.index.EnableWillNeed() + } + return true + }) + return h +} +func (h *History) EnableMadvNormalReadAhead() *History { + h.InvertedIndex.EnableMadvNormalReadAhead() + h.files.Ascend(func(item *filesItem) bool { + item.decompressor.EnableMadvNormal() + if item.index != nil { + item.index.EnableMadvNormal() + } + return true + }) + return h +} diff --git a/state/inverted_index.go b/state/inverted_index.go index 5fc4ef3b4..505f8d3c2 100644 --- a/state/inverted_index.go +++ b/state/inverted_index.go @@ -710,3 +710,44 @@ func (ii *InvertedIndex) prune(txFrom, txTo uint64) error { } return nil } + +func (ii *InvertedIndex) DisableReadAhead() { + ii.files.Ascend(func(item *filesItem) bool { + item.decompressor.DisableReadAhead() + if item.index != nil { + item.index.DisableReadAhead() + } + return true + }) +} + +func (ii *InvertedIndex) EnableReadAhead() *InvertedIndex { + ii.files.Ascend(func(item *filesItem) bool { + item.decompressor.EnableReadAhead() + if item.index != nil { + item.index.EnableReadAhead() + } + return true + }) + return ii +} +func (ii *InvertedIndex) EnableMadvWillNeed() *InvertedIndex { + ii.files.Ascend(func(item *filesItem) bool { + item.decompressor.EnableWillNeed() + if item.index != nil { + item.index.EnableWillNeed() + } + return true + }) + return ii +} +func (ii *InvertedIndex) EnableMadvNormalReadAhead() *InvertedIndex { + ii.files.Ascend(func(item *filesItem) bool { + item.decompressor.EnableMadvNormal() + if item.index != nil { + item.index.EnableMadvNormal() + } + return true + }) + return ii +}