ParallelCompressor: Remove intermediate ETL collectors (#302)

Alex Sharov 2022-02-04 16:48:02 +07:00 committed by GitHub
parent d4b9053aed
commit 567d9ddfed


@@ -181,7 +181,7 @@ func optimiseCluster(trace bool, numBuf []byte, input []byte, trie *patricia.Pat
 	return output, patterns, uncovered
 }
-func reduceDictWorker(trace bool, inputCh chan []byte, completion *sync.WaitGroup, trie *patricia.PatriciaTree, collector *etl.Collector, inputSize, outputSize *atomic2.Uint64, posMap map[uint64]uint64) {
+func reduceDictWorker(trace bool, inputCh chan []byte, outCh chan *pair, completion *sync.WaitGroup, trie *patricia.PatriciaTree, inputSize, outputSize *atomic2.Uint64, posMap map[uint64]uint64) {
 	defer completion.Done()
 	var output = make([]byte, 0, 256)
 	var uncovered = make([]int, 256)
@@ -194,10 +194,7 @@ func reduceDictWorker(trace bool, inputCh chan []byte, completion *sync.WaitGrou
 			n := binary.PutUvarint(numBuf, uint64(len(input)-8))
 			output = append(output[:0], numBuf[:n]...)
 			output, patterns, uncovered = optimiseCluster(trace, numBuf, input[8:], trie, &mf, output, uncovered, patterns, cellRing, posMap)
-			if err := collector.Collect(input[:8], output); err != nil {
-				log.Error("Could not collect", "error", err)
-				return
-			}
+			outCh <- &pair{k: input[:8], v: common.Copy(output)}
 			inputSize.Add(1 + uint64(len(input)-8))
 			outputSize.Add(uint64(len(output)))
 			posMap[uint64(len(input)-8+1)]++
@@ -205,6 +202,8 @@ func reduceDictWorker(trace bool, inputCh chan []byte, completion *sync.WaitGrou
 	}
 }
+type pair struct{ k, v []byte }
+
 // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word
 func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile *DecompressedFile, workers int, dictBuilder *DictionaryBuilder) error {
 	logEvery := time.NewTicker(20 * time.Second)
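
For orientation, here is a minimal self-contained sketch (illustrative only, not erigon code) of the worker shape this commit moves to: instead of writing into a per-worker etl.Collector, each worker sends key/value pairs on a shared out channel. The transform below is a hypothetical stand-in for optimiseCluster; the copy before send mirrors common.Copy(output) in the diff, which is required because the worker reuses its output buffer across iterations.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // pair mirrors the type introduced by this commit: an 8-byte word
    // key plus the compressed payload.
    type pair struct{ k, v []byte }

    // worker is a stand-in for reduceDictWorker: it reuses one output
    // buffer across iterations, which is exactly why the value sent on
    // outCh must be a private copy; otherwise the next iteration would
    // overwrite bytes the aggregator has not yet read.
    func worker(inputCh chan []byte, outCh chan *pair, wg *sync.WaitGroup) {
    	defer wg.Done()
    	output := make([]byte, 0, 256) // reused buffer, as in the diff
    	for input := range inputCh {
    		// hypothetical stand-in for optimiseCluster: any transform
    		// that rewrites the reused buffer works for the demonstration
    		output = append(output[:0], input[8:]...)
    		v := make([]byte, len(output)) // what common.Copy amounts to
    		copy(v, output)
    		outCh <- &pair{k: input[:8], v: v}
    	}
    }

    func main() {
    	inputCh := make(chan []byte, 4)
    	outCh := make(chan *pair, 4)
    	var wg sync.WaitGroup
    	wg.Add(1)
    	go worker(inputCh, outCh, &wg)
    	inputCh <- []byte("\x00\x00\x00\x00\x00\x00\x00\x01hello")
    	close(inputCh)
    	wg.Wait()
    	close(outCh)
    	for p := range outCh {
    		fmt.Printf("key=%x val=%q\n", p.k, p.v)
    	}
    }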
@@ -228,22 +227,35 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile *
 	log.Debug(fmt.Sprintf("[%s] dictionary file parsed", logPrefix), "entries", len(code2pattern))
 	ch := make(chan []byte, 10_000)
 	inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0)
-	var wg sync.WaitGroup
-	var collectors []*etl.Collector
-	defer func() {
-		for _, c := range collectors {
-			c.Close()
-		}
-	}()
+	out := make(chan *pair, 1024)
+	aggregator := etl.NewCollector(compressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+	defer aggregator.Close()
+	var wgAggregator sync.WaitGroup
+	wgAggregator.Add(1)
+	go func() {
+		defer wgAggregator.Done()
+		for a := range out {
+			if err := aggregator.Collect(a.k, a.v); err != nil {
+				panic(err)
+			}
+		}
+	}()
 	var posMaps []map[uint64]uint64
+	var wg sync.WaitGroup
 	for i := 0; i < workers; i++ {
-		//nolint
-		collector := etl.NewCollector(compressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
-		collectors = append(collectors, collector)
 		posMap := make(map[uint64]uint64)
 		posMaps = append(posMaps, posMap)
 		wg.Add(1)
-		go reduceDictWorker(trace, ch, &wg, &pt, collector, inputSize, outputSize, posMap)
+		go reduceDictWorker(trace, ch, out, &wg, &pt, inputSize, outputSize, posMap)
 	}
 	var wordsCount uint64
 	if err := datFile.ForEach(func(v []byte) error {
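
One design note on the new aggregator goroutine: it panics on a Collect error because a goroutine cannot return an error to reducedict directly. A minimal alternative sketch (an illustration, not what the commit does) records the first error and keeps draining so that workers never block on a full channel:

    package main

    import (
    	"errors"
    	"fmt"
    	"sync"
    )

    type pair struct{ k, v []byte }

    func main() {
    	out := make(chan *pair, 4)
    	var firstErr error
    	var wgAggregator sync.WaitGroup
    	wgAggregator.Add(1)
    	go func() {
    		defer wgAggregator.Done()
    		for a := range out {
    			if firstErr != nil {
    				continue // keep draining so senders never block
    			}
    			// hypothetical sink standing in for aggregator.Collect(a.k, a.v)
    			if len(a.k) != 8 {
    				firstErr = errors.New("malformed key")
    			}
    		}
    	}()
    	out <- &pair{k: make([]byte, 8), v: []byte("ok")}
    	out <- &pair{k: nil, v: []byte("bad")}
    	close(out)
    	wgAggregator.Wait() // Wait orders the goroutine's writes before this read
    	fmt.Println("first error:", firstErr)
    }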
@@ -268,10 +280,11 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile *
 	}
 	close(ch)
 	wg.Wait()
+	close(out)
+	wgAggregator.Wait()
-	var m runtime.MemStats
-	runtime.ReadMemStats(&m)
+	//var m runtime.MemStats
+	//runtime.ReadMemStats(&m)
 	//log.Info(fmt.Sprintf("[%s] Dictionary build done", logPrefix), "input", common.ByteCount(inputSize.Load()), "output", common.ByteCount(outputSize.Load()), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
 	posMap := make(map[uint64]uint64)
 	for _, m := range posMaps {
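
The added shutdown lines are order-sensitive: close(ch) lets the workers drain the work queue and exit, wg.Wait() guarantees no worker can still send on out, and only then is out closed so the aggregator's range loop terminates before wgAggregator.Wait() returns. Closing out any earlier would risk a send-on-closed-channel panic; skipping wgAggregator.Wait() would risk losing buffered results. A compact runnable sketch of the same sequencing, with placeholder work:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type pair struct{ k, v []byte }

    func main() {
    	ch := make(chan []byte, 8) // work queue, as in the diff
    	out := make(chan *pair, 8) // fan-in channel to the aggregator

    	var collected int
    	var wgAggregator sync.WaitGroup
    	wgAggregator.Add(1)
    	go func() {
    		defer wgAggregator.Done()
    		for a := range out {
    			collected++ // stand-in for aggregator.Collect(a.k, a.v)
    			_ = a
    		}
    	}()

    	var wg sync.WaitGroup
    	for i := 0; i < 4; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			for w := range ch {
    				out <- &pair{k: w, v: w} // placeholder work
    			}
    		}()
    	}
    	for _, w := range []string{"aa", "bb", "cc"} {
    		ch <- []byte(w)
    	}

    	close(ch)           // 1: no more work will arrive
    	wg.Wait()           // 2: all workers exited, so no more sends on out
    	close(out)          // 3: only now is closing out safe
    	wgAggregator.Wait() // 4: every buffered result has been collected
    	fmt.Println("collected", collected)
    }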
@@ -528,22 +541,11 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile *
 	}
 	log.Debug(fmt.Sprintf("[%s] Positional dictionary", logPrefix), "size", common.ByteCount(offset), "position cutoff", positionCutoff)
-	aggregator := etl.NewCollector(compressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
-	defer aggregator.Close()
-	for _, collector := range collectors {
-		if err = collector.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
-			return aggregator.Collect(k, v)
-		}, etl.TransformArgs{}); err != nil {
-			return err
-		}
-		collector.Close()
-	}
 	wc := 0
 	var hc HuffmanCoder
 	hc.w = cw
 	r := bytes.NewReader(nil)
-	if err = aggregator.Load(nil, "", func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
+	if err = aggregator.Load(nil, "", func(_, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
 		// Re-encode it
 		r.Reset(v)
 		var l uint64
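
The deleted merge loop is unnecessary because a single sorted collector already yields the pairs in key order, which is why the load callback can now ignore the key. The sketch below illustrates the assumption this rests on: if the 8-byte key taken from input[:8] is a big-endian word index (an assumption here, the encoding is not shown in this diff), then byte-wise lexicographic order, which a sortable ETL buffer uses, coincides with numeric word order.

    package main

    import (
    	"bytes"
    	"encoding/binary"
    	"fmt"
    	"sort"
    )

    func main() {
    	// Encode out-of-order word indices as 8-byte big-endian keys.
    	var keys [][]byte
    	for _, idx := range []uint64{2, 0, 3, 1} {
    		k := make([]byte, 8)
    		binary.BigEndian.PutUint64(k, idx)
    		keys = append(keys, k)
    	}
    	// Sorting keys as byte strings restores numeric order for
    	// fixed-width big-endian integers.
    	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
    	for _, k := range keys {
    		fmt.Println(binary.BigEndian.Uint64(k)) // prints 0 1 2 3
    	}
    }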