From ff3e0745492281b11c46b5032c291c2e4d7690db Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Fri, 18 Mar 2022 16:09:57 +0000 Subject: [PATCH] No handover (#378) Co-authored-by: Alexey Sharp --- aggregator/aggregator.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 97465cc2e..d1413ed5a 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -164,7 +164,6 @@ type Aggregator struct { changesets bool // Whether to generate changesets (off by default) commitments bool // Whether to calculate commitments aggChannel chan *AggregationTask - aggBackCh chan struct{} // Channel for acknoledgement of AggregationTask aggError chan error aggWg sync.WaitGroup mergeChannel chan struct{} @@ -948,7 +947,6 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c keccak: sha3.NewLegacyKeccak256(), hph: commitment.NewHexPatriciaHashed(length.Addr, nil, nil, nil, nil, nil), aggChannel: make(chan *AggregationTask), - aggBackCh: make(chan struct{}), aggError: make(chan error, 1), mergeChannel: make(chan struct{}, 1), mergeError: make(chan error, 1), @@ -1183,17 +1181,6 @@ func removeFiles(fType FileType, diffDir string, toRemove []*byEndBlockItem) err func (a *Aggregator) backgroundAggregation() { defer a.aggWg.Done() for aggTask := range a.aggChannel { - typesLimit := Commitment - if a.commitments { - typesLimit = AccountHistory - } - for fType := FirstType; fType < typesLimit; fType++ { - if fType < NumberOfStateTypes { - a.updateArch(aggTask.bt[fType], fType, uint32(aggTask.blockTo)) - } - a.addLocked(fType, &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo, tree: aggTask.bt[fType]}) - } - a.aggBackCh <- struct{}{} if a.changesets { if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Account].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, AccountHistory, AccountBitmap); err == nil { var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo} @@ -1259,6 +1246,10 @@ func (a *Aggregator) backgroundAggregation() { return } } + typesLimit := Commitment + if a.commitments { + typesLimit = AccountHistory + } for fType := FirstType; fType < typesLimit; fType++ { var err error if err = aggTask.changes[fType].closeFiles(); err != nil { @@ -2902,8 +2893,13 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error { t = time.Now() // At this point, all the changes are gathered in 4 B-trees (accounts, code, storage and commitment) and removed from the database // What follows can be done in the 1st background goroutine + for fType := FirstType; fType < typesLimit; fType++ { + if fType < NumberOfStateTypes { + w.a.updateArch(aggTask.bt[fType], fType, uint32(aggTask.blockTo)) + } + w.a.addLocked(fType, &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo, tree: aggTask.bt[fType]}) + } w.a.aggChannel <- &aggTask - <-w.a.aggBackCh // Waiting for the B-tree based items have been added handoverTime := time.Since(t) if handoverTime > time.Second { log.Info("Long handover to background aggregation", "from", blockFrom, "to", blockTo, "composition", aggTime, "handover", time.Since(t))