diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 22321ba6c..8eed43220 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -551,7 +551,7 @@ func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx datPath := path.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", c.namebase, blockFrom, blockTo)) idxPath := path.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", c.namebase, blockFrom, blockTo)) var count int - if count, err = btreeToFile(bt, datPath, c.dir, false /* trace */, 8 /* workers */); err != nil { + if count, err = btreeToFile(bt, datPath, c.dir, false /* trace */, 1 /* workers */); err != nil { return nil, nil, fmt.Errorf("btreeToFile: %w", err) } return buildIndex(datPath, idxPath, c.dir, count) @@ -567,9 +567,9 @@ func (i *AggregateItem) Less(than btree.Item) bool { } func (c *Changes) produceChangeSets(datPath, idxPath string) error { - comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, c.dir, compress.MinPatternScore, 8) + comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, c.dir, compress.MinPatternScore, 1) if err != nil { - return fmt.Errorf("produceChangeSets NewCompressorSequential: %w", err) + return fmt.Errorf("produceChangeSets NewCompressor: %w", err) } defer comp.Close() var totalRecords int @@ -2151,7 +2151,7 @@ func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, basename datPath := path.Join(dir, fmt.Sprintf("%s.%d-%d.dat", basename, startBlock, endBlock)) idxPath := path.Join(dir, fmt.Sprintf("%s.%d-%d.idx", basename, startBlock, endBlock)) //comp, err := compress.NewCompressorSequential(AggregatorPrefix, datPath, dir, compress.MinPatternScore) - comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 8) + comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 1) if err != nil { return nil, nil, fmt.Errorf("compressor %s: %w", datPath, err) }