snapshots: same workers amount #233

This commit is contained in:
Alex Sharov 2022-01-15 11:23:19 +07:00 committed by GitHub
parent c08ed12256
commit 01a6417505
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -96,7 +96,7 @@ func Compress(ctx context.Context, logPrefix, tmpFilePath, segmentFilePath strin
return err return err
} }
if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir); err != nil { if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir, workers); err != nil {
return err return err
} }
@ -275,7 +275,7 @@ func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *pat
} }
// reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word
func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string) error { func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string, workers int) error {
logEvery := time.NewTicker(20 * time.Second) logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop() defer logEvery.Stop()
@ -300,7 +300,6 @@ func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string
ch := make(chan []byte, 10000) ch := make(chan []byte, 10000)
inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0) inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0)
var wg sync.WaitGroup var wg sync.WaitGroup
workers := runtime.NumCPU() / 2
var collectors []*etl.Collector var collectors []*etl.Collector
defer func() { defer func() {
for _, c := range collectors { for _, c := range collectors {