From 689fc4effd79e2944643891ea76e7411fd697977 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 1 Nov 2022 09:55:34 +0700 Subject: [PATCH] e3: background merge (#718) --- state/aggregator.go | 20 ++--- state/aggregator22.go | 168 +++++++++++++++++++---------------- state/domain_test.go | 4 +- state/history_test.go | 2 +- state/inverted_index_test.go | 2 +- state/merge.go | 16 ++-- state/read_indices.go | 10 +-- 7 files changed, 116 insertions(+), 106 deletions(-) diff --git a/state/aggregator.go b/state/aggregator.go index 7a627a7e1..64788d21c 100644 --- a/state/aggregator.go +++ b/state/aggregator.go @@ -573,7 +573,7 @@ func (mf MergedFiles) Close() { } } -func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, r Ranges, maxSpan uint64) (MergedFiles, error) { +func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, r Ranges, workers int) (MergedFiles, error) { defer func(t time.Time) { log.Info("[snapshots] merge", "took", time.Since(t)) }(time.Now()) var mf MergedFiles closeFiles := true @@ -589,7 +589,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.accounts.any() { - if mf.accounts, mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accounts, files.accountsIdx, files.accountsHist, r.accounts, maxSpan); err != nil { + if mf.accounts, mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accounts, files.accountsIdx, files.accountsHist, r.accounts, workers); err != nil { errCh <- err } } @@ -598,7 +598,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.storage.any() { - if mf.storage, mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storage, files.storageIdx, files.storageHist, r.storage, maxSpan); err != nil { + if mf.storage, mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storage, files.storageIdx, files.storageHist, r.storage, workers); err != nil { errCh <- err } } @@ -607,7 +607,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.code.any() { - if mf.code, mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.code, files.codeIdx, files.codeHist, r.code, maxSpan); err != nil { + if mf.code, mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.code, files.codeIdx, files.codeHist, r.code, workers); err != nil { errCh <- err } } @@ -616,7 +616,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.commitment.any() { - if mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = a.commitment.mergeFiles(ctx, files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, maxSpan); err != nil { + if mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = a.commitment.mergeFiles(ctx, files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, workers); err != nil { errCh <- err } } @@ -625,7 +625,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.logAddrs { - if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, maxSpan); err != nil { + if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, workers); err != nil { errCh <- err } } @@ -634,7 +634,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.logTopics { - if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, maxSpan); err != nil { + if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, workers); err != nil { errCh <- err } } @@ -643,7 +643,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.tracesFrom { - if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, maxSpan); err != nil { + if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, workers); err != nil { errCh <- err } } @@ -652,7 +652,7 @@ func (a *Aggregator) mergeFiles(ctx context.Context, files SelectedStaticFiles, defer wg.Done() var err error if r.tracesTo { - if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, maxSpan); err != nil { + if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, workers); err != nil { errCh <- err } } @@ -1161,7 +1161,7 @@ func (a *Aggregator) FinishTx() error { outs.Close() } }() - in, err := a.mergeFiles(context.TODO(), outs, r, maxSpan) + in, err := a.mergeFiles(context.TODO(), outs, r, 1) if err != nil { return err } diff --git a/state/aggregator22.go b/state/aggregator22.go index 6fdacd747..597b0f57d 100644 --- a/state/aggregator22.go +++ b/state/aggregator22.go @@ -56,6 +56,7 @@ type Aggregator22 struct { keepInDB uint64 maxTxNum atomic.Uint64 working atomic.Bool + workingMerge atomic.Bool warmupWorking atomic.Bool } @@ -430,7 +431,7 @@ func (a *Aggregator22) buildFilesInBackground(ctx context.Context, step uint64, return nil } -func (a *Aggregator22) mergeLoopStep(ctx context.Context) (somethingDone bool, err error) { +func (a *Aggregator22) mergeLoopStep(ctx context.Context, workers int) (somethingDone bool, err error) { closeAll := true maxSpan := uint64(32) * a.aggregationStep r := a.findMergeRange(a.maxTxNum.Load(), maxSpan) @@ -444,7 +445,7 @@ func (a *Aggregator22) mergeLoopStep(ctx context.Context) (somethingDone bool, e outs.Close() } }() - in, err := a.mergeFiles(ctx, outs, r, maxSpan) + in, err := a.mergeFiles(ctx, outs, r, maxSpan, workers) if err != nil { return true, err } @@ -460,9 +461,9 @@ func (a *Aggregator22) mergeLoopStep(ctx context.Context) (somethingDone bool, e closeAll = false return true, nil } -func (a *Aggregator22) MergeLoop(ctx context.Context) error { +func (a *Aggregator22) MergeLoop(ctx context.Context, workers int) error { for { - somethingMerged, err := a.mergeLoopStep(ctx) + somethingMerged, err := a.mergeLoopStep(ctx, workers) if err != nil { return err } @@ -606,17 +607,21 @@ func (a *Aggregator22) Flush(tx kv.RwTx) error { func (a *Aggregator22) CanPrune(tx kv.Tx) bool { return a.CanPruneFrom(tx) < a.maxTxNum.Load() } func (a *Aggregator22) CanPruneFrom(tx kv.Tx) uint64 { fst, _ := kv.FirstKey(tx, kv.TracesToKeys) - if len(fst) > 0 { + fst2, _ := kv.FirstKey(tx, kv.StorageHistoryKeys) + if len(fst) > 0 && len(fst2) > 0 { fstInDb := binary.BigEndian.Uint64(fst) - if fstInDb < a.maxTxNum.Load() { - return fstInDb - } + fstInDb2 := binary.BigEndian.Uint64(fst2) + return cmp.Min(fstInDb, fstInDb2) } return math2.MaxUint64 } func (a *Aggregator22) Prune(ctx context.Context, limit uint64) error { a.Warmup(0, cmp.Max(a.aggregationStep, limit)) // warmup is asyn and moving faster than data deletion - defer func(t time.Time) { log.Debug(fmt.Sprintf("prune took: %s\n", time.Since(t))) }(time.Now()) + //defer func(t time.Time) { + // if time.Since(t) > 150*time.Millisecond { + // log.Debug(fmt.Sprintf("prune took: %s\n", time.Since(t))) + // } + //}(time.Now()) return a.prune(ctx, 0, a.maxTxNum.Load(), limit) } @@ -829,7 +834,7 @@ func (mf MergedFiles22) Close() { } } -func (a *Aggregator22) mergeFiles(ctx context.Context, files SelectedStaticFiles22, r Ranges22, maxSpan uint64) (MergedFiles22, error) { +func (a *Aggregator22) mergeFiles(ctx context.Context, files SelectedStaticFiles22, r Ranges22, maxSpan uint64, workers int) (MergedFiles22, error) { var mf MergedFiles22 closeFiles := true defer func() { @@ -837,76 +842,76 @@ func (a *Aggregator22) mergeFiles(ctx context.Context, files SelectedStaticFiles mf.Close() } }() - var wg sync.WaitGroup - wg.Add(7) + //var wg sync.WaitGroup + //wg.Add(7) errCh := make(chan error, 7) - go func() { - defer wg.Done() - var err error - if r.accounts.any() { - if mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accountsIdx, files.accountsHist, r.accounts, maxSpan); err != nil { - errCh <- err - } + //go func() { + // defer wg.Done() + var err error + if r.accounts.any() { + if mf.accountsIdx, mf.accountsHist, err = a.accounts.mergeFiles(ctx, files.accountsIdx, files.accountsHist, r.accounts, workers); err != nil { + errCh <- err } - }() - go func() { - defer wg.Done() - var err error - if r.storage.any() { - if mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storageIdx, files.storageHist, r.storage, maxSpan); err != nil { - errCh <- err - } + } + //}() + //go func() { + // defer wg.Done() + // var err error + if r.storage.any() { + if mf.storageIdx, mf.storageHist, err = a.storage.mergeFiles(ctx, files.storageIdx, files.storageHist, r.storage, workers); err != nil { + errCh <- err } - }() - go func() { - defer wg.Done() - var err error - if r.code.any() { - if mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.codeIdx, files.codeHist, r.code, maxSpan); err != nil { - errCh <- err - } + } + //}() + //go func() { + // defer wg.Done() + // var err error + if r.code.any() { + if mf.codeIdx, mf.codeHist, err = a.code.mergeFiles(ctx, files.codeIdx, files.codeHist, r.code, workers); err != nil { + errCh <- err } - }() - go func() { - defer wg.Done() - var err error - if r.logAddrs { - if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, maxSpan); err != nil { - errCh <- err - } + } + //}() + //go func() { + // defer wg.Done() + // var err error + if r.logAddrs { + if mf.logAddrs, err = a.logAddrs.mergeFiles(ctx, files.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum, workers); err != nil { + errCh <- err } - }() - go func() { - defer wg.Done() - var err error - if r.logTopics { - if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, maxSpan); err != nil { - errCh <- err - } + } + //}() + //go func() { + // defer wg.Done() + // var err error + if r.logTopics { + if mf.logTopics, err = a.logTopics.mergeFiles(ctx, files.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum, workers); err != nil { + errCh <- err } - }() - go func() { - defer wg.Done() - var err error - if r.tracesFrom { - if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, maxSpan); err != nil { - errCh <- err - } + } + //}() + //go func() { + // defer wg.Done() + // var err error + if r.tracesFrom { + if mf.tracesFrom, err = a.tracesFrom.mergeFiles(ctx, files.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum, workers); err != nil { + errCh <- err } - }() - go func() { - defer wg.Done() - var err error - if r.tracesTo { - if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, maxSpan); err != nil { - errCh <- err - } + } + //}() + //go func() { + // defer wg.Done() + // var err error + if r.tracesTo { + if mf.tracesTo, err = a.tracesTo.mergeFiles(ctx, files.tracesTo, r.tracesToStartTxNum, r.tracesToEndTxNum, workers); err != nil { + errCh <- err } - }() - go func() { - wg.Wait() - close(errCh) - }() + } + //}() + //go func() { + // wg.Wait() + close(errCh) + //}() var lastError error for err := range errCh { lastError = err @@ -960,6 +965,7 @@ func (a *Aggregator22) BuildFilesInBackground(db kv.RoDB) error { if (a.txNum.Load() + 1) <= a.maxTxNum.Load()+a.aggregationStep+a.keepInDB { // Leave one step worth in the DB return nil } + step := a.maxTxNum.Load() / a.aggregationStep if a.working.Load() { return nil @@ -988,12 +994,17 @@ func (a *Aggregator22) BuildFilesInBackground(db kv.RoDB) error { } step++ } - // trying to create as much small-step-files as possible: - // - this is reason why run only 1 merge round at a time - // - to remove old data from db as early as possible - if _, err := a.mergeLoopStep(context.Background()); err != nil { - log.Warn("merge", "err", err) + + if a.workingMerge.Load() { + return } + defer a.workingMerge.Store(true) + go func() { + defer a.workingMerge.Store(false) + if err := a.MergeLoop(context.Background(), 1); err != nil { + log.Warn("merge", "err", err) + } + }() }() //if err := a.prune(0, a.maxTxNum.Load(), a.aggregationStep); err != nil { @@ -1254,8 +1265,7 @@ func lastIdInDB(db kv.RoDB, table string) (lstInDb uint64) { } return nil }); err != nil { - _ = err - //return err + log.Warn("lastIdInDB", "err", err) } return lstInDb } diff --git a/state/domain_test.go b/state/domain_test.go index e422edaa1..b16514aa3 100644 --- a/state/domain_test.go +++ b/state/domain_test.go @@ -510,7 +510,7 @@ func collateAndMerge(t *testing.T, db kv.RwDB, d *Domain, txs uint64) { maxSpan := uint64(16 * 16) for r = d.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = d.findMergeRange(maxEndTxNum, maxSpan) { valuesOuts, indexOuts, historyOuts, _ := d.staticFilesInRange(r) - valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, maxSpan) + valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, 1) require.NoError(t, err) d.integrateMergedFiles(valuesOuts, indexOuts, historyOuts, valuesIn, indexIn, historyIn) err = d.deleteFiles(valuesOuts, indexOuts, historyOuts) @@ -542,7 +542,7 @@ func collateAndMergeOnce(t *testing.T, d *Domain, step uint64) { maxSpan := d.aggregationStep * d.aggregationStep for r = d.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = d.findMergeRange(maxEndTxNum, maxSpan) { valuesOuts, indexOuts, historyOuts, _ := d.staticFilesInRange(r) - valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, maxSpan) + valuesIn, indexIn, historyIn, err := d.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, 1) require.NoError(t, err) d.integrateMergedFiles(valuesOuts, indexOuts, historyOuts, valuesIn, indexIn, historyIn) diff --git a/state/history_test.go b/state/history_test.go index 12002c0c8..f5c8a60eb 100644 --- a/state/history_test.go +++ b/state/history_test.go @@ -389,7 +389,7 @@ func collateAndMergeHistory(tb testing.TB, db kv.RwDB, h *History, txs uint64) { maxSpan := uint64(16 * 16) for r = h.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = h.findMergeRange(maxEndTxNum, maxSpan) { indexOuts, historyOuts, _ := h.staticFilesInRange(r) - indexIn, historyIn, err := h.mergeFiles(ctx, indexOuts, historyOuts, r, maxSpan) + indexIn, historyIn, err := h.mergeFiles(ctx, indexOuts, historyOuts, r, 1) require.NoError(tb, err) h.integrateMergedFiles(indexOuts, historyOuts, indexIn, historyIn) err = h.deleteFiles(indexOuts, historyOuts) diff --git a/state/inverted_index_test.go b/state/inverted_index_test.go index 7a1f4e9d0..a51774869 100644 --- a/state/inverted_index_test.go +++ b/state/inverted_index_test.go @@ -319,7 +319,7 @@ func mergeInverted(t *testing.T, db kv.RwDB, ii *InvertedIndex, txs uint64) { maxSpan := uint64(16 * 16) for found, startTxNum, endTxNum = ii.findMergeRange(maxEndTxNum, maxSpan); found; found, startTxNum, endTxNum = ii.findMergeRange(maxEndTxNum, maxSpan) { outs, _ := ii.staticFilesInRange(startTxNum, endTxNum) - in, err := ii.mergeFiles(ctx, outs, startTxNum, endTxNum, maxSpan) + in, err := ii.mergeFiles(ctx, outs, startTxNum, endTxNum, 1) require.NoError(t, err) ii.integrateMergedFiles(outs, in) err = ii.deleteFiles(outs) diff --git a/state/merge.go b/state/merge.go index ba2320414..299fbafb0 100644 --- a/state/merge.go +++ b/state/merge.go @@ -302,7 +302,7 @@ func mergeEfs(preval, val, buf []byte) ([]byte, error) { return newEf.AppendBytes(buf), nil } -func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, historyFiles []*filesItem, r DomainRanges, maxSpan uint64) (valuesIn, indexIn, historyIn *filesItem, err error) { +func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, historyFiles []*filesItem, r DomainRanges, workers int) (valuesIn, indexIn, historyIn *filesItem, err error) { if !r.any() { return } @@ -350,7 +350,7 @@ func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, histor history: r.history, indexStartTxNum: r.indexStartTxNum, indexEndTxNum: r.indexEndTxNum, - index: r.index}, maxSpan); err != nil { + index: r.index}, workers); err != nil { return nil, nil, nil, err } if r.values { @@ -360,7 +360,7 @@ func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, histor } datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, r.valuesStartTxNum/d.aggregationStep, r.valuesEndTxNum/d.aggregationStep)) - if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.tmpdir, compress.MinPatternScore, d.workers, log.LvlDebug); err != nil { + if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.tmpdir, compress.MinPatternScore, workers, log.LvlDebug); err != nil { return nil, nil, nil, fmt.Errorf("merge %s history compressor: %w", d.filenameBase, err) } var cp CursorHeap @@ -484,7 +484,7 @@ func (d *Domain) mergeFiles(ctx context.Context, valuesFiles, indexFiles, histor // d.valueMergeFn = merge //} -func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, startTxNum, endTxNum uint64, maxSpan uint64) (*filesItem, error) { +func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, startTxNum, endTxNum uint64, workers int) (*filesItem, error) { for _, h := range files { defer h.decompressor.EnableMadvNormal().DisableReadAhead() } @@ -515,7 +515,7 @@ func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, sta } }() datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, startTxNum/ii.aggregationStep, endTxNum/ii.aggregationStep)) - if comp, err = compress.NewCompressor(ctx, "Snapshots merge", datPath, ii.tmpdir, compress.MinPatternScore, ii.workers, log.LvlDebug); err != nil { + if comp, err = compress.NewCompressor(ctx, "Snapshots merge", datPath, ii.tmpdir, compress.MinPatternScore, workers, log.LvlDebug); err != nil { return nil, fmt.Errorf("merge %s inverted index compressor: %w", ii.filenameBase, err) } var cp CursorHeap @@ -608,7 +608,7 @@ func (ii *InvertedIndex) mergeFiles(ctx context.Context, files []*filesItem, sta return outItem, nil } -func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*filesItem, r HistoryRanges, maxSpan uint64) (indexIn, historyIn *filesItem, err error) { +func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*filesItem, r HistoryRanges, workers int) (indexIn, historyIn *filesItem, err error) { if !r.any() { return nil, nil, nil } @@ -621,7 +621,7 @@ func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*fi } } }() - if indexIn, err = h.InvertedIndex.mergeFiles(ctx, indexFiles, r.indexStartTxNum, r.indexEndTxNum, maxSpan); err != nil { + if indexIn, err = h.InvertedIndex.mergeFiles(ctx, indexFiles, r.indexStartTxNum, r.indexEndTxNum, workers); err != nil { return nil, nil, err } if r.history { @@ -664,7 +664,7 @@ func (h *History) mergeFiles(ctx context.Context, indexFiles, historyFiles []*fi }() datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, r.historyStartTxNum/h.aggregationStep, r.historyEndTxNum/h.aggregationStep)) idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, r.historyStartTxNum/h.aggregationStep, r.historyEndTxNum/h.aggregationStep)) - if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, h.tmpdir, compress.MinPatternScore, h.workers, log.LvlDebug); err != nil { + if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, h.tmpdir, compress.MinPatternScore, workers, log.LvlDebug); err != nil { return nil, nil, fmt.Errorf("merge %s history compressor: %w", h.filenameBase, err) } var cp CursorHeap diff --git a/state/read_indices.go b/state/read_indices.go index a0985c6cb..cf014b96d 100644 --- a/state/read_indices.go +++ b/state/read_indices.go @@ -298,7 +298,7 @@ func (mf RMergedFiles) Close() { } } -func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFiles, r RRanges, maxSpan uint64) (RMergedFiles, error) { +func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFiles, r RRanges, workers int) (RMergedFiles, error) { var mf RMergedFiles closeFiles := true defer func() { @@ -313,7 +313,7 @@ func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFile defer wg.Done() var err error if r.accounts { - if mf.accounts, err = ri.accounts.mergeFiles(ctx, files.accounts, r.accountsStartTxNum, r.accountsEndTxNum, maxSpan); err != nil { + if mf.accounts, err = ri.accounts.mergeFiles(ctx, files.accounts, r.accountsStartTxNum, r.accountsEndTxNum, workers); err != nil { errCh <- err } } @@ -322,7 +322,7 @@ func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFile defer wg.Done() var err error if r.storage { - if mf.storage, err = ri.storage.mergeFiles(ctx, files.storage, r.storageStartTxNum, r.storageEndTxNum, maxSpan); err != nil { + if mf.storage, err = ri.storage.mergeFiles(ctx, files.storage, r.storageStartTxNum, r.storageEndTxNum, workers); err != nil { errCh <- err } } @@ -331,7 +331,7 @@ func (ri *ReadIndices) mergeFiles(ctx context.Context, files RSelectedStaticFile defer wg.Done() var err error if r.code { - if mf.code, err = ri.code.mergeFiles(ctx, files.code, r.codeStartTxNum, r.codeEndTxNum, maxSpan); err != nil { + if mf.code, err = ri.code.mergeFiles(ctx, files.code, r.codeStartTxNum, r.codeEndTxNum, workers); err != nil { errCh <- err } } @@ -429,7 +429,7 @@ func (ri *ReadIndices) FinishTx() error { outs.Close() } }() - in, err := ri.mergeFiles(context.Background(), outs, r, maxSpan) + in, err := ri.mergeFiles(context.Background(), outs, r, 1) if err != nil { return err }