From 784b6cc9045c26900c3707fa191dccc9988ccbdd Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 29 Sep 2022 12:14:45 +0700 Subject: [PATCH] erigon3: build .vi after downloading (#659) --- common/dir/rw_dir.go | 11 +++ compress/decompress.go | 7 ++ state/domain.go | 54 +++++++++--- state/history.go | 190 +++++++++++++++++++++++++++++++++++----- state/inverted_index.go | 73 ++++++++------- 5 files changed, 266 insertions(+), 69 deletions(-) diff --git a/common/dir/rw_dir.go b/common/dir/rw_dir.go index 7be9abcea..d53837ecc 100644 --- a/common/dir/rw_dir.go +++ b/common/dir/rw_dir.go @@ -20,6 +20,17 @@ func Exist(path string) bool { return true } +func FileExist(path string) bool { + fi, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return false + } + if !fi.Mode().IsRegular() { + return false + } + return true +} + func Recreate(dir string) { if Exist(dir) { _ = os.RemoveAll(dir) diff --git a/compress/decompress.go b/compress/decompress.go index 8f849ea70..495a72811 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -365,6 +365,13 @@ func (d *Decompressor) WithReadAhead(f func() error) error { return f() } +// DisableReadAhead - usage: `defer d.EnableReadAhead().DisableReadAhead()`. Please don't use this funcs without `defer` to avoid leak. +func (d *Decompressor) DisableReadAhead() { _ = mmap.MadviseRandom(d.mmapHandle1) } +func (d *Decompressor) EnableReadAhead() *Decompressor { + _ = mmap.MadviseSequential(d.mmapHandle1) + return d +} + // Getter represent "reader" or "interator" that can move accross the data of the decompressor // The full state of the getter can be captured by saving dataP, and dataBit type Getter struct { diff --git a/state/domain.go b/state/domain.go index 03ff9a089..73f0d1d27 100644 --- a/state/domain.go +++ b/state/domain.go @@ -32,6 +32,7 @@ import ( "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" + "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/common" @@ -181,8 +182,12 @@ func (d *Domain) openFiles() error { invalidFileItems := make([]*filesItem, 0) d.files.Ascend(func(item *filesItem) bool { - datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep)) - if fi, err := os.Stat(datPath); err != nil || fi.IsDir() { + if item.decompressor != nil { + item.decompressor.Close() + } + fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep + datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, fromStep, toStep)) + if !dir.FileExist(datPath) { invalidFileItems = append(invalidFileItems, item) return true } @@ -190,15 +195,16 @@ func (d *Domain) openFiles() error { return false } - idxPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep)) - if fi, err := os.Stat(idxPath); err != nil || fi.IsDir() { - invalidFileItems = append(invalidFileItems, item) - return true + if item.index == nil { + idxPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, fromStep, toStep)) + if dir.FileExist(idxPath) { + if item.index, err = recsplit.OpenIndex(idxPath); err != nil { + log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath) + return false + } + totalKeys += item.index.KeyCount() + } } - if item.index, err = recsplit.OpenIndex(idxPath); err != nil { - return false - } - totalKeys += item.index.KeyCount() return true }) if err != nil { @@ -410,6 +416,9 @@ func (d *Domain) MakeContext() *DomainContext { var datsz, idxsz, files uint64 d.files.Ascend(func(item *filesItem) bool { + if item.index == nil { + return false + } getter := item.decompressor.MakeGetter() datsz += uint64(getter.Size()) idxsz += uint64(item.index.Size()) @@ -708,6 +717,29 @@ func (d *Domain) buildFiles(step uint64, collation Collation) (StaticFiles, erro }, nil } +func (d *Domain) missedIdxFiles() (l []*filesItem) { + d.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree + fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep + if !dir.FileExist(filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, fromStep, toStep))) { + l = append(l, item) + } + return true + }) + return l +} + +// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv +func (d *Domain) BuildMissedIndices() (err error) { + if err := d.History.BuildMissedIndices(); err != nil { + return err + } + for _, item := range d.missedIdxFiles() { + //TODO: build .kvi + _ = item + } + return d.openFiles() +} + func buildIndex(d *compress.Decompressor, idxPath, dir string, count int, values bool) (*recsplit.Index, error) { var rs *recsplit.RecSplit var err error @@ -722,6 +754,8 @@ func buildIndex(d *compress.Decompressor, idxPath, dir string, count int, values return nil, fmt.Errorf("create recsplit: %w", err) } defer rs.Close() + defer d.EnableReadAhead().DisableReadAhead() + word := make([]byte, 0, 256) var keyPos, valPos uint64 g := d.MakeGetter() diff --git a/state/history.go b/state/history.go index c53304f46..e1647e24f 100644 --- a/state/history.go +++ b/state/history.go @@ -30,13 +30,14 @@ import ( "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" - "github.com/ledgerwatch/log/v3" - "golang.org/x/exp/slices" - + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" + "github.com/ledgerwatch/log/v3" + "golang.org/x/exp/slices" ) type History struct { @@ -131,31 +132,29 @@ func (h *History) openFiles() error { invalidFileItems := make([]*filesItem, 0) h.files.Ascend(func(item *filesItem) bool { - datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep)) + if item.decompressor != nil { + item.decompressor.Close() + } + fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep + datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, fromStep, toStep)) if fi, err := os.Stat(datPath); err != nil || fi.IsDir() { invalidFileItems = append(invalidFileItems, item) return true } if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { + log.Debug("Hisrory.openFiles: %w, %s", err, datPath) return false } - idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep)) - - if fi, err := os.Stat(idxPath); err != nil || fi.IsDir() { - invalidFileItems = append(invalidFileItems, item) - return true + if item.index == nil { + idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep)) + if dir.FileExist(idxPath) { + if item.index, err = recsplit.OpenIndex(idxPath); err != nil { + log.Debug("Hisrory.openFiles: %w, %s", err, idxPath) + return false + } + totalKeys += item.index.KeyCount() + } } - - //if !dir.Exist(idxPath) { - // if _, err = buildIndex(item.decompressor, idxPath, h.dir, item.decompressor.Count()/2, false /* values */); err != nil { - // return false - // } - //} - - if item.index, err = recsplit.OpenIndex(idxPath); err != nil { - return false - } - totalKeys += item.index.KeyCount() return true }) if err != nil { @@ -196,11 +195,154 @@ func (h *History) Files() (res []string) { return res } +func (h *History) missedIdxFiles() (l []*filesItem) { + h.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree + fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep + if !dir.FileExist(filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep))) { + l = append(l, item) + } + return true + }) + return l +} + +// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv func (h *History) BuildMissedIndices() (err error) { if err := h.InvertedIndex.BuildMissedIndices(); err != nil { return err } - //TODO: build .vi + for _, item := range h.missedIdxFiles() { + search := &filesItem{startTxNum: item.startTxNum, endTxNum: item.endTxNum} + iiItem, ok := h.InvertedIndex.files.Get(search) + if !ok { + return nil + } + fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep + idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, fromStep, toStep)) + count, err := iterateForVi(item, iiItem, h.compressVals, func(v []byte) error { return nil }) + if err != nil { + return err + } + if err := buildVi(item, iiItem, idxPath, h.dir, count, false /* values */, h.compressVals); err != nil { + return err + } + } + return h.openFiles() +} + +func iterateForVi(historyItem, iiItem *filesItem, compressVals bool, f func(v []byte) error) (count int, err error) { + var cp CursorHeap + heap.Init(&cp) + g := iiItem.decompressor.MakeGetter() + g.Reset(0) + if g.HasNext() { + g2 := historyItem.decompressor.MakeGetter() + key, _ := g.NextUncompressed() + val, _ := g.NextUncompressed() + heap.Push(&cp, &CursorItem{ + t: FILE_CURSOR, + dg: g, + dg2: g2, + key: key, + val: val, + endTxNum: iiItem.endTxNum, + reverse: false, + }) + } + + // In the loop below, the pair `keyBuf=>valBuf` is always 1 item behind `lastKey=>lastVal`. + // `lastKey` and `lastVal` are taken from the top of the multi-way merge (assisted by the CursorHeap cp), but not processed right away + // instead, the pair from the previous iteration is processed first - `keyBuf=>valBuf`. After that, `keyBuf` and `valBuf` are assigned + // to `lastKey` and `lastVal` correspondingly, and the next step of multi-way merge happens. Therefore, after the multi-way merge loop + // (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind + var valBuf []byte + for cp.Len() > 0 { + lastKey := common.Copy(cp[0].key) + // Advance all the items that have this key (including the top) + //var mergeOnce bool + for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { + ci1 := cp[0] + ef, _ := eliasfano32.ReadEliasFano(ci1.val) + for i := uint64(0); i < ef.Count(); i++ { + if compressVals { + valBuf, _ = ci1.dg2.Next(valBuf[:0]) + } else { + valBuf, _ = ci1.dg2.NextUncompressed() + } + if err = f(valBuf); err != nil { + return count, err + } + } + count += int(ef.Count()) + if ci1.dg.HasNext() { + ci1.key, _ = ci1.dg.NextUncompressed() + ci1.val, _ = ci1.dg.NextUncompressed() + heap.Fix(&cp, 0) + } else { + heap.Remove(&cp, 0) + } + } + } + return count, nil +} + +func buildVi(historyItem, iiItem *filesItem, historyIdxPath, dir string, count int, values, compressVals bool) error { + rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ + KeyCount: count, + Enums: false, + BucketSize: 2000, + LeafSize: 8, + TmpDir: dir, + IndexFile: historyIdxPath, + }) + if err != nil { + return fmt.Errorf("create recsplit: %w", err) + } + defer rs.Close() + var historyKey []byte + var txKey [8]byte + var valOffset uint64 + + defer iiItem.decompressor.EnableReadAhead().DisableReadAhead() + defer historyItem.decompressor.EnableReadAhead().DisableReadAhead() + + g := iiItem.decompressor.MakeGetter() + g2 := historyItem.decompressor.MakeGetter() + var keyBuf, valBuf []byte + for { + g.Reset(0) + g2.Reset(0) + valOffset = 0 + for g.HasNext() { + keyBuf, _ = g.NextUncompressed() + valBuf, _ = g.NextUncompressed() + ef, _ := eliasfano32.ReadEliasFano(valBuf) + efIt := ef.Iterator() + for efIt.HasNext() { + txNum := efIt.Next() + binary.BigEndian.PutUint64(txKey[:], txNum) + historyKey = append(append(historyKey[:0], txKey[:]...), keyBuf...) + if err = rs.AddKey(historyKey, valOffset); err != nil { + return err + } + if compressVals { + valOffset = g2.Skip() + } else { + valOffset = g2.SkipUncompressed() + } + } + } + if err = rs.Build(); err != nil { + if rs.Collision() { + log.Info("Building recsplit. Collision happened. It's ok. Restarting...") + rs.ResetNextSalt() + } else { + return fmt.Errorf("build %s idx: %w", historyIdxPath, err) + } + } else { + break + } + } return nil } @@ -611,6 +753,9 @@ func (h *History) MakeContext() *HistoryContext { var hc = HistoryContext{h: h} hc.indexFiles = btree.NewG[ctxItem](32, ctxItemLess) h.InvertedIndex.files.Ascend(func(item *filesItem) bool { + if item.index == nil { + return false + } hc.indexFiles.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum, @@ -621,6 +766,9 @@ func (h *History) MakeContext() *HistoryContext { }) hc.historyFiles = btree.NewG[ctxItem](32, ctxItemLess) h.files.Ascend(func(item *filesItem) bool { + if item.index == nil { + return false + } hc.historyFiles.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum, diff --git a/state/inverted_index.go b/state/inverted_index.go index b52b63b83..5fc4ef3b4 100644 --- a/state/inverted_index.go +++ b/state/inverted_index.go @@ -27,7 +27,6 @@ import ( "path/filepath" "regexp" "strconv" - "strings" "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" @@ -127,36 +126,22 @@ func (ii *InvertedIndex) scanStateFiles(files []fs.DirEntry) { } } -func (ii *InvertedIndex) BuildMissedIndices() (err error) { - var missedIndices []uint64 +func (ii *InvertedIndex) missedIdxFiles() (l []*filesItem) { ii.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep - idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)) - if !dir.Exist(idxPath) { - missedIndices = append(missedIndices, fromStep, toStep) + if !dir.FileExist(filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))) { + l = append(l, item) } return true }) - if len(missedIndices) == 0 { - return nil - } - var logItems []string - for i := 0; i < len(missedIndices); i += 2 { - fromStep, toStep := missedIndices[i], missedIndices[i+1] - logItems = append(logItems, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)) - } - log.Info("[snapshots] BuildMissedIndices", "files", strings.Join(logItems, ",")) + return l +} - for i := 0; i < len(missedIndices); i += 2 { - fromStep, toStep := missedIndices[i], missedIndices[i+1] +// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv +func (ii *InvertedIndex) BuildMissedIndices() (err error) { + for _, item := range ii.missedIdxFiles() { + fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)) - if dir.Exist(idxPath) { - return nil - } - item, ok := ii.files.Get(&filesItem{startTxNum: fromStep * ii.aggregationStep, endTxNum: toStep * ii.aggregationStep}) - if !ok { - return nil - } if _, err := buildIndex(item.decompressor, idxPath, ii.dir, item.decompressor.Count()/2, false /* values */); err != nil { return err } @@ -167,28 +152,36 @@ func (ii *InvertedIndex) BuildMissedIndices() (err error) { func (ii *InvertedIndex) openFiles() error { var err error var totalKeys uint64 + var invalidFileItems []*filesItem ii.files.Ascend(func(item *filesItem) bool { - fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep - if item.decompressor == nil { - datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, fromStep, toStep)) - if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { - log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath) - return false - } + if item.decompressor != nil { + item.decompressor.Close() } + fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep + datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, fromStep, toStep)) + if !dir.FileExist(datPath) { + invalidFileItems = append(invalidFileItems, item) + } + if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { + log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath) + return false + } + if item.index == nil { idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep)) - if !dir.Exist(idxPath) { - return false - } - if item.index, err = recsplit.OpenIndex(idxPath); err != nil { - log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath) - return false + if dir.FileExist(idxPath) { + if item.index, err = recsplit.OpenIndex(idxPath); err != nil { + log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath) + return false + } + totalKeys += item.index.KeyCount() } } - totalKeys += item.index.KeyCount() return true }) + for _, item := range invalidFileItems { + ii.files.Delete(item) + } if err != nil { return err } @@ -249,6 +242,10 @@ func (ii *InvertedIndex) MakeContext() *InvertedIndexContext { var ic = InvertedIndexContext{ii: ii} ic.files = btree.NewG[ctxItem](32, ctxItemLess) ii.files.Ascend(func(item *filesItem) bool { + if item.index == nil { + return false + } + ic.files.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum,