diff --git a/compress/parallel_compress.go b/compress/parallel_compress.go index 257b54dbb..b11922696 100644 --- a/compress/parallel_compress.go +++ b/compress/parallel_compress.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "os" + "path/filepath" "runtime" "sort" "strconv" @@ -31,6 +32,10 @@ import ( const minPatternScore = 1024 func Compress(ctx context.Context, logPrefix, tmpFilePath, segmentFilePath string, workers int) error { + tmpDir, _ := filepath.Split(tmpFilePath) + _, fileName := filepath.Split(segmentFilePath) + tmpSegmentFilePath := filepath.Join(tmpDir, fileName) + logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -39,7 +44,6 @@ func Compress(ctx context.Context, logPrefix, tmpFilePath, segmentFilePath strin var superstring []byte // Collector for dictionary words (sorted by their score) - tmpDir := "" ch := make(chan []byte, workers) var wg sync.WaitGroup wg.Add(workers) @@ -92,7 +96,11 @@ func Compress(ctx context.Context, logPrefix, tmpFilePath, segmentFilePath strin return err } - if err := reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath); err != nil { + if err := reducedict(logPrefix, tmpFilePath, dictPath, tmpSegmentFilePath, tmpDir); err != nil { + return err + } + + if err := os.Rename(tmpSegmentFilePath, segmentFilePath); err != nil { return err } return nil @@ -267,7 +275,7 @@ func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *pat } // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word -func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath string) error { +func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath, tmpDir string) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -289,7 +297,6 @@ func reducedict(logPrefix, tmpFilePath, dictPath, segmentFilePath string) error return err } log.Info(fmt.Sprintf("[%s] dictionary file parsed", logPrefix), "entries", len(code2pattern)) - tmpDir := "" ch := make(chan []byte, 10000) inputSize, outputSize := atomic2.NewUint64(0), atomic2.NewUint64(0) var wg sync.WaitGroup