diff --git a/compress/compress.go b/compress/compress.go index dc02c37fe..e07e69197 100644 --- a/compress/compress.go +++ b/compress/compress.go @@ -168,7 +168,7 @@ func (c *Compressor) Compress() error { } defer os.Remove(c.tmpOutFilePath) - if err := reducedict(c.trace, c.logPrefix, c.tmpOutFilePath, c.tmpDir, c.uncompressedFile, c.workers, db); err != nil { + if err := reducedict(c.ctx, c.trace, c.logPrefix, c.tmpOutFilePath, c.uncompressedFile, c.workers, db); err != nil { return err } diff --git a/compress/parallel_compress.go b/compress/parallel_compress.go index 6988115f7..dbc96e77c 100644 --- a/compress/parallel_compress.go +++ b/compress/parallel_compress.go @@ -238,7 +238,7 @@ func (cq *CompressionQueue) Pop() interface{} { } // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word -func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile *DecompressedFile, workers int, dictBuilder *DictionaryBuilder) error { +func reducedict(ctx context.Context, trace bool, logPrefix, segmentFilePath string, datFile *DecompressedFile, workers int, dictBuilder *DictionaryBuilder) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -306,6 +306,11 @@ func reducedict(trace bool, logPrefix, segmentFilePath, tmpDir string, datFile * var inCount, outCount, emptyWordsCount uint64 // Counters words sent to compression and returned for compression var numBuf [binary.MaxVarintLen64]byte if err = datFile.ForEach(func(v []byte, compression bool) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if workers > 1 { // take processed words in non-blocking way and push them to the queue outer: