Update aggregator.go (#274)

This commit is contained in:
ledgerwatch 2022-01-24 22:43:41 +00:00 committed by GitHub
parent 7ec016b160
commit a7ec201c0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -551,7 +551,7 @@ func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx
datPath := path.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", c.namebase, blockFrom, blockTo)) datPath := path.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", c.namebase, blockFrom, blockTo))
idxPath := path.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", c.namebase, blockFrom, blockTo)) idxPath := path.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", c.namebase, blockFrom, blockTo))
var count int var count int
if count, err = btreeToFile(bt, datPath, c.dir, false /* trace */, 8 /* workers */); err != nil { if count, err = btreeToFile(bt, datPath, c.dir, false /* trace */, 1 /* workers */); err != nil {
return nil, nil, fmt.Errorf("btreeToFile: %w", err) return nil, nil, fmt.Errorf("btreeToFile: %w", err)
} }
return buildIndex(datPath, idxPath, c.dir, count) return buildIndex(datPath, idxPath, c.dir, count)
@ -567,9 +567,9 @@ func (i *AggregateItem) Less(than btree.Item) bool {
} }
func (c *Changes) produceChangeSets(datPath, idxPath string) error { func (c *Changes) produceChangeSets(datPath, idxPath string) error {
comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, c.dir, compress.MinPatternScore, 8) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, c.dir, compress.MinPatternScore, 1)
if err != nil { if err != nil {
return fmt.Errorf("produceChangeSets NewCompressorSequential: %w", err) return fmt.Errorf("produceChangeSets NewCompressor: %w", err)
} }
defer comp.Close() defer comp.Close()
var totalRecords int var totalRecords int
@ -2151,7 +2151,7 @@ func (a *Aggregator) mergeIntoStateFile(cp *CursorHeap, prefixLen int, basename
datPath := path.Join(dir, fmt.Sprintf("%s.%d-%d.dat", basename, startBlock, endBlock)) datPath := path.Join(dir, fmt.Sprintf("%s.%d-%d.dat", basename, startBlock, endBlock))
idxPath := path.Join(dir, fmt.Sprintf("%s.%d-%d.idx", basename, startBlock, endBlock)) idxPath := path.Join(dir, fmt.Sprintf("%s.%d-%d.idx", basename, startBlock, endBlock))
//comp, err := compress.NewCompressorSequential(AggregatorPrefix, datPath, dir, compress.MinPatternScore) //comp, err := compress.NewCompressorSequential(AggregatorPrefix, datPath, dir, compress.MinPatternScore)
comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 8) comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, dir, compress.MinPatternScore, 1)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("compressor %s: %w", datPath, err) return nil, nil, fmt.Errorf("compressor %s: %w", datPath, err)
} }