mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-27 22:28:21 +00:00
e3: background merge (#718)
This commit is contained in:
parent
fe5dd317e5
commit
689fc4effd
@ -573,7 +573,7 @@ func (mf MergedFiles) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, r Ranges, maxSpan uint64) (MergedFiles, error) {
|
||||
func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, r Ranges, workers int) (MergedFiles, error) {
|
||||
defer func(t time.Time) { log.Info("[snapshots] merge", "took", time.Since(t)) }(time.Now())
|
||||
var mf MergedFiles
|
||||
closeFiles := true
|
||||
@ -589,7 +589,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.accounts.any() {
|
||||
if mf.accounts, mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accounts, files.accountsIdx, files.accountsHist, r.accounts, maxSpan); err != nil {
|
||||
if mf.accounts, mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accounts, files.accountsIdx, files.accountsHist, r.accounts, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -598,7 +598,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.storage.any() {
|
||||
if mf.storage, mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storage, files.storageIdx, files.storageHist, r.storage, maxSpan); err != nil {
|
||||
if mf.storage, mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storage, files.storageIdx, files.storageHist, r.storage, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -607,7 +607,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.code.any() {
|
||||
if mf.code, mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.code, files.codeIdx, files.codeHist, r.code, maxSpan); err != nil {
|
||||
if mf.code, mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.code, files.codeIdx, files.codeHist, r.code, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -616,7 +616,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.commitment.any() {
|
||||
if mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = a.commitment.mergeFiles(ctx, files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, maxSpan); err != nil {
|
||||
if mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = a.commitment.mergeFiles(ctx, files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -625,7 +625,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.logAddrs {
|
||||
if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, maxSpan); err != nil {
|
||||
if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -634,7 +634,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.logTopics {
|
||||
if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, maxSpan); err != nil {
|
||||
if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -643,7 +643,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.tracesFrom {
|
||||
if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, maxSpan); err != nil {
|
||||
if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -652,7 +652,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles,
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.tracesTo {
|
||||
if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, maxSpan); err != nil {
|
||||
if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -1161,7 +1161,7 @@ func (a *Aggregator) FinishTx() error {
|
||||
outs.Close()
|
||||
}
|
||||
}()
|
||||
in, err := a.mergeFiles(context.TODO(), outs, r, maxSpan)
|
||||
in, err := a.mergeFiles(context.TODO(), outs, r, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -56,6 +56,7 @@ type Aggregator22 struct {
|
||||
keepInDB uint64
|
||||
maxTxNum atomic.Uint64
|
||||
working atomic.Bool
|
||||
workingMerge atomic.Bool
|
||||
warmupWorking atomic.Bool
|
||||
}
|
||||
|
||||
@ -430,7 +431,7 @@ func (a *Aggregator22) buildFilesInBackground(ctx context.Context, step uint64,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Aggregator22) mergeLoopStep(ctx context.Context) (somethingDone bool, err error) {
|
||||
func (a *Aggregator22) mergeLoopStep(ctx context.Context, workers int) (somethingDone bool, err error) {
|
||||
closeAll := true
|
||||
maxSpan := uint64(32) * a.aggregationStep
|
||||
r := a.findMergeRange(a.maxTxNum.Load(), maxSpan)
|
||||
@ -444,7 +445,7 @@ func (a *Aggregator22) mergeLoopStep(ctx context.Context) (somethingDone bool, e
|
||||
outs.Close()
|
||||
}
|
||||
}()
|
||||
in, err := a.mergeFiles(ctx, outs, r, maxSpan)
|
||||
in, err := a.mergeFiles(ctx, outs, r, maxSpan, workers)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
@ -460,9 +461,9 @@ func (a *Aggregator22) mergeLoopStep(ctx context.Context) (somethingDone bool, e
|
||||
closeAll = false
|
||||
return true, nil
|
||||
}
|
||||
func (a *Aggregator22) MergeLoop(ctx context.Context) error {
|
||||
func (a *Aggregator22) MergeLoop(ctx context.Context, workers int) error {
|
||||
for {
|
||||
somethingMerged, err := a.mergeLoopStep(ctx)
|
||||
somethingMerged, err := a.mergeLoopStep(ctx, workers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -606,17 +607,21 @@ func (a *Aggregator22) Flush(tx kv.RwTx) error {
|
||||
func (a *Aggregator22) CanPrune(tx kv.Tx) bool { return a.CanPruneFrom(tx) < a.maxTxNum.Load() }
|
||||
func (a *Aggregator22) CanPruneFrom(tx kv.Tx) uint64 {
|
||||
fst, _ := kv.FirstKey(tx, kv.TracesToKeys)
|
||||
if len(fst) > 0 {
|
||||
fst2, _ := kv.FirstKey(tx, kv.StorageHistoryKeys)
|
||||
if len(fst) > 0 && len(fst2) > 0 {
|
||||
fstInDb := binary.BigEndian.Uint64(fst)
|
||||
if fstInDb < a.maxTxNum.Load() {
|
||||
return fstInDb
|
||||
}
|
||||
fstInDb2 := binary.BigEndian.Uint64(fst2)
|
||||
return cmp.Min(fstInDb, fstInDb2)
|
||||
}
|
||||
return math2.MaxUint64
|
||||
}
|
||||
func (a *Aggregator22) Prune(ctx context.Context, limit uint64) error {
|
||||
a.Warmup(0, cmp.Max(a.aggregationStep, limit)) // warmup is asyn and moving faster than data deletion
|
||||
defer func(t time.Time) { log.Debug(fmt.Sprintf("prune took: %s\n", time.Since(t))) }(time.Now())
|
||||
//defer func(t time.Time) {
|
||||
// if time.Since(t) > 150*time.Millisecond {
|
||||
// log.Debug(fmt.Sprintf("prune took: %s\n", time.Since(t)))
|
||||
// }
|
||||
//}(time.Now())
|
||||
return a.prune(ctx, 0, a.maxTxNum.Load(), limit)
|
||||
}
|
||||
|
||||
@ -829,7 +834,7 @@ func (mf MergedFiles22) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Aggregator22) mergeFiles(ctx context.Context, files SelectedStaticFiles22, r Ranges22, maxSpan uint64) (MergedFiles22, error) {
|
||||
func (a *Aggregator22) mergeFiles(ctx context.Context, files SelectedStaticFiles22, r Ranges22, maxSpan uint64, workers int) (MergedFiles22, error) {
|
||||
var mf MergedFiles22
|
||||
closeFiles := true
|
||||
defer func() {
|
||||
@ -837,76 +842,76 @@ func (a *Aggregator22) mergeFiles(ctx context.Context, files SelectedStaticFiles
|
||||
mf.Close()
|
||||
}
|
||||
}()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(7)
|
||||
//var wg sync.WaitGroup
|
||||
//wg.Add(7)
|
||||
errCh := make(chan error, 7)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.accounts.any() {
|
||||
if mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accountsIdx, files.accountsHist, r.accounts, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
var err error
|
||||
if r.accounts.any() {
|
||||
if mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accountsIdx, files.accountsHist, r.accounts, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.storage.any() {
|
||||
if mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storageIdx, files.storageHist, r.storage, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// var err error
|
||||
if r.storage.any() {
|
||||
if mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storageIdx, files.storageHist, r.storage, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.code.any() {
|
||||
if mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.codeIdx, files.codeHist, r.code, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// var err error
|
||||
if r.code.any() {
|
||||
if mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.codeIdx, files.codeHist, r.code, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.logAddrs {
|
||||
if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// var err error
|
||||
if r.logAddrs {
|
||||
if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.logTopics {
|
||||
if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// var err error
|
||||
if r.logTopics {
|
||||
if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.tracesFrom {
|
||||
if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// var err error
|
||||
if r.tracesFrom {
|
||||
if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.tracesTo {
|
||||
if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, maxSpan); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// defer wg.Done()
|
||||
// var err error
|
||||
if r.tracesTo {
|
||||
if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
}()
|
||||
}
|
||||
//}()
|
||||
//go func() {
|
||||
// wg.Wait()
|
||||
close(errCh)
|
||||
//}()
|
||||
var lastError error
|
||||
for err := range errCh {
|
||||
lastError = err
|
||||
@ -960,6 +965,7 @@ func (a *Aggregator22) BuildFilesInBackground(db kv.RoDB) error {
|
||||
if (a.txNum.Load() + 1) <= a.maxTxNum.Load()+a.aggregationStep+a.keepInDB { // Leave one step worth in the DB
|
||||
return nil
|
||||
}
|
||||
|
||||
step := a.maxTxNum.Load() / a.aggregationStep
|
||||
if a.working.Load() {
|
||||
return nil
|
||||
@ -988,12 +994,17 @@ func (a *Aggregator22) BuildFilesInBackground(db kv.RoDB) error {
|
||||
}
|
||||
step++
|
||||
}
|
||||
// trying to create as much small-step-files as possible:
|
||||
// - this is reason why run only 1 merge round at a time
|
||||
// - to remove old data from db as early as possible
|
||||
if _, err := a.mergeLoopStep(context.Background()); err != nil {
|
||||
log.Warn("merge", "err", err)
|
||||
|
||||
if a.workingMerge.Load() {
|
||||
return
|
||||
}
|
||||
defer a.workingMerge.Store(true)
|
||||
go func() {
|
||||
defer a.workingMerge.Store(false)
|
||||
if err := a.MergeLoop(context.Background(), 1); err != nil {
|
||||
log.Warn("merge", "err", err)
|
||||
}
|
||||
}()
|
||||
}()
|
||||
|
||||
//if err := a.prune(0, a.maxTxNum.Load(), a.aggregationStep); err != nil {
|
||||
@ -1254,8 +1265,7 @@ func lastIdInDB(db kv.RoDB, table string) (lstInDb uint64) {
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
_ = err
|
||||
//return err
|
||||
log.Warn("lastIdInDB", "err", err)
|
||||
}
|
||||
return lstInDb
|
||||
}
|
||||
|
@ -510,7 +510,7 @@ func collateAndMerge(t *testing.T, db kv.RwDB, d *Domain, txs uint64) {
|
||||
maxSpan := uint64(16 * 16)
|
||||
for r = d.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = d.findMergeRange(maxEndTxNum, maxSpan) {
|
||||
valuesOuts, indexOuts, historyOuts, _ := d.staticFilesInRange(r)
|
||||
valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, maxSpan)
|
||||
valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, 1)
|
||||
require.NoError(t, err)
|
||||
d.integrateMergedFiles(valuesOuts, indexOuts, historyOuts, valuesIn, indexIn, historyIn)
|
||||
err = d.deleteFiles(valuesOuts, indexOuts, historyOuts)
|
||||
@ -542,7 +542,7 @@ func collateAndMergeOnce(t *testing.T, d *Domain, step uint64) {
|
||||
maxSpan := d.aggregationStep * d.aggregationStep
|
||||
for r = d.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = d.findMergeRange(maxEndTxNum, maxSpan) {
|
||||
valuesOuts, indexOuts, historyOuts, _ := d.staticFilesInRange(r)
|
||||
valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, maxSpan)
|
||||
valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
d.integrateMergedFiles(valuesOuts, indexOuts, historyOuts, valuesIn, indexIn, historyIn)
|
||||
|
@ -389,7 +389,7 @@ func collateAndMergeHistory(tb testing.TB, db kv.RwDB, h *History, txs uint64) {
|
||||
maxSpan := uint64(16 * 16)
|
||||
for r = h.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = h.findMergeRange(maxEndTxNum, maxSpan) {
|
||||
indexOuts, historyOuts, _ := h.staticFilesInRange(r)
|
||||
indexIn, historyIn, err := h.mergeFiles(ctx, indexOuts, historyOuts, r, maxSpan)
|
||||
indexIn, historyIn, err := h.mergeFiles(ctx, indexOuts, historyOuts, r, 1)
|
||||
require.NoError(tb, err)
|
||||
h.integrateMergedFiles(indexOuts, historyOuts, indexIn, historyIn)
|
||||
err = h.deleteFiles(indexOuts, historyOuts)
|
||||
|
@ -319,7 +319,7 @@ func mergeInverted(t *testing.T, db kv.RwDB, ii *InvertedIndex, txs uint64) {
|
||||
maxSpan := uint64(16 * 16)
|
||||
for found, startTxNum, endTxNum = ii.findMergeRange(maxEndTxNum, maxSpan); found; found, startTxNum, endTxNum = ii.findMergeRange(maxEndTxNum, maxSpan) {
|
||||
outs, _ := ii.staticFilesInRange(startTxNum, endTxNum)
|
||||
in, err := ii.mergeFiles(ctx, outs, startTxNum, endTxNum, maxSpan)
|
||||
in, err := ii.mergeFiles(ctx, outs, startTxNum, endTxNum, 1)
|
||||
require.NoError(t, err)
|
||||
ii.integrateMergedFiles(outs, in)
|
||||
err = ii.deleteFiles(outs)
|
||||
|
@ -302,7 +302,7 @@ func mergeEfs(preval, val, buf []byte) ([]byte, error) {
|
||||
return newEf.AppendBytes(buf), nil
|
||||
}
|
||||
|
||||
func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, historyFiles []*filesItem, r DomainRanges, maxSpan uint64) (valuesIn, indexIn, historyIn *filesItem, err error) {
|
||||
func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, historyFiles []*filesItem, r DomainRanges, workers int) (valuesIn, indexIn, historyIn *filesItem, err error) {
|
||||
if !r.any() {
|
||||
return
|
||||
}
|
||||
@ -350,7 +350,7 @@ func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, histor
|
||||
history: r.history,
|
||||
indexStartTxNum: r.indexStartTxNum,
|
||||
indexEndTxNum: r.indexEndTxNum,
|
||||
index: r.index}, maxSpan); err != nil {
|
||||
index: r.index}, workers); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
if r.values {
|
||||
@ -360,7 +360,7 @@ func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, histor
|
||||
}
|
||||
|
||||
datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, r.valuesStartTxNum/d.aggregationStep, r.valuesEndTxNum/d.aggregationStep))
|
||||
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.tmpdir, compress.MinPatternScore, d.workers, log.LvlDebug); err != nil {
|
||||
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.tmpdir, compress.MinPatternScore, workers, log.LvlDebug); err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("merge %s history compressor: %w", d.filenameBase, err)
|
||||
}
|
||||
var cp CursorHeap
|
||||
@ -484,7 +484,7 @@ func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, histor
|
||||
// d.valueMergeFn = merge
|
||||
//}
|
||||
|
||||
func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, startTxNum, endTxNum uint64, maxSpan uint64) (*filesItem, error) {
|
||||
func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, startTxNum, endTxNum uint64, workers int) (*filesItem, error) {
|
||||
for _, h := range files {
|
||||
defer h.decompressor.EnableMadvNormal().DisableReadAhead()
|
||||
}
|
||||
@ -515,7 +515,7 @@ func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, sta
|
||||
}
|
||||
}()
|
||||
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, startTxNum/ii.aggregationStep, endTxNum/ii.aggregationStep))
|
||||
if comp, err = compress.NewCompressor(ctx, "Snapshots merge", datPath, ii.tmpdir, compress.MinPatternScore, ii.workers, log.LvlDebug); err != nil {
|
||||
if comp, err = compress.NewCompressor(ctx, "Snapshots merge", datPath, ii.tmpdir, compress.MinPatternScore, workers, log.LvlDebug); err != nil {
|
||||
return nil, fmt.Errorf("merge %s inverted index compressor: %w", ii.filenameBase, err)
|
||||
}
|
||||
var cp CursorHeap
|
||||
@ -608,7 +608,7 @@ func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, sta
|
||||
return outItem, nil
|
||||
}
|
||||
|
||||
func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*filesItem, r HistoryRanges, maxSpan uint64) (indexIn, historyIn *filesItem, err error) {
|
||||
func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*filesItem, r HistoryRanges, workers int) (indexIn, historyIn *filesItem, err error) {
|
||||
if !r.any() {
|
||||
return nil, nil, nil
|
||||
}
|
||||
@ -621,7 +621,7 @@ func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*fi
|
||||
}
|
||||
}
|
||||
}()
|
||||
if indexIn, err = h.InvertedIndex.mergeFiles(ctx, indexFiles, r.indexStartTxNum, r.indexEndTxNum, maxSpan); err != nil {
|
||||
if indexIn, err = h.InvertedIndex.mergeFiles(ctx, indexFiles, r.indexStartTxNum, r.indexEndTxNum, workers); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if r.history {
|
||||
@ -664,7 +664,7 @@ func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*fi
|
||||
}()
|
||||
datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, r.historyStartTxNum/h.aggregationStep, r.historyEndTxNum/h.aggregationStep))
|
||||
idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, r.historyStartTxNum/h.aggregationStep, r.historyEndTxNum/h.aggregationStep))
|
||||
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, h.tmpdir, compress.MinPatternScore, h.workers, log.LvlDebug); err != nil {
|
||||
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, h.tmpdir, compress.MinPatternScore, workers, log.LvlDebug); err != nil {
|
||||
return nil, nil, fmt.Errorf("merge %s history compressor: %w", h.filenameBase, err)
|
||||
}
|
||||
var cp CursorHeap
|
||||
|
@ -298,7 +298,7 @@ func (mf RMergedFiles) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFiles, r RRanges, maxSpan uint64) (RMergedFiles, error) {
|
||||
func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFiles, r RRanges, workers int) (RMergedFiles, error) {
|
||||
var mf RMergedFiles
|
||||
closeFiles := true
|
||||
defer func() {
|
||||
@ -313,7 +313,7 @@ func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFile
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.accounts {
|
||||
if mf.accounts, err = ri.accounts.mergeFiles(ctx, files.accounts, r.accountsStartTxNum, r.accountsEndTxNum, maxSpan); err != nil {
|
||||
if mf.accounts, err = ri.accounts.mergeFiles(ctx, files.accounts, r.accountsStartTxNum, r.accountsEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -322,7 +322,7 @@ func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFile
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.storage {
|
||||
if mf.storage, err = ri.storage.mergeFiles(ctx, files.storage, r.storageStartTxNum, r.storageEndTxNum, maxSpan); err != nil {
|
||||
if mf.storage, err = ri.storage.mergeFiles(ctx, files.storage, r.storageStartTxNum, r.storageEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -331,7 +331,7 @@ func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFile
|
||||
defer wg.Done()
|
||||
var err error
|
||||
if r.code {
|
||||
if mf.code, err = ri.code.mergeFiles(ctx, files.code, r.codeStartTxNum, r.codeEndTxNum, maxSpan); err != nil {
|
||||
if mf.code, err = ri.code.mergeFiles(ctx, files.code, r.codeStartTxNum, r.codeEndTxNum, workers); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}
|
||||
@ -429,7 +429,7 @@ func (ri *ReadIndices) FinishTx() error {
|
||||
outs.Close()
|
||||
}
|
||||
}()
|
||||
in, err := ri.mergeFiles(context.Background(), outs, r, maxSpan)
|
||||
in, err := ri.mergeFiles(context.Background(), outs, r, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user