diff --git a/compress/parallel_compress.go b/compress/parallel_compress.go index 96ec1cba4..e5c5664fa 100644 --- a/compress/parallel_compress.go +++ b/compress/parallel_compress.go @@ -181,7 +181,7 @@ func optimiseCluster(trace bool, numBuf []byte, input []byte, trie *patricia.Pat return output, patterns, uncovered } -func reduceDictWorker(trace bool, inputCh chan []byte, completion *sync.WaitGroup, trie *patricia.PatriciaTree, collector *etl.Collector, inputSize, outputSize *atomic2.Uint64, posMap map[uint64]uint64) { +func reduceDictWorker(trace bool, inputCh chan []byte, outCh chan *pair, completion *sync.WaitGroup, trie *patricia.PatriciaTree, inputSize, outputSize *atomic2.Uint64, posMap map[uint64]uint64) { defer completion.Done() var output = make([]byte, 0, 256) var uncovered = make([]int, 256) @@ -194,10 +194,7 @@ func reduceDictWorker(trace bool, inputCh chan []byte, completion *sync.WaitGrou n := binary.PutUvarint(numBuf, uint64(len(input)-8)) output = append(output[:0], numBuf[:n]...) output, patterns, uncovered = optimiseCluster(trace, numBuf, input[8:], trie, &mf, output, uncovered, patterns, cellRing, posMap) - if err := collector.Collect(input[:8], output); err != nil { - log.Error("Could not collect", "error", err) - return - } + outCh <- &pair{k: input[:8], v: common.Copy(output)} inputSize.Add(1 + uint64(len(input)-8)) outputSize.Add(uint64(len(output))) posMap[uint64(len(input)-8+1)]++ @@ -205,6 +202,8 @@ func reduceDictWorker(trace bool, inputCh chan []byte, completion *sync.WaitGrou } } +type pair struct{ k, v []byte } + // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile *DecompressedFile, workers int, dictBuilder *DictionaryBuilder) error { logEvery := time.NewTicker(20 * time.Second) @@ -228,22 +227,35 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile * log.Debug(fmt.Sprintf("[%s] dictionary file parsed", logPrefix), "entries", len(code2pattern)) ch := make(chan []byte, 10_000) inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0) - var wg sync.WaitGroup + var collectors []*etl.Collector defer func() { for _, c := range collectors { c.Close() } }() + out := make(chan *pair, 1024) + + aggregator := etl.NewCollector(compressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize)) + defer aggregator.Close() + var wgAggregator sync.WaitGroup + wgAggregator.Add(1) + go func() { + defer wgAggregator.Done() + for a := range out { + if err := aggregator.Collect(a.k, a.v); err != nil { + panic(err) + } + } + }() var posMaps []map[uint64]uint64 + var wg sync.WaitGroup for i := 0; i < workers; i++ { //nolint - collector := etl.NewCollector(compressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize)) - collectors = append(collectors, collector) posMap := make(map[uint64]uint64) posMaps = append(posMaps, posMap) wg.Add(1) - go reduceDictWorker(trace, ch, &wg, &pt, collector, inputSize, outputSize, posMap) + go reduceDictWorker(trace, ch, out, &wg, &pt, inputSize, outputSize, posMap) } var wordsCount uint64 if err := datFile.ForEach(func(v []byte) error { @@ -268,10 +280,11 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile * } close(ch) wg.Wait() + close(out) + wgAggregator.Wait() - var m runtime.MemStats - runtime.ReadMemStats(&m) - + //var m runtime.MemStats + //runtime.ReadMemStats(&m) //log.Info(fmt.Sprintf("[%s] Dictionary build done", logPrefix), "input", common.ByteCount(inputSize.Load()), "output", common.ByteCount(outputSize.Load()), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys)) posMap := make(map[uint64]uint64) for _, m := range posMaps { @@ -528,22 +541,11 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile * } log.Debug(fmt.Sprintf("[%s] Positional dictionary", logPrefix), "size", common.ByteCount(offset), "position cutoff", positionCutoff) - aggregator := etl.NewCollector(compressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize)) - defer aggregator.Close() - for _, collector := range collectors { - if err = collector.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - return aggregator.Collect(k, v) - }, etl.TransformArgs{}); err != nil { - return err - } - collector.Close() - } - wc := 0 var hc HuffmanCoder hc.w = cw r := bytes.NewReader(nil) - if err = aggregator.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { + if err = aggregator.Load(nil, "", func(_, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { // Re-encode it r.Reset(v) var l uint64