mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-20 17:31:11 +00:00
1479 lines
40 KiB
Go
1479 lines
40 KiB
Go
/*
|
|
Copyright 2022 Erigon contributors
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package state
|
|
|
|
import (
|
|
"bytes"
|
|
"container/heap"
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io/fs"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/RoaringBitmap/roaring/roaring64"
|
|
"github.com/google/btree"
|
|
"github.com/ledgerwatch/erigon-lib/etl"
|
|
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
|
|
"github.com/ledgerwatch/log/v3"
|
|
"golang.org/x/exp/slices"
|
|
"golang.org/x/sync/semaphore"
|
|
|
|
"github.com/ledgerwatch/erigon-lib/common"
|
|
"github.com/ledgerwatch/erigon-lib/common/cmp"
|
|
"github.com/ledgerwatch/erigon-lib/common/dir"
|
|
"github.com/ledgerwatch/erigon-lib/compress"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon-lib/recsplit"
|
|
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
|
|
)
|
|
|
|
type History struct {
|
|
*InvertedIndex
|
|
files *btree.BTreeG[*filesItem]
|
|
historyValsTable string // key1+key2+txnNum -> oldValue , stores values BEFORE change
|
|
settingsTable string
|
|
workers int
|
|
compressVals bool
|
|
|
|
wal *historyWAL
|
|
walLock sync.RWMutex
|
|
}
|
|
|
|
func NewHistory(
|
|
dir, tmpdir string,
|
|
aggregationStep uint64,
|
|
filenameBase string,
|
|
indexKeysTable string,
|
|
indexTable string,
|
|
historyValsTable string,
|
|
settingsTable string,
|
|
compressVals bool,
|
|
) (*History, error) {
|
|
h := History{
|
|
files: btree.NewG[*filesItem](32, filesItemLess),
|
|
historyValsTable: historyValsTable,
|
|
settingsTable: settingsTable,
|
|
compressVals: compressVals,
|
|
workers: 1,
|
|
}
|
|
var err error
|
|
h.InvertedIndex, err = NewInvertedIndex(dir, tmpdir, aggregationStep, filenameBase, indexKeysTable, indexTable)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NewHistory: %s, %w", filenameBase, err)
|
|
}
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
h.scanStateFiles(files)
|
|
if err = h.openFiles(); err != nil {
|
|
return nil, fmt.Errorf("NewHistory.openFiles: %s, %w", filenameBase, err)
|
|
}
|
|
return &h, nil
|
|
}
|
|
|
|
func (h *History) scanStateFiles(files []fs.DirEntry) {
|
|
re := regexp.MustCompile("^" + h.filenameBase + ".([0-9]+)-([0-9]+).v$")
|
|
var err error
|
|
var uselessFiles []string
|
|
for _, f := range files {
|
|
if !f.Type().IsRegular() {
|
|
continue
|
|
}
|
|
|
|
name := f.Name()
|
|
subs := re.FindStringSubmatch(name)
|
|
if len(subs) != 3 {
|
|
if len(subs) != 0 {
|
|
log.Warn("File ignored by inverted index scan, more than 3 submatches", "name", name, "submatches", len(subs))
|
|
}
|
|
continue
|
|
}
|
|
var startStep, endStep uint64
|
|
if startStep, err = strconv.ParseUint(subs[1], 10, 64); err != nil {
|
|
log.Warn("File ignored by inverted index scan, parsing startTxNum", "error", err, "name", name)
|
|
continue
|
|
}
|
|
if endStep, err = strconv.ParseUint(subs[2], 10, 64); err != nil {
|
|
log.Warn("File ignored by inverted index scan, parsing endTxNum", "error", err, "name", name)
|
|
continue
|
|
}
|
|
if startStep > endStep {
|
|
log.Warn("File ignored by inverted index scan, startTxNum > endTxNum", "name", name)
|
|
continue
|
|
}
|
|
|
|
startTxNum, endTxNum := startStep*h.aggregationStep, endStep*h.aggregationStep
|
|
var item = &filesItem{startTxNum: startTxNum, endTxNum: endTxNum}
|
|
{
|
|
var subSet, superSet *filesItem
|
|
h.files.DescendLessOrEqual(item, func(it *filesItem) bool {
|
|
if it.isSubsetOf(item) {
|
|
subSet = it
|
|
} else if item.isSubsetOf(it) {
|
|
superSet = it
|
|
}
|
|
return true
|
|
})
|
|
if subSet != nil {
|
|
h.files.Delete(subSet)
|
|
uselessFiles = append(uselessFiles,
|
|
fmt.Sprintf("%s.%d-%d.v", h.filenameBase, subSet.startTxNum/h.aggregationStep, subSet.endTxNum/h.aggregationStep),
|
|
fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, subSet.startTxNum/h.aggregationStep, subSet.endTxNum/h.aggregationStep),
|
|
)
|
|
}
|
|
if superSet != nil {
|
|
uselessFiles = append(uselessFiles,
|
|
fmt.Sprintf("%s.%d-%d.v", h.filenameBase, startStep, endStep),
|
|
fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, startStep, endStep),
|
|
)
|
|
continue
|
|
}
|
|
}
|
|
{
|
|
var subSet, superSet *filesItem
|
|
h.files.AscendGreaterOrEqual(item, func(it *filesItem) bool {
|
|
if it.isSubsetOf(item) {
|
|
subSet = it
|
|
} else if item.isSubsetOf(it) {
|
|
superSet = it
|
|
}
|
|
return false
|
|
})
|
|
if subSet != nil {
|
|
h.files.Delete(subSet)
|
|
uselessFiles = append(uselessFiles,
|
|
fmt.Sprintf("%s.%d-%d.v", h.filenameBase, subSet.startTxNum/h.aggregationStep, subSet.endTxNum/h.aggregationStep),
|
|
fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, subSet.startTxNum/h.aggregationStep, subSet.endTxNum/h.aggregationStep),
|
|
)
|
|
}
|
|
if superSet != nil {
|
|
uselessFiles = append(uselessFiles,
|
|
fmt.Sprintf("%s.%d-%d.v", h.filenameBase, startStep, endStep),
|
|
fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, startStep, endStep),
|
|
)
|
|
continue
|
|
}
|
|
}
|
|
h.files.ReplaceOrInsert(item)
|
|
}
|
|
if len(uselessFiles) > 0 {
|
|
log.Info("[snapshots] history can delete", "files", strings.Join(uselessFiles, ","))
|
|
}
|
|
}
|
|
|
|
func (h *History) openFiles() error {
|
|
var totalKeys uint64
|
|
var err error
|
|
|
|
invalidFileItems := make([]*filesItem, 0)
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
if item.decompressor != nil {
|
|
item.decompressor.Close()
|
|
}
|
|
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
|
|
datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, fromStep, toStep))
|
|
if !dir.FileExist(datPath) {
|
|
invalidFileItems = append(invalidFileItems, item)
|
|
return true
|
|
}
|
|
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
|
|
log.Debug("Hisrory.openFiles: %w, %s", err, datPath)
|
|
return false
|
|
}
|
|
if item.index == nil {
|
|
idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep))
|
|
if dir.FileExist(idxPath) {
|
|
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
|
|
log.Debug("Hisrory.openFiles: %w, %s", err, idxPath)
|
|
return false
|
|
}
|
|
totalKeys += item.index.KeyCount()
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, item := range invalidFileItems {
|
|
h.files.Delete(item)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *History) closeFiles() {
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
if item.decompressor != nil {
|
|
item.decompressor.Close()
|
|
}
|
|
if item.index != nil {
|
|
item.index.Close()
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (h *History) Close() {
|
|
h.InvertedIndex.Close()
|
|
h.closeFiles()
|
|
}
|
|
|
|
func (h *History) Files() (res []string) {
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
if item.decompressor != nil {
|
|
_, fName := filepath.Split(item.decompressor.FilePath())
|
|
res = append(res, filepath.Join("history", fName))
|
|
}
|
|
return true
|
|
})
|
|
res = append(res, h.InvertedIndex.Files()...)
|
|
return res
|
|
}
|
|
|
|
func (h *History) missedIdxFiles() (l []*filesItem) {
|
|
h.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree
|
|
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
|
|
if !dir.FileExist(filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep))) {
|
|
l = append(l, item)
|
|
}
|
|
return true
|
|
})
|
|
return l
|
|
}
|
|
|
|
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv
|
|
func (h *History) BuildMissedIndices(ctx context.Context, sem *semaphore.Weighted) (err error) {
|
|
if err := h.InvertedIndex.BuildMissedIndices(ctx, sem); err != nil {
|
|
return err
|
|
}
|
|
|
|
missedFiles := h.missedIdxFiles()
|
|
errs := make(chan error, len(missedFiles))
|
|
wg := sync.WaitGroup{}
|
|
|
|
for _, item := range missedFiles {
|
|
if err := sem.Acquire(ctx, 1); err != nil {
|
|
errs <- err
|
|
break
|
|
}
|
|
wg.Add(1)
|
|
go func(item *filesItem) {
|
|
defer sem.Release(1)
|
|
defer wg.Done()
|
|
|
|
search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum}
|
|
iiItem, ok := h.InvertedIndex.files.Get(search)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
|
|
fName := fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep)
|
|
idxPath := filepath.Join(h.dir, fName)
|
|
log.Info("[snapshots] build idx", "file", fName)
|
|
count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil })
|
|
if err != nil {
|
|
errs <- err
|
|
}
|
|
errs <- buildVi(item, iiItem, idxPath, h.tmpdir, count, false /* values */, h.compressVals)
|
|
}(item)
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(errs)
|
|
}()
|
|
var lastError error
|
|
for err := range errs {
|
|
if err != nil {
|
|
lastError = err
|
|
}
|
|
}
|
|
if lastError != nil {
|
|
return lastError
|
|
}
|
|
|
|
return h.openFiles()
|
|
}
|
|
|
|
func iterateForVi(historyItem, iiItem *filesItem, compressVals bool, f func(v []byte) error) (count int, err error) {
|
|
var cp CursorHeap
|
|
heap.Init(&cp)
|
|
g := iiItem.decompressor.MakeGetter()
|
|
g.Reset(0)
|
|
if g.HasNext() {
|
|
g2 := historyItem.decompressor.MakeGetter()
|
|
key, _ := g.NextUncompressed()
|
|
val, _ := g.NextUncompressed()
|
|
heap.Push(&cp, &CursorItem{
|
|
t: FILE_CURSOR,
|
|
dg: g,
|
|
dg2: g2,
|
|
key: key,
|
|
val: val,
|
|
endTxNum: iiItem.endTxNum,
|
|
reverse: false,
|
|
})
|
|
}
|
|
|
|
// In the loop below, the pair `keyBuf=>valBuf` is always 1 item behind `lastKey=>lastVal`.
|
|
// `lastKey` and `lastVal` are taken from the top of the multi-way merge (assisted by the CursorHeap cp), but not processed right away
|
|
// instead, the pair from the previous iteration is processed first - `keyBuf=>valBuf`. After that, `keyBuf` and `valBuf` are assigned
|
|
// to `lastKey` and `lastVal` correspondingly, and the next step of multi-way merge happens. Therefore, after the multi-way merge loop
|
|
// (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind
|
|
var valBuf []byte
|
|
for cp.Len() > 0 {
|
|
lastKey := common.Copy(cp[0].key)
|
|
// Advance all the items that have this key (including the top)
|
|
//var mergeOnce bool
|
|
for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) {
|
|
ci1 := cp[0]
|
|
ef, _ := eliasfano32.ReadEliasFano(ci1.val)
|
|
for i := uint64(0); i < ef.Count(); i++ {
|
|
if compressVals {
|
|
valBuf, _ = ci1.dg2.Next(valBuf[:0])
|
|
} else {
|
|
valBuf, _ = ci1.dg2.NextUncompressed()
|
|
}
|
|
if err = f(valBuf); err != nil {
|
|
return count, err
|
|
}
|
|
}
|
|
count += int(ef.Count())
|
|
if ci1.dg.HasNext() {
|
|
ci1.key, _ = ci1.dg.NextUncompressed()
|
|
ci1.val, _ = ci1.dg.NextUncompressed()
|
|
heap.Fix(&cp, 0)
|
|
} else {
|
|
heap.Remove(&cp, 0)
|
|
}
|
|
}
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func buildVi(historyItem, iiItem *filesItem, historyIdxPath, tmpdir string, count int, values, compressVals bool) error {
|
|
_, fName := filepath.Split(historyIdxPath)
|
|
log.Debug("[snapshots] build idx", "file", fName)
|
|
rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
|
|
KeyCount: count,
|
|
Enums: false,
|
|
BucketSize: 2000,
|
|
LeafSize: 8,
|
|
TmpDir: tmpdir,
|
|
IndexFile: historyIdxPath,
|
|
EtlBufLimit: etl.BufferOptimalSize / 2,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create recsplit: %w", err)
|
|
}
|
|
defer rs.Close()
|
|
var historyKey []byte
|
|
var txKey [8]byte
|
|
var valOffset uint64
|
|
|
|
defer iiItem.decompressor.EnableMadvNormal().DisableReadAhead()
|
|
defer historyItem.decompressor.EnableMadvNormal().DisableReadAhead()
|
|
|
|
g := iiItem.decompressor.MakeGetter()
|
|
g2 := historyItem.decompressor.MakeGetter()
|
|
var keyBuf, valBuf []byte
|
|
for {
|
|
g.Reset(0)
|
|
g2.Reset(0)
|
|
valOffset = 0
|
|
for g.HasNext() {
|
|
keyBuf, _ = g.NextUncompressed()
|
|
valBuf, _ = g.NextUncompressed()
|
|
ef, _ := eliasfano32.ReadEliasFano(valBuf)
|
|
efIt := ef.Iterator()
|
|
for efIt.HasNext() {
|
|
txNum := efIt.Next()
|
|
binary.BigEndian.PutUint64(txKey[:], txNum)
|
|
historyKey = append(append(historyKey[:0], txKey[:]...), keyBuf...)
|
|
if err = rs.AddKey(historyKey, valOffset); err != nil {
|
|
return err
|
|
}
|
|
if compressVals {
|
|
valOffset = g2.Skip()
|
|
} else {
|
|
valOffset = g2.SkipUncompressed()
|
|
}
|
|
}
|
|
}
|
|
if err = rs.Build(); err != nil {
|
|
if rs.Collision() {
|
|
log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
|
|
rs.ResetNextSalt()
|
|
} else {
|
|
return fmt.Errorf("build %s idx: %w", historyIdxPath, err)
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *History) AddPrevValue(key1, key2, original []byte) (err error) {
|
|
h.walLock.RLock() // read-lock for reading fielw `w` and writing into it, write-lock for setting new `w`
|
|
err = h.wal.addPrevValue(key1, key2, original)
|
|
h.walLock.RUnlock()
|
|
return err
|
|
}
|
|
func (h *History) StartWrites(tmpdir string) {
|
|
h.InvertedIndex.StartWrites(tmpdir)
|
|
h.walLock.Lock()
|
|
defer h.walLock.Unlock()
|
|
h.wal = h.newWriter(tmpdir)
|
|
}
|
|
func (h *History) FinishWrites() {
|
|
h.InvertedIndex.FinishWrites()
|
|
h.walLock.Lock()
|
|
defer h.walLock.Unlock()
|
|
h.wal.close()
|
|
h.wal = nil
|
|
}
|
|
|
|
func (h *History) Rotate() historyFlusher {
|
|
h.walLock.Lock()
|
|
defer h.walLock.Unlock()
|
|
w := h.wal
|
|
h.wal = h.newWriter(h.wal.tmpdir)
|
|
h.wal.autoIncrement = w.autoIncrement
|
|
return historyFlusher{w, h.InvertedIndex.Rotate()}
|
|
}
|
|
|
|
type historyFlusher struct {
|
|
h *historyWAL
|
|
i *invertedIndexWAL
|
|
}
|
|
|
|
func (f historyFlusher) Flush(tx kv.RwTx) error {
|
|
if err := f.i.Flush(tx); err != nil {
|
|
return err
|
|
}
|
|
if err := f.h.flush(tx); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type historyWAL struct {
|
|
h *History
|
|
historyVals *etl.Collector
|
|
tmpdir string
|
|
autoIncrementBuf []byte
|
|
historyKey []byte
|
|
autoIncrement uint64
|
|
buffered bool
|
|
}
|
|
|
|
func (h *historyWAL) close() {
|
|
if h == nil { // allow dobule-close
|
|
return
|
|
}
|
|
h.historyVals.Close()
|
|
}
|
|
|
|
func (h *History) newWriter(tmpdir string) *historyWAL {
|
|
w := &historyWAL{h: h,
|
|
tmpdir: tmpdir,
|
|
autoIncrementBuf: make([]byte, 8),
|
|
historyKey: make([]byte, 0, 128),
|
|
|
|
buffered: true,
|
|
historyVals: etl.NewCollector(h.historyValsTable, tmpdir, etl.NewSortableBuffer(WALCollectorRam)),
|
|
}
|
|
w.historyVals.LogLvl(log.LvlTrace)
|
|
|
|
val, err := h.tx.GetOne(h.settingsTable, historyValCountKey)
|
|
if err != nil {
|
|
panic(err)
|
|
//return err
|
|
}
|
|
var valNum uint64
|
|
if len(val) > 0 {
|
|
valNum = binary.BigEndian.Uint64(val)
|
|
}
|
|
w.autoIncrement = valNum
|
|
return w
|
|
}
|
|
|
|
func (h *historyWAL) flush(tx kv.RwTx) error {
|
|
binary.BigEndian.PutUint64(h.autoIncrementBuf, h.autoIncrement)
|
|
if err := tx.Put(h.h.settingsTable, historyValCountKey, h.autoIncrementBuf); err != nil {
|
|
return err
|
|
}
|
|
if err := h.historyVals.Load(tx, h.h.historyValsTable, loadFunc, etl.TransformArgs{}); err != nil {
|
|
return err
|
|
}
|
|
h.close()
|
|
return nil
|
|
}
|
|
|
|
func (h *historyWAL) addPrevValue(key1, key2, original []byte) error {
|
|
/*
|
|
lk := len(key1) + len(key2)
|
|
historyKey := make([]byte, lk+8)
|
|
copy(historyKey, key1)
|
|
if len(key2) > 0 {
|
|
copy(historyKey[len(key1):], key2)
|
|
}
|
|
if len(original) > 0 {
|
|
val, err := h.h.tx.GetOne(h.h.settingsTable, historyValCountKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var valNum uint64
|
|
if len(val) > 0 {
|
|
valNum = binary.BigEndian.Uint64(val)
|
|
}
|
|
valNum++
|
|
binary.BigEndian.PutUint64(historyKey[lk:], valNum)
|
|
if err = h.h.tx.Put(h.h.settingsTable, historyValCountKey, historyKey[lk:]); err != nil {
|
|
return err
|
|
}
|
|
if err = h.h.tx.Put(h.h.historyValsTable, historyKey[lk:], original); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
*/
|
|
|
|
lk := len(key1) + len(key2)
|
|
historyKey := h.historyKey[:lk+8]
|
|
copy(historyKey, key1)
|
|
if len(key2) > 0 {
|
|
copy(historyKey[len(key1):], key2)
|
|
}
|
|
if len(original) > 0 {
|
|
h.autoIncrement++
|
|
binary.BigEndian.PutUint64(historyKey[lk:], h.autoIncrement)
|
|
//if err := h.h.tx.Put(h.h.settingsTable, historyValCountKey, historyKey[lk:]); err != nil {
|
|
// return err
|
|
//}
|
|
|
|
if h.buffered {
|
|
if err := h.historyVals.Collect(historyKey[lk:], original); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err := h.h.tx.Put(h.h.historyValsTable, historyKey[lk:], original); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
binary.BigEndian.PutUint64(historyKey[lk:], 0)
|
|
}
|
|
|
|
if err := h.h.InvertedIndex.add(historyKey, historyKey[:lk]); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type HistoryCollation struct {
|
|
historyComp *compress.Compressor
|
|
indexBitmaps map[string]*roaring64.Bitmap
|
|
historyPath string
|
|
historyCount int
|
|
}
|
|
|
|
func (c HistoryCollation) Close() {
|
|
if c.historyComp != nil {
|
|
c.historyComp.Close()
|
|
}
|
|
for _, b := range c.indexBitmaps {
|
|
bitmapdb.ReturnToPool64(b)
|
|
}
|
|
}
|
|
|
|
func (h *History) collate(step, txFrom, txTo uint64, roTx kv.Tx, logEvery *time.Ticker) (HistoryCollation, error) {
|
|
var historyComp *compress.Compressor
|
|
var err error
|
|
closeComp := true
|
|
defer func() {
|
|
if closeComp {
|
|
if historyComp != nil {
|
|
historyComp.Close()
|
|
}
|
|
}
|
|
}()
|
|
historyPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, step, step+1))
|
|
if historyComp, err = compress.NewCompressor(context.Background(), "collate history", historyPath, h.tmpdir, compress.MinPatternScore, h.workers, log.LvlTrace); err != nil {
|
|
return HistoryCollation{}, fmt.Errorf("create %s history compressor: %w", h.filenameBase, err)
|
|
}
|
|
keysCursor, err := roTx.CursorDupSort(h.indexKeysTable)
|
|
if err != nil {
|
|
return HistoryCollation{}, fmt.Errorf("create %s history cursor: %w", h.filenameBase, err)
|
|
}
|
|
defer keysCursor.Close()
|
|
indexBitmaps := map[string]*roaring64.Bitmap{}
|
|
var txKey [8]byte
|
|
binary.BigEndian.PutUint64(txKey[:], txFrom)
|
|
var val []byte
|
|
var k, v []byte
|
|
for k, v, err = keysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = keysCursor.Next() {
|
|
txNum := binary.BigEndian.Uint64(k)
|
|
if txNum >= txTo {
|
|
break
|
|
}
|
|
var bitmap *roaring64.Bitmap
|
|
var ok bool
|
|
if bitmap, ok = indexBitmaps[string(v[:len(v)-8])]; !ok {
|
|
bitmap = bitmapdb.NewBitmap64()
|
|
indexBitmaps[string(v[:len(v)-8])] = bitmap
|
|
}
|
|
bitmap.Add(txNum)
|
|
select {
|
|
case <-logEvery.C:
|
|
log.Info("[snapshots] collate history", "name", h.filenameBase, "range", fmt.Sprintf("%.2f-%.2f", float64(txNum)/float64(h.aggregationStep), float64(txTo)/float64(h.aggregationStep)))
|
|
bitmap.RunOptimize()
|
|
default:
|
|
}
|
|
}
|
|
if err != nil {
|
|
return HistoryCollation{}, fmt.Errorf("iterate over %s history cursor: %w", h.filenameBase, err)
|
|
}
|
|
keys := make([]string, 0, len(indexBitmaps))
|
|
for key := range indexBitmaps {
|
|
keys = append(keys, key)
|
|
}
|
|
slices.Sort(keys)
|
|
historyCount := 0
|
|
for _, key := range keys {
|
|
bitmap := indexBitmaps[key]
|
|
it := bitmap.Iterator()
|
|
for it.HasNext() {
|
|
txNum := it.Next()
|
|
binary.BigEndian.PutUint64(txKey[:], txNum)
|
|
v, err := keysCursor.SeekBothRange(txKey[:], []byte(key))
|
|
if err != nil {
|
|
return HistoryCollation{}, err
|
|
}
|
|
if !bytes.HasPrefix(v, []byte(key)) {
|
|
continue
|
|
}
|
|
valNum := binary.BigEndian.Uint64(v[len(v)-8:])
|
|
if valNum == 0 {
|
|
val = nil
|
|
} else {
|
|
if val, err = roTx.GetOne(h.historyValsTable, v[len(v)-8:]); err != nil {
|
|
return HistoryCollation{}, fmt.Errorf("get %s history val [%x]=>%d: %w", h.filenameBase, k, valNum, err)
|
|
}
|
|
}
|
|
if err = historyComp.AddUncompressedWord(val); err != nil {
|
|
return HistoryCollation{}, fmt.Errorf("add %s history val [%x]=>[%x]: %w", h.filenameBase, k, val, err)
|
|
}
|
|
historyCount++
|
|
}
|
|
}
|
|
closeComp = false
|
|
return HistoryCollation{
|
|
historyPath: historyPath,
|
|
historyComp: historyComp,
|
|
historyCount: historyCount,
|
|
indexBitmaps: indexBitmaps,
|
|
}, nil
|
|
}
|
|
|
|
type HistoryFiles struct {
|
|
historyDecomp *compress.Decompressor
|
|
historyIdx *recsplit.Index
|
|
efHistoryDecomp *compress.Decompressor
|
|
efHistoryIdx *recsplit.Index
|
|
}
|
|
|
|
func (sf HistoryFiles) Close() {
|
|
if sf.historyDecomp != nil {
|
|
sf.historyDecomp.Close()
|
|
}
|
|
if sf.historyIdx != nil {
|
|
sf.historyIdx.Close()
|
|
}
|
|
if sf.efHistoryDecomp != nil {
|
|
sf.efHistoryDecomp.Close()
|
|
}
|
|
if sf.efHistoryIdx != nil {
|
|
sf.efHistoryIdx.Close()
|
|
}
|
|
}
|
|
|
|
// buildFiles performs potentially resource intensive operations of creating
|
|
// static files and their indices
|
|
func (h *History) buildFiles(ctx context.Context, step uint64, collation HistoryCollation) (HistoryFiles, error) {
|
|
historyComp := collation.historyComp
|
|
var historyDecomp, efHistoryDecomp *compress.Decompressor
|
|
var historyIdx, efHistoryIdx *recsplit.Index
|
|
var efHistoryComp *compress.Compressor
|
|
var rs *recsplit.RecSplit
|
|
closeComp := true
|
|
defer func() {
|
|
if closeComp {
|
|
if historyComp != nil {
|
|
historyComp.Close()
|
|
}
|
|
if historyDecomp != nil {
|
|
historyDecomp.Close()
|
|
}
|
|
if historyIdx != nil {
|
|
historyIdx.Close()
|
|
}
|
|
if efHistoryComp != nil {
|
|
efHistoryComp.Close()
|
|
}
|
|
if efHistoryDecomp != nil {
|
|
efHistoryDecomp.Close()
|
|
}
|
|
if efHistoryIdx != nil {
|
|
efHistoryIdx.Close()
|
|
}
|
|
if rs != nil {
|
|
rs.Close()
|
|
}
|
|
}
|
|
}()
|
|
historyIdxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, step, step+1))
|
|
if err := historyComp.Compress(); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("compress %s history: %w", h.filenameBase, err)
|
|
}
|
|
historyComp.Close()
|
|
historyComp = nil
|
|
var err error
|
|
if historyDecomp, err = compress.NewDecompressor(collation.historyPath); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("open %s history decompressor: %w", h.filenameBase, err)
|
|
}
|
|
// Build history ef
|
|
efHistoryPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.ef", h.filenameBase, step, step+1))
|
|
efHistoryComp, err = compress.NewCompressor(ctx, "ef history", efHistoryPath, h.tmpdir, compress.MinPatternScore, h.workers, log.LvlTrace)
|
|
if err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("create %s ef history compressor: %w", h.filenameBase, err)
|
|
}
|
|
var buf []byte
|
|
keys := make([]string, 0, len(collation.indexBitmaps))
|
|
for key := range collation.indexBitmaps {
|
|
keys = append(keys, key)
|
|
}
|
|
slices.Sort(keys)
|
|
for _, key := range keys {
|
|
if err = efHistoryComp.AddUncompressedWord([]byte(key)); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("add %s ef history key [%x]: %w", h.InvertedIndex.filenameBase, key, err)
|
|
}
|
|
bitmap := collation.indexBitmaps[key]
|
|
ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum())
|
|
it := bitmap.Iterator()
|
|
for it.HasNext() {
|
|
txNum := it.Next()
|
|
ef.AddOffset(txNum)
|
|
}
|
|
ef.Build()
|
|
buf = ef.AppendBytes(buf[:0])
|
|
if err = efHistoryComp.AddUncompressedWord(buf); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("add %s ef history val: %w", h.filenameBase, err)
|
|
}
|
|
}
|
|
if err = efHistoryComp.Compress(); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("compress %s ef history: %w", h.filenameBase, err)
|
|
}
|
|
efHistoryComp.Close()
|
|
efHistoryComp = nil
|
|
if efHistoryDecomp, err = compress.NewDecompressor(efHistoryPath); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("open %s ef history decompressor: %w", h.filenameBase, err)
|
|
}
|
|
efHistoryIdxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.efi", h.filenameBase, step, step+1))
|
|
if efHistoryIdx, err = buildIndex(ctx, efHistoryDecomp, efHistoryIdxPath, h.tmpdir, len(keys), false /* values */); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("build %s ef history idx: %w", h.filenameBase, err)
|
|
}
|
|
if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
|
|
KeyCount: collation.historyCount,
|
|
Enums: false,
|
|
BucketSize: 2000,
|
|
LeafSize: 8,
|
|
TmpDir: h.tmpdir,
|
|
IndexFile: historyIdxPath,
|
|
}); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("create recsplit: %w", err)
|
|
}
|
|
rs.LogLvl(log.LvlTrace)
|
|
var historyKey []byte
|
|
var txKey [8]byte
|
|
var valOffset uint64
|
|
g := historyDecomp.MakeGetter()
|
|
for {
|
|
g.Reset(0)
|
|
valOffset = 0
|
|
for _, key := range keys {
|
|
bitmap := collation.indexBitmaps[key]
|
|
it := bitmap.Iterator()
|
|
for it.HasNext() {
|
|
txNum := it.Next()
|
|
binary.BigEndian.PutUint64(txKey[:], txNum)
|
|
historyKey = append(append(historyKey[:0], txKey[:]...), key...)
|
|
if err = rs.AddKey(historyKey, valOffset); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("add %s history idx [%x]: %w", h.filenameBase, historyKey, err)
|
|
}
|
|
valOffset = g.Skip()
|
|
}
|
|
}
|
|
if err = rs.Build(); err != nil {
|
|
if rs.Collision() {
|
|
log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
|
|
rs.ResetNextSalt()
|
|
} else {
|
|
return HistoryFiles{}, fmt.Errorf("build idx: %w", err)
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
rs.Close()
|
|
rs = nil
|
|
if historyIdx, err = recsplit.OpenIndex(historyIdxPath); err != nil {
|
|
return HistoryFiles{}, fmt.Errorf("open idx: %w", err)
|
|
}
|
|
closeComp = false
|
|
return HistoryFiles{
|
|
historyDecomp: historyDecomp,
|
|
historyIdx: historyIdx,
|
|
efHistoryDecomp: efHistoryDecomp,
|
|
efHistoryIdx: efHistoryIdx,
|
|
}, nil
|
|
}
|
|
|
|
func (h *History) integrateFiles(sf HistoryFiles, txNumFrom, txNumTo uint64) {
|
|
h.InvertedIndex.integrateFiles(InvertedFiles{
|
|
decomp: sf.efHistoryDecomp,
|
|
index: sf.efHistoryIdx,
|
|
}, txNumFrom, txNumTo)
|
|
h.files.ReplaceOrInsert(&filesItem{
|
|
startTxNum: txNumFrom,
|
|
endTxNum: txNumTo,
|
|
decompressor: sf.historyDecomp,
|
|
index: sf.historyIdx,
|
|
})
|
|
}
|
|
|
|
func (h *History) warmup(txFrom, limit uint64, tx kv.Tx) error {
|
|
historyKeysCursor, err := tx.CursorDupSort(h.indexKeysTable)
|
|
if err != nil {
|
|
return fmt.Errorf("create %s history cursor: %w", h.filenameBase, err)
|
|
}
|
|
defer historyKeysCursor.Close()
|
|
var txKey [8]byte
|
|
binary.BigEndian.PutUint64(txKey[:], txFrom)
|
|
idxC, err := tx.CursorDupSort(h.indexTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer idxC.Close()
|
|
valsC, err := tx.Cursor(h.historyValsTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer valsC.Close()
|
|
k, v, err := historyKeysCursor.Seek(txKey[:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if k == nil {
|
|
return nil
|
|
}
|
|
txFrom = binary.BigEndian.Uint64(k)
|
|
txTo := txFrom + h.aggregationStep
|
|
if limit != math.MaxUint64 && limit != 0 {
|
|
txTo = txFrom + limit
|
|
}
|
|
for ; err == nil && k != nil; k, v, err = historyKeysCursor.Next() {
|
|
txNum := binary.BigEndian.Uint64(k)
|
|
if txNum >= txTo {
|
|
break
|
|
}
|
|
_, _, _ = valsC.Seek(v[len(v)-8:])
|
|
_, _ = idxC.SeekBothRange(v[:len(v)-8], k)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *History) prune(ctx context.Context, txFrom, txTo, limit uint64, logEvery *time.Ticker) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
historyKeysCursor, err := h.tx.RwCursorDupSort(h.indexKeysTable)
|
|
if err != nil {
|
|
return fmt.Errorf("create %s history cursor: %w", h.filenameBase, err)
|
|
}
|
|
defer historyKeysCursor.Close()
|
|
var txKey [8]byte
|
|
binary.BigEndian.PutUint64(txKey[:], txFrom)
|
|
|
|
k, v, err := historyKeysCursor.Seek(txKey[:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if k == nil {
|
|
return nil
|
|
}
|
|
txFrom = binary.BigEndian.Uint64(k)
|
|
if limit != math.MaxUint64 && limit != 0 {
|
|
txTo = cmp.Min(txTo, txFrom+limit)
|
|
}
|
|
if txFrom >= txTo {
|
|
return nil
|
|
}
|
|
|
|
valsC, err := h.tx.RwCursor(h.historyValsTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer valsC.Close()
|
|
idxC, err := h.tx.RwCursorDupSort(h.indexTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer idxC.Close()
|
|
for ; err == nil && k != nil; k, v, err = historyKeysCursor.Next() {
|
|
txNum := binary.BigEndian.Uint64(k)
|
|
if txNum >= txTo {
|
|
break
|
|
}
|
|
if err = valsC.Delete(v[len(v)-8:]); err != nil {
|
|
return err
|
|
}
|
|
if err = idxC.DeleteExact(v[:len(v)-8], k); err != nil {
|
|
return err
|
|
}
|
|
// This DeleteCurrent needs to the last in the loop iteration, because it invalidates k and v
|
|
if err = historyKeysCursor.DeleteCurrent(); err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case <-logEvery.C:
|
|
log.Info("[snapshots] prune history", "name", h.filenameBase, "range", fmt.Sprintf("%.2f-%.2f", float64(txNum)/float64(h.aggregationStep), float64(txTo)/float64(h.aggregationStep)))
|
|
default:
|
|
}
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *History) pruneF(txFrom, txTo uint64, f func(txNum uint64, k, v []byte) error) error {
|
|
historyKeysCursor, err := h.tx.RwCursorDupSort(h.indexKeysTable)
|
|
if err != nil {
|
|
return fmt.Errorf("create %s history cursor: %w", h.filenameBase, err)
|
|
}
|
|
defer historyKeysCursor.Close()
|
|
var txKey [8]byte
|
|
binary.BigEndian.PutUint64(txKey[:], txFrom)
|
|
var k, v []byte
|
|
idxC, err := h.tx.RwCursorDupSort(h.indexTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer idxC.Close()
|
|
valsC, err := h.tx.RwCursor(h.historyValsTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer valsC.Close()
|
|
for k, v, err = historyKeysCursor.Seek(txKey[:]); err == nil && k != nil; k, v, err = historyKeysCursor.Next() {
|
|
txNum := binary.BigEndian.Uint64(k)
|
|
if txNum >= txTo {
|
|
break
|
|
}
|
|
key, txnNumBytes := v[:len(v)-8], v[len(v)-8:]
|
|
{
|
|
kk, vv, err := valsC.SeekExact(txnNumBytes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := f(txNum, key, vv); err != nil {
|
|
return err
|
|
}
|
|
if kk != nil {
|
|
if err = valsC.DeleteCurrent(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
if err = idxC.DeleteExact(key, k); err != nil {
|
|
return err
|
|
}
|
|
// This DeleteCurrent needs to the last in the loop iteration, because it invalidates k and v
|
|
if err = historyKeysCursor.DeleteCurrent(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("iterate over %s history keys: %w", h.filenameBase, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type HistoryContext struct {
|
|
h *History
|
|
indexFiles, historyFiles *btree.BTreeG[ctxItem]
|
|
|
|
tx kv.Tx
|
|
}
|
|
|
|
func (h *History) MakeContext() *HistoryContext {
|
|
var hc = HistoryContext{h: h}
|
|
hc.indexFiles = btree.NewG[ctxItem](32, ctxItemLess)
|
|
h.InvertedIndex.files.Ascend(func(item *filesItem) bool {
|
|
if item.index == nil {
|
|
return false
|
|
}
|
|
hc.indexFiles.ReplaceOrInsert(ctxItem{
|
|
startTxNum: item.startTxNum,
|
|
endTxNum: item.endTxNum,
|
|
getter: item.decompressor.MakeGetter(),
|
|
reader: recsplit.NewIndexReader(item.index),
|
|
})
|
|
return true
|
|
})
|
|
hc.historyFiles = btree.NewG[ctxItem](32, ctxItemLess)
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
if item.index == nil {
|
|
return false
|
|
}
|
|
hc.historyFiles.ReplaceOrInsert(ctxItem{
|
|
startTxNum: item.startTxNum,
|
|
endTxNum: item.endTxNum,
|
|
getter: item.decompressor.MakeGetter(),
|
|
reader: recsplit.NewIndexReader(item.index),
|
|
})
|
|
return true
|
|
})
|
|
return &hc
|
|
}
|
|
func (hc *HistoryContext) SetTx(tx kv.Tx) { hc.tx = tx }
|
|
|
|
func (hc *HistoryContext) GetNoState(key []byte, txNum uint64) ([]byte, bool, error) {
|
|
//fmt.Printf("GetNoState [%x] %d\n", key, txNum)
|
|
var foundTxNum uint64
|
|
var foundEndTxNum uint64
|
|
var foundStartTxNum uint64
|
|
var found bool
|
|
hc.indexFiles.AscendGreaterOrEqual(ctxItem{startTxNum: txNum, endTxNum: txNum}, func(item ctxItem) bool {
|
|
//fmt.Printf("ef item %d-%d, key %x\n", item.startTxNum, item.endTxNum, key)
|
|
if item.reader.Empty() {
|
|
return true
|
|
}
|
|
offset := item.reader.Lookup(key)
|
|
g := item.getter
|
|
g.Reset(offset)
|
|
k, _ := g.NextUncompressed()
|
|
if !bytes.Equal(k, key) {
|
|
return true
|
|
}
|
|
//fmt.Printf("Found key=%x\n", k)
|
|
eliasVal, _ := g.NextUncompressed()
|
|
ef, _ := eliasfano32.ReadEliasFano(eliasVal)
|
|
n, ok := ef.Search(txNum)
|
|
if ok {
|
|
foundTxNum = n
|
|
foundEndTxNum = item.endTxNum
|
|
foundStartTxNum = item.startTxNum
|
|
found = true
|
|
//fmt.Printf("Found n=%d\n", n)
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
if found {
|
|
var historyItem ctxItem
|
|
var ok bool
|
|
var search ctxItem
|
|
search.startTxNum = foundStartTxNum
|
|
search.endTxNum = foundEndTxNum
|
|
if historyItem, ok = hc.historyFiles.Get(search); !ok {
|
|
return nil, false, fmt.Errorf("no %s file found for [%x]", hc.h.filenameBase, key)
|
|
}
|
|
var txKey [8]byte
|
|
binary.BigEndian.PutUint64(txKey[:], foundTxNum)
|
|
offset := historyItem.reader.Lookup2(txKey[:], key)
|
|
//fmt.Printf("offset = %d, txKey=[%x], key=[%x]\n", offset, txKey[:], key)
|
|
g := historyItem.getter
|
|
g.Reset(offset)
|
|
if hc.h.compressVals {
|
|
v, _ := g.Next(nil)
|
|
return v, true, nil
|
|
}
|
|
v, _ := g.NextUncompressed()
|
|
return v, true, nil
|
|
}
|
|
return nil, false, nil
|
|
}
|
|
|
|
// GetNoStateWithRecent searches history for a value of specified key before txNum
|
|
// second return value is true if the value is found in the history (even if it is nil)
|
|
func (hc *HistoryContext) GetNoStateWithRecent(key []byte, txNum uint64, roTx kv.Tx) ([]byte, bool, error) {
|
|
v, ok, err := hc.GetNoState(key, txNum)
|
|
if err != nil {
|
|
return nil, ok, err
|
|
}
|
|
if ok {
|
|
return v, true, nil
|
|
}
|
|
|
|
// Value not found in history files, look in the recent history
|
|
if roTx == nil {
|
|
return nil, false, fmt.Errorf("roTx is nil")
|
|
}
|
|
v, ok, err = hc.getNoStateFromDB(key, txNum, roTx)
|
|
if err != nil {
|
|
return nil, ok, err
|
|
}
|
|
if ok {
|
|
return v, true, nil
|
|
}
|
|
return nil, false, err
|
|
}
|
|
|
|
func (hc *HistoryContext) getNoStateFromDB(key []byte, txNum uint64, tx kv.Tx) ([]byte, bool, error) {
|
|
indexCursor, err := tx.CursorDupSort(hc.h.indexTable)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
defer indexCursor.Close()
|
|
var txKey [8]byte
|
|
binary.BigEndian.PutUint64(txKey[:], txNum)
|
|
var foundTxNumVal []byte
|
|
if foundTxNumVal, err = indexCursor.SeekBothRange(key, txKey[:]); err != nil {
|
|
return nil, false, err
|
|
}
|
|
if foundTxNumVal != nil {
|
|
var historyKeysCursor kv.CursorDupSort
|
|
if historyKeysCursor, err = tx.CursorDupSort(hc.h.indexKeysTable); err != nil {
|
|
return nil, false, err
|
|
}
|
|
defer historyKeysCursor.Close()
|
|
var vn []byte
|
|
if vn, err = historyKeysCursor.SeekBothRange(foundTxNumVal, key); err != nil {
|
|
return nil, false, err
|
|
}
|
|
valNum := binary.BigEndian.Uint64(vn[len(vn)-8:])
|
|
if valNum == 0 {
|
|
// This is special valNum == 0, which is empty value
|
|
return nil, true, nil
|
|
}
|
|
var v []byte
|
|
if v, err = tx.GetOne(hc.h.historyValsTable, vn[len(vn)-8:]); err != nil {
|
|
return nil, false, err
|
|
}
|
|
return v, true, nil
|
|
}
|
|
return nil, false, nil
|
|
}
|
|
|
|
func (hc *HistoryContext) IterateChanged(startTxNum, endTxNum uint64, roTx kv.Tx) *HistoryIterator1 {
|
|
hi := HistoryIterator1{
|
|
hasNextInDb: true,
|
|
roTx: roTx,
|
|
indexTable: hc.h.indexTable,
|
|
idxKeysTable: hc.h.indexKeysTable,
|
|
valsTable: hc.h.historyValsTable,
|
|
}
|
|
|
|
hc.indexFiles.Ascend(func(item ctxItem) bool {
|
|
if item.endTxNum >= endTxNum {
|
|
hi.hasNextInDb = false
|
|
}
|
|
if item.endTxNum <= startTxNum {
|
|
return true
|
|
}
|
|
if item.startTxNum >= endTxNum {
|
|
return false
|
|
}
|
|
g := item.getter
|
|
g.Reset(0)
|
|
if g.HasNext() {
|
|
key, offset := g.NextUncompressed()
|
|
heap.Push(&hi.h, &ReconItem{g: g, key: key, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, startOffset: offset, lastOffset: offset})
|
|
hi.hasNextInFiles = true
|
|
}
|
|
hi.total += uint64(item.getter.Size())
|
|
return true
|
|
})
|
|
hi.hc = hc
|
|
hi.compressVals = hc.h.compressVals
|
|
hi.startTxNum = startTxNum
|
|
hi.endTxNum = endTxNum
|
|
binary.BigEndian.PutUint64(hi.startTxKey[:], startTxNum)
|
|
hi.advanceInDb()
|
|
hi.advanceInFiles()
|
|
hi.advance()
|
|
return &hi
|
|
}
|
|
|
|
type HistoryIterator1 struct {
|
|
roTx kv.Tx
|
|
txNum2kCursor kv.CursorDupSort
|
|
idxCursor kv.CursorDupSort
|
|
hc *HistoryContext
|
|
valsTable string
|
|
idxKeysTable string
|
|
indexTable string
|
|
nextFileKey []byte
|
|
nextDbKey []byte
|
|
nextDbVal []byte
|
|
nextFileVal []byte
|
|
nextVal []byte
|
|
nextKey []byte
|
|
h ReconHeap
|
|
total uint64
|
|
endTxNum uint64
|
|
startTxNum uint64
|
|
advFileCnt int
|
|
advDbCnt int
|
|
startTxKey [8]byte
|
|
txnKey [8]byte
|
|
hasNextInFiles bool
|
|
hasNextInDb bool
|
|
compressVals bool
|
|
}
|
|
|
|
func (hi *HistoryIterator1) Stat() (int, int) { return hi.advDbCnt, hi.advFileCnt }
|
|
|
|
func (hi *HistoryIterator1) Close() {
|
|
if hi.idxCursor != nil {
|
|
hi.idxCursor.Close()
|
|
}
|
|
if hi.txNum2kCursor != nil {
|
|
hi.txNum2kCursor.Close()
|
|
}
|
|
}
|
|
|
|
func (hi *HistoryIterator1) advanceInFiles() {
|
|
hi.advFileCnt++
|
|
for hi.h.Len() > 0 {
|
|
top := heap.Pop(&hi.h).(*ReconItem)
|
|
key := top.key
|
|
var idxVal []byte
|
|
if hi.compressVals {
|
|
idxVal, _ = top.g.Next(nil)
|
|
} else {
|
|
idxVal, _ = top.g.NextUncompressed()
|
|
}
|
|
if top.g.HasNext() {
|
|
if hi.compressVals {
|
|
top.key, _ = top.g.Next(nil)
|
|
} else {
|
|
top.key, _ = top.g.NextUncompressed()
|
|
}
|
|
heap.Push(&hi.h, top)
|
|
}
|
|
|
|
if bytes.Equal(key, hi.nextFileKey) {
|
|
continue
|
|
}
|
|
ef, _ := eliasfano32.ReadEliasFano(idxVal)
|
|
n, ok := ef.Search(hi.startTxNum)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if n >= hi.endTxNum {
|
|
continue
|
|
}
|
|
|
|
hi.nextFileKey = key
|
|
binary.BigEndian.PutUint64(hi.txnKey[:], n)
|
|
search := ctxItem{startTxNum: top.startTxNum, endTxNum: top.endTxNum}
|
|
historyItem, ok := hi.hc.historyFiles.Get(search)
|
|
if !ok {
|
|
panic(fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextFileKey))
|
|
}
|
|
offset := historyItem.reader.Lookup2(hi.txnKey[:], hi.nextFileKey)
|
|
g := historyItem.getter
|
|
g.Reset(offset)
|
|
if hi.compressVals {
|
|
hi.nextFileVal, _ = g.Next(nil)
|
|
} else {
|
|
hi.nextFileVal, _ = g.NextUncompressed()
|
|
}
|
|
hi.nextFileKey = key
|
|
return
|
|
}
|
|
hi.hasNextInFiles = false
|
|
}
|
|
|
|
func (hi *HistoryIterator1) advanceInDb() {
|
|
hi.advDbCnt++
|
|
var k []byte
|
|
var err error
|
|
if hi.idxCursor == nil {
|
|
if hi.idxCursor, err = hi.roTx.CursorDupSort(hi.indexTable); err != nil {
|
|
// TODO pass error properly around
|
|
panic(err)
|
|
}
|
|
if hi.txNum2kCursor, err = hi.roTx.CursorDupSort(hi.idxKeysTable); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if k, _, err = hi.idxCursor.First(); err != nil {
|
|
// TODO pass error properly around
|
|
panic(err)
|
|
}
|
|
} else {
|
|
if k, _, err = hi.idxCursor.NextNoDup(); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
for ; k != nil; k, _, err = hi.idxCursor.NextNoDup() {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
foundTxNumVal, err := hi.idxCursor.SeekBothRange(k, hi.startTxKey[:])
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if foundTxNumVal == nil {
|
|
continue
|
|
}
|
|
txNum := binary.BigEndian.Uint64(foundTxNumVal)
|
|
if txNum >= hi.endTxNum {
|
|
continue
|
|
}
|
|
hi.nextDbKey = append(hi.nextDbKey[:0], k...)
|
|
vn, err := hi.txNum2kCursor.SeekBothRange(foundTxNumVal, k)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
valNum := binary.BigEndian.Uint64(vn[len(vn)-8:])
|
|
if valNum == 0 {
|
|
// This is special valNum == 0, which is empty value
|
|
hi.nextDbVal = hi.nextDbVal[:0]
|
|
return
|
|
}
|
|
v, err := hi.roTx.GetOne(hi.valsTable, vn[len(vn)-8:])
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
hi.nextDbVal = append(hi.nextDbVal[:0], v...)
|
|
return
|
|
}
|
|
hi.idxCursor.Close()
|
|
hi.idxCursor = nil
|
|
hi.hasNextInDb = false
|
|
}
|
|
|
|
func (hi *HistoryIterator1) advance() {
|
|
if hi.hasNextInFiles {
|
|
if hi.hasNextInDb {
|
|
c := bytes.Compare(hi.nextFileKey, hi.nextDbKey)
|
|
if c < 0 {
|
|
hi.nextKey = append(hi.nextKey[:0], hi.nextFileKey...)
|
|
hi.nextVal = append(hi.nextVal[:0], hi.nextFileVal...)
|
|
hi.advanceInFiles()
|
|
} else if c > 0 {
|
|
hi.nextKey = append(hi.nextKey[:0], hi.nextDbKey...)
|
|
hi.nextVal = append(hi.nextVal[:0], hi.nextDbVal...)
|
|
hi.advanceInDb()
|
|
} else {
|
|
hi.nextKey = append(hi.nextKey[:0], hi.nextFileKey...)
|
|
hi.nextVal = append(hi.nextVal[:0], hi.nextFileVal...)
|
|
hi.advanceInDb()
|
|
hi.advanceInFiles()
|
|
}
|
|
} else {
|
|
hi.nextKey = append(hi.nextKey[:0], hi.nextFileKey...)
|
|
hi.nextVal = append(hi.nextVal[:0], hi.nextFileVal...)
|
|
hi.advanceInFiles()
|
|
}
|
|
} else if hi.hasNextInDb {
|
|
hi.nextKey = append(hi.nextKey[:0], hi.nextDbKey...)
|
|
hi.nextVal = append(hi.nextVal[:0], hi.nextDbVal...)
|
|
hi.advanceInDb()
|
|
} else {
|
|
hi.nextKey = nil
|
|
hi.nextVal = nil
|
|
}
|
|
}
|
|
|
|
func (hi *HistoryIterator1) HasNext() bool {
|
|
return hi.hasNextInFiles || hi.hasNextInDb || hi.nextKey != nil
|
|
}
|
|
|
|
func (hi *HistoryIterator1) Next(keyBuf, valBuf []byte) ([]byte, []byte) {
|
|
k := append(keyBuf, hi.nextKey...)
|
|
v := append(valBuf, hi.nextVal...)
|
|
hi.advance()
|
|
return k, v
|
|
}
|
|
|
|
func (h *History) DisableReadAhead() {
|
|
h.InvertedIndex.DisableReadAhead()
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
item.decompressor.DisableReadAhead()
|
|
if item.index != nil {
|
|
item.index.DisableReadAhead()
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (h *History) EnableReadAhead() *History {
|
|
h.InvertedIndex.EnableReadAhead()
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
item.decompressor.EnableReadAhead()
|
|
if item.index != nil {
|
|
item.index.EnableReadAhead()
|
|
}
|
|
return true
|
|
})
|
|
return h
|
|
}
|
|
func (h *History) EnableMadvWillNeed() *History {
|
|
h.InvertedIndex.EnableMadvWillNeed()
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
item.decompressor.EnableWillNeed()
|
|
if item.index != nil {
|
|
item.index.EnableWillNeed()
|
|
}
|
|
return true
|
|
})
|
|
return h
|
|
}
|
|
func (h *History) EnableMadvNormalReadAhead() *History {
|
|
h.InvertedIndex.EnableMadvNormalReadAhead()
|
|
h.files.Ascend(func(item *filesItem) bool {
|
|
item.decompressor.EnableMadvNormal()
|
|
if item.index != nil {
|
|
item.index.EnableMadvNormal()
|
|
}
|
|
return true
|
|
})
|
|
return h
|
|
}
|