erigon3: build .vi after downloading (#659)

Alex Sharov 2022-09-29 12:14:45 +07:00 committed by GitHub
parent ec49625cd9
commit 784b6cc904
5 changed files with 266 additions and 69 deletions

View File

@@ -20,6 +20,17 @@ func Exist(path string) bool {
return true
}
func FileExist(path string) bool {
fi, err := os.Stat(path)
if err != nil {
return false
}
if !fi.Mode().IsRegular() {
return false
}
return true
}
func Recreate(dir string) {
if Exist(dir) {
_ = os.RemoveAll(dir)

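The new FileExist helper differs from Exist in that it also rejects directories, which is what the openFiles loops further down rely on when probing for .kv/.v/.ef data files. A minimal sketch of the distinction, assuming only the common/dir import path shown in this diff (the paths themselves are made up for illustration):

package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon-lib/common/dir"
)

func main() {
	fmt.Println(dir.Exist("/tmp/snapshots"))          // true for an existing directory or regular file
	fmt.Println(dir.FileExist("/tmp/snapshots"))      // false: a directory is not a regular file
	fmt.Println(dir.FileExist("/tmp/snapshots/a.kv")) // true only for an existing regular file
}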
View File

@@ -365,6 +365,13 @@ func (d *Decompressor) WithReadAhead(f func() error) error {
return f()
}
// DisableReadAhead - usage: `defer d.EnableReadAhead().DisableReadAhead()`. Please don't use these funcs without `defer`, to avoid leaks.
func (d *Decompressor) DisableReadAhead() { _ = mmap.MadviseRandom(d.mmapHandle1) }
func (d *Decompressor) EnableReadAhead() *Decompressor {
_ = mmap.MadviseSequential(d.mmapHandle1)
return d
}
// Getter represents a "reader" or "iterator" that can move across the data of the decompressor.
// The full state of the getter can be captured by saving dataP and dataBit.
type Getter struct {

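The chained `defer d.EnableReadAhead().DisableReadAhead()` idiom works because EnableReadAhead runs immediately (madvise-sequential on the mmap, returning the same *Decompressor), while only the trailing DisableReadAhead call is deferred, switching back to random access once the scan is done. A hedged sketch of the pattern around a full-file Getter scan, using only the compress APIs that appear in this diff (the helper name is an assumption):

import "github.com/ledgerwatch/erigon-lib/compress"

// scanAll is a sketch only: walk every word of a compressed file sequentially.
func scanAll(path string) (words int, err error) {
	d, err := compress.NewDecompressor(path)
	if err != nil {
		return 0, err
	}
	defer d.Close()
	// read-ahead for the duration of the scan, back to MADV_RANDOM afterwards
	defer d.EnableReadAhead().DisableReadAhead()

	g := d.MakeGetter()
	var buf []byte
	for g.HasNext() {
		buf, _ = g.Next(buf[:0])
		words++
	}
	return words, nil
}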
View File

@@ -32,6 +32,7 @@ import (
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/common"
@@ -181,8 +182,12 @@ func (d *Domain) openFiles() error {
invalidFileItems := make([]*filesItem, 0)
d.files.Ascend(func(item *filesItem) bool {
datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep))
if fi, err := os.Stat(datPath); err != nil || fi.IsDir() {
if item.decompressor != nil {
item.decompressor.Close()
}
fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep
datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, fromStep, toStep))
if !dir.FileExist(datPath) {
invalidFileItems = append(invalidFileItems, item)
return true
}
@@ -190,15 +195,16 @@ func (d *Domain) openFiles() error {
return false
}
idxPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep))
if fi, err := os.Stat(idxPath); err != nil || fi.IsDir() {
invalidFileItems = append(invalidFileItems, item)
return true
if item.index == nil {
idxPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, fromStep, toStep))
if dir.FileExist(idxPath) {
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath)
return false
}
totalKeys += item.index.KeyCount()
}
}
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
return false
}
totalKeys += item.index.KeyCount()
return true
})
if err != nil {
@@ -410,6 +416,9 @@ func (d *Domain) MakeContext() *DomainContext {
var datsz, idxsz, files uint64
d.files.Ascend(func(item *filesItem) bool {
if item.index == nil {
return false
}
getter := item.decompressor.MakeGetter()
datsz += uint64(getter.Size())
idxsz += uint64(item.index.Size())
@@ -708,6 +717,29 @@ func (d *Domain) buildFiles(step uint64, collation Collation) (StaticFiles, erro
}, nil
}
func (d *Domain) missedIdxFiles() (l []*filesItem) {
d.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree
fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep
if !dir.FileExist(filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, fromStep, toStep))) {
l = append(l, item)
}
return true
})
return l
}
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv
func (d *Domain) BuildMissedIndices() (err error) {
if err := d.History.BuildMissedIndices(); err != nil {
return err
}
for _, item := range d.missedIdxFiles() {
//TODO: build .kvi
_ = item
}
return d.openFiles()
}
func buildIndex(d *compress.Decompressor, idxPath, dir string, count int, values bool) (*recsplit.Index, error) {
var rs *recsplit.RecSplit
var err error
@@ -722,6 +754,8 @@ func buildIndex(d *compress.Decompressor, idxPath, dir string, count int, values
return nil, fmt.Errorf("create recsplit: %w", err)
}
defer rs.Close()
defer d.EnableReadAhead().DisableReadAhead()
word := make([]byte, 0, 256)
var keyPos, valPos uint64
g := d.MakeGetter()

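BuildMissedIndices on a Domain cascades down the composition chain before handling its own files, and .kvi generation itself is still a TODO above. A hedged sketch of how a caller might use it right after snapshot files are downloaded (the wrapper function and the way Domain instances are obtained are assumptions, not part of this commit):

// buildAfterDownload is a sketch that assumes it lives in the same package as Domain.
func buildAfterDownload(domains ...*Domain) error {
	for _, d := range domains {
		// Domain.BuildMissedIndices -> History.BuildMissedIndices -> InvertedIndex.BuildMissedIndices,
		// then openFiles() re-opens the freshly written .efi/.vi indices.
		if err := d.BuildMissedIndices(); err != nil {
			return err
		}
	}
	return nil
}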
View File

@@ -30,13 +30,14 @@ import (
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
)
type History struct {
@@ -131,31 +132,29 @@ func (h *History) openFiles() error {
invalidFileItems := make([]*filesItem, 0)
h.files.Ascend(func(item *filesItem) bool {
datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep))
if item.decompressor != nil {
item.decompressor.Close()
}
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, fromStep, toStep))
if fi, err := os.Stat(datPath); err != nil || fi.IsDir() {
invalidFileItems = append(invalidFileItems, item)
return true
}
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
log.Debug("Hisrory.openFiles: %w, %s", err, datPath)
return false
}
idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep))
if fi, err := os.Stat(idxPath); err != nil || fi.IsDir() {
invalidFileItems = append(invalidFileItems, item)
return true
if item.index == nil {
idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep))
if dir.FileExist(idxPath) {
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
log.Debug("Hisrory.openFiles: %w, %s", err, idxPath)
return false
}
totalKeys += item.index.KeyCount()
}
}
//if !dir.Exist(idxPath) {
// if _, err = buildIndex(item.decompressor, idxPath, h.dir, item.decompressor.Count()/2, false /* values */); err != nil {
// return false
// }
//}
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
return false
}
totalKeys += item.index.KeyCount()
return true
})
if err != nil {
@@ -196,11 +195,154 @@ func (h *History) Files() (res []string) {
return res
}
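// missedIdxFiles returns the .v items for which the corresponding .vi index file does not exist yet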
func (h *History) missedIdxFiles() (l []*filesItem) {
h.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
if !dir.FileExist(filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep))) {
l = append(l, item)
}
return true
})
return l
}
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv
func (h *History) BuildMissedIndices() (err error) {
if err := h.InvertedIndex.BuildMissedIndices(); err != nil {
return err
}
//TODO: build .vi
for _, item := range h.missedIdxFiles() {
search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum}
iiItem, ok := h.InvertedIndex.files.Get(search)
if !ok {
return nil
}
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep))
count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil })
if err != nil {
return err
}
if err := buildVi(item, iiItem, idxPath, h.dir, count, false /* values */, h.compressVals); err != nil {
return err
}
}
return h.openFiles()
}
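// iterateForVi walks one inverted-index (.ef) item and the matching history (.v) item in lockstep,
// calling f for every stored history value and returning the total number of values.
// That count is what RecSplit needs up front (KeyCount) before the .vi index can be built.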
func iterateForVi(historyItem, iiItem *filesItem, compressVals bool, f func(v []byte) error) (count int, err error) {
var cp CursorHeap
heap.Init(&cp)
g := iiItem.decompressor.MakeGetter()
g.Reset(0)
if g.HasNext() {
g2 := historyItem.decompressor.MakeGetter()
key, _ := g.NextUncompressed()
val, _ := g.NextUncompressed()
heap.Push(&cp, &CursorItem{
t: FILE_CURSOR,
dg: g,
dg2: g2,
key: key,
val: val,
endTxNum: iiItem.endTxNum,
reverse: false,
})
}
// In the loop below, the pair `keyBuf=>valBuf` is always 1 item behind `lastKey=>lastVal`.
// `lastKey` and `lastVal` are taken from the top of the multi-way merge (assisted by the CursorHeap cp), but are not processed right away;
// instead, the pair from the previous iteration is processed first - `keyBuf=>valBuf`. After that, `keyBuf` and `valBuf` are assigned
// to `lastKey` and `lastVal` correspondingly, and the next step of multi-way merge happens. Therefore, after the multi-way merge loop
// (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind
var valBuf []byte
for cp.Len() > 0 {
lastKey := common.Copy(cp[0].key)
// Advance all the items that have this key (including the top)
//var mergeOnce bool
for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) {
ci1 := cp[0]
ef, _ := eliasfano32.ReadEliasFano(ci1.val)
for i := uint64(0); i < ef.Count(); i++ {
if compressVals {
valBuf, _ = ci1.dg2.Next(valBuf[:0])
} else {
valBuf, _ = ci1.dg2.NextUncompressed()
}
if err = f(valBuf); err != nil {
return count, err
}
}
count += int(ef.Count())
if ci1.dg.HasNext() {
ci1.key, _ = ci1.dg.NextUncompressed()
ci1.val, _ = ci1.dg.NextUncompressed()
heap.Fix(&cp, 0)
} else {
heap.Remove(&cp, 0)
}
}
}
return count, nil
}
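// buildVi builds the .vi recsplit index for one history (.v) file: for every (txNum, key) pair
// reconstructed from the inverted-index (.ef) item it adds the key bigEndian(txNum)++originalKey
// pointing at the value's offset inside the .v file, retrying with a fresh salt whenever
// RecSplit reports a collision.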
func buildVi(historyItem, iiItem *filesItem, historyIdxPath, dir string, count int, values, compressVals bool) error {
rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: count,
Enums: false,
BucketSize: 2000,
LeafSize: 8,
TmpDir: dir,
IndexFile: historyIdxPath,
})
if err != nil {
return fmt.Errorf("create recsplit: %w", err)
}
defer rs.Close()
var historyKey []byte
var txKey [8]byte
var valOffset uint64
defer iiItem.decompressor.EnableReadAhead().DisableReadAhead()
defer historyItem.decompressor.EnableReadAhead().DisableReadAhead()
g := iiItem.decompressor.MakeGetter()
g2 := historyItem.decompressor.MakeGetter()
var keyBuf, valBuf []byte
for {
g.Reset(0)
g2.Reset(0)
valOffset = 0
for g.HasNext() {
keyBuf, _ = g.NextUncompressed()
valBuf, _ = g.NextUncompressed()
ef, _ := eliasfano32.ReadEliasFano(valBuf)
efIt := ef.Iterator()
for efIt.HasNext() {
txNum := efIt.Next()
binary.BigEndian.PutUint64(txKey[:], txNum)
historyKey = append(append(historyKey[:0], txKey[:]...), keyBuf...)
if err = rs.AddKey(historyKey, valOffset); err != nil {
return err
}
if compressVals {
valOffset = g2.Skip()
} else {
valOffset = g2.SkipUncompressed()
}
}
}
if err = rs.Build(); err != nil {
if rs.Collision() {
log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
rs.ResetNextSalt()
} else {
return fmt.Errorf("build %s idx: %w", historyIdxPath, err)
}
} else {
break
}
}
return nil
}
@@ -611,6 +753,9 @@ func (h *History) MakeContext() *HistoryContext {
var hc = HistoryContext{h: h}
hc.indexFiles = btree.NewG[ctxItem](32, ctxItemLess)
h.InvertedIndex.files.Ascend(func(item *filesItem) bool {
if item.index == nil {
return false
}
hc.indexFiles.ReplaceOrInsert(ctxItem{
startTxNum: item.startTxNum,
endTxNum: item.endTxNum,
@@ -621,6 +766,9 @@ func (h *History) MakeContext() *HistoryContext {
})
hc.historyFiles = btree.NewG[ctxItem](32, ctxItemLess)
h.files.Ascend(func(item *filesItem) bool {
if item.index == nil {
return false
}
hc.historyFiles.ReplaceOrInsert(ctxItem{
startTxNum: item.startTxNum,
endTxNum: item.endTxNum,

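buildVi above writes index keys made of the 8-byte big-endian txNum followed by the original key, mapped to the value's offset inside the .v file. A hedged sketch of composing such a key on the read side; the recsplit.NewIndexReader/Lookup API is an assumption about the recsplit package imported in this diff, and only the key layout comes from buildVi:

import (
	"encoding/binary"

	"github.com/ledgerwatch/erigon-lib/recsplit"
)

// lookupHistoryValueOffset is a sketch only.
func lookupHistoryValueOffset(idx *recsplit.Index, txNum uint64, key []byte) uint64 {
	var txKey [8]byte
	binary.BigEndian.PutUint64(txKey[:], txNum)
	lookupKey := append(txKey[:], key...)
	reader := recsplit.NewIndexReader(idx)
	return reader.Lookup(lookupKey) // offset of the value inside the .v file
}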
View File

@@ -27,7 +27,6 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
@@ -127,36 +126,22 @@ func (ii *InvertedIndex) scanStateFiles(files []fs.DirEntry) {
}
}
func (ii *InvertedIndex) BuildMissedIndices() (err error) {
var missedIndices []uint64
func (ii *InvertedIndex) missedIdxFiles() (l []*filesItem) {
ii.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
if !dir.Exist(idxPath) {
missedIndices = append(missedIndices, fromStep, toStep)
if !dir.FileExist(filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))) {
l = append(l, item)
}
return true
})
if len(missedIndices) == 0 {
return nil
}
var logItems []string
for i := 0; i < len(missedIndices); i += 2 {
fromStep, toStep := missedIndices[i], missedIndices[i+1]
logItems = append(logItems, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
}
log.Info("[snapshots] BuildMissedIndices", "files", strings.Join(logItems, ","))
return l
}
for i := 0; i < len(missedIndices); i += 2 {
fromStep, toStep := missedIndices[i], missedIndices[i+1]
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv
func (ii *InvertedIndex) BuildMissedIndices() (err error) {
for _, item := range ii.missedIdxFiles() {
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
if dir.Exist(idxPath) {
return nil
}
item, ok := ii.files.Get(&filesItem{startTxNum: fromStep * ii.aggregationStep, endTxNum: toStep * ii.aggregationStep})
if !ok {
return nil
}
if _, err := buildIndex(item.decompressor, idxPath, ii.dir, item.decompressor.Count()/2, false /* values */); err != nil {
return err
}
@@ -167,28 +152,36 @@ func (ii *InvertedIndex) BuildMissedIndices() (err error) {
func (ii *InvertedIndex) openFiles() error {
var err error
var totalKeys uint64
var invalidFileItems []*filesItem
ii.files.Ascend(func(item *filesItem) bool {
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
if item.decompressor == nil {
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, fromStep, toStep))
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
return false
}
if item.decompressor != nil {
item.decompressor.Close()
}
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, fromStep, toStep))
if !dir.FileExist(datPath) {
invalidFileItems = append(invalidFileItems, item)
return true
}
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
return false
}
if item.index == nil {
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
if !dir.Exist(idxPath) {
return false
}
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath)
return false
if dir.FileExist(idxPath) {
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath)
return false
}
totalKeys += item.index.KeyCount()
}
}
totalKeys += item.index.KeyCount()
return true
})
for _, item := range invalidFileItems {
ii.files.Delete(item)
}
if err != nil {
return err
}
@@ -249,6 +242,10 @@ func (ii *InvertedIndex) MakeContext() *InvertedIndexContext {
var ic = InvertedIndexContext{ii: ii}
ic.files = btree.NewG[ctxItem](32, ctxItemLess)
ii.files.Ascend(func(item *filesItem) bool {
if item.index == nil {
return false
}
ic.files.ReplaceOrInsert(ctxItem{
startTxNum: item.startTxNum,
endTxNum: item.endTxNum,