erigon3: allow set workers amount for history compress and merge #657

This commit is contained in:
Alex Sharov 2022-09-28 14:31:28 +07:00 committed by GitHub
parent 6c929b7771
commit ec49625cd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 3 deletions

View File

@ -171,6 +171,17 @@ func (a *Aggregator) SetTxNum(txNum uint64) {
a.tracesTo.SetTxNum(txNum)
}
func (a *Aggregator) SetWorkers(i int) {
a.accounts.workers = i
a.storage.workers = i
a.code.workers = i
a.commitment.workers = i
a.logAddrs.workers = i
a.logTopics.workers = i
a.tracesFrom.workers = i
a.tracesTo.workers = i
}
type AggCollation struct {
accounts Collation
storage Collation

View File

@ -315,7 +315,7 @@ func (d *Domain) mergeFiles(valuesFiles, indexFiles, historyFiles []*filesItem,
}
if r.values {
datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, r.valuesStartTxNum/d.aggregationStep, r.valuesEndTxNum/d.aggregationStep))
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.dir, compress.MinPatternScore, 1, log.LvlDebug); err != nil {
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, d.dir, compress.MinPatternScore, d.workers, log.LvlDebug); err != nil {
return nil, nil, nil, fmt.Errorf("merge %s history compressor: %w", d.filenameBase, err)
}
var cp CursorHeap
@ -465,7 +465,7 @@ func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uin
}
}()
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, startTxNum/ii.aggregationStep, endTxNum/ii.aggregationStep))
if comp, err = compress.NewCompressor(context.Background(), "Snapshots merge", datPath, ii.dir, compress.MinPatternScore, 1, log.LvlDebug); err != nil {
if comp, err = compress.NewCompressor(context.Background(), "Snapshots merge", datPath, ii.dir, compress.MinPatternScore, ii.workers, log.LvlDebug); err != nil {
return nil, fmt.Errorf("merge %s inverted index compressor: %w", ii.filenameBase, err)
}
var cp CursorHeap
@ -602,7 +602,7 @@ func (h *History) mergeFiles(indexFiles, historyFiles []*filesItem, r HistoryRan
}()
datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, r.historyStartTxNum/h.aggregationStep, r.historyEndTxNum/h.aggregationStep))
idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, r.historyStartTxNum/h.aggregationStep, r.historyEndTxNum/h.aggregationStep))
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, h.dir, compress.MinPatternScore, 1, log.LvlDebug); err != nil {
if comp, err = compress.NewCompressor(context.Background(), "merge", datPath, h.dir, compress.MinPatternScore, h.workers, log.LvlDebug); err != nil {
return nil, nil, fmt.Errorf("merge %s history compressor: %w", h.filenameBase, err)
}
var cp CursorHeap