From 01a6417505e5d5848f1c34baa442e7057a70e096 Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Sat, 15 Jan 2022 11:23:19 +0700
Subject: [PATCH] snapshots: same workers amount #233

---
 compress/parallel_compress.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/compress/parallel_compress.go b/compress/parallel_compress.go
index 16b4a0128..785d3891c 100644
--- a/compress/parallel_compress.go
+++ b/compress/parallel_compress.go
@@ -96,7 +96,7 @@ func Compress(ctx context.Context, logPrefix, tmpFilePath, segmentFilePath strin
 		return err
 	}
 
-	if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir); err != nil {
+	if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir, workers); err != nil {
 		return err
 	}
 
@@ -275,7 +275,7 @@ func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *pat
 }
 
 // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word
-func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string) error {
+func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string, workers int) error {
 	logEvery := time.NewTicker(20 * time.Second)
 	defer logEvery.Stop()
 
@@ -300,7 +300,6 @@ func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string
 	ch := make(chan []byte, 10000)
 	inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0)
 	var wg sync.WaitGroup
-	workers := runtime.NumCPU() / 2
 	var collectors []*etl.Collector
 	defer func() {
 		for _, c := range collectors {
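
Note (not part of the patch): the change threads a single `workers` count from the caller into `reducedict`, instead of `reducedict` recomputing `runtime.NumCPU() / 2` on its own, so all compression stages run with the same amount of parallelism. The sketch below is only an illustration of that pattern, not code from this repository; `processWords` and its arguments are hypothetical stand-ins for `reducedict` and its worker channel.

    package main

    import (
    	"fmt"
    	"runtime"
    	"sync"
    )

    // processWords fans work out to exactly `workers` goroutines reading from a
    // shared channel; the worker count is supplied by the caller, not computed here.
    func processWords(words []string, workers int) {
    	ch := make(chan string, 10000)
    	var wg sync.WaitGroup
    	for i := 0; i < workers; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			for w := range ch {
    				_ = len(w) // placeholder for the real per-word work
    			}
    		}()
    	}
    	for _, w := range words {
    		ch <- w
    	}
    	close(ch)
    	wg.Wait()
    }

    func main() {
    	// Choose the worker count once at the top level and pass it down,
    	// so every stage uses the same degree of parallelism.
    	workers := runtime.NumCPU() / 2
    	if workers < 1 {
    		workers = 1
    	}
    	processWords([]string{"alpha", "beta", "gamma"}, workers)
    	fmt.Println("done with", workers, "workers")
    }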