diff --git a/compress/parallel_compress.go b/compress/parallel_compress.go index 16b4a0128..785d3891c 100644 --- a/compress/parallel_compress.go +++ b/compress/parallel_compress.go @@ -96,7 +96,7 @@ func Compress(ctx context.Context, logPrefix, tmpFilePath, segmentFilePath strin return err } - if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir); err != nil { + if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir, workers); err != nil { return err } @@ -275,7 +275,7 @@ func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *pat } // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word -func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string) error { +func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string, workers int) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -300,7 +300,6 @@ func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string ch := make(chan []byte, 10000) inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0) var wg sync.WaitGroup - workers := runtime.NumCPU() / 2 var collectors []*etl.Collector defer func() { for _, c := range collectors {