mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 17:44:29 +00:00
No handover (#378)
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
parent
f93ea948d0
commit
ff3e074549
@ -164,7 +164,6 @@ type Aggregator struct {
|
||||
changesets bool // Whether to generate changesets (off by default)
|
||||
commitments bool // Whether to calculate commitments
|
||||
aggChannel chan *AggregationTask
|
||||
aggBackCh chan struct{} // Channel for acknoledgement of AggregationTask
|
||||
aggError chan error
|
||||
aggWg sync.WaitGroup
|
||||
mergeChannel chan struct{}
|
||||
@ -948,7 +947,6 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, c
|
||||
keccak: sha3.NewLegacyKeccak256(),
|
||||
hph: commitment.NewHexPatriciaHashed(length.Addr, nil, nil, nil, nil, nil),
|
||||
aggChannel: make(chan *AggregationTask),
|
||||
aggBackCh: make(chan struct{}),
|
||||
aggError: make(chan error, 1),
|
||||
mergeChannel: make(chan struct{}, 1),
|
||||
mergeError: make(chan error, 1),
|
||||
@ -1183,17 +1181,6 @@ func removeFiles(fType FileType, diffDir string, toRemove []*byEndBlockItem) err
|
||||
func (a *Aggregator) backgroundAggregation() {
|
||||
defer a.aggWg.Done()
|
||||
for aggTask := range a.aggChannel {
|
||||
typesLimit := Commitment
|
||||
if a.commitments {
|
||||
typesLimit = AccountHistory
|
||||
}
|
||||
for fType := FirstType; fType < typesLimit; fType++ {
|
||||
if fType < NumberOfStateTypes {
|
||||
a.updateArch(aggTask.bt[fType], fType, uint32(aggTask.blockTo))
|
||||
}
|
||||
a.addLocked(fType, &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo, tree: aggTask.bt[fType]})
|
||||
}
|
||||
a.aggBackCh <- struct{}{}
|
||||
if a.changesets {
|
||||
if historyD, historyI, bitmapD, bitmapI, err := aggTask.changes[Account].produceChangeSets(aggTask.blockFrom, aggTask.blockTo, AccountHistory, AccountBitmap); err == nil {
|
||||
var historyItem = &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo}
|
||||
@ -1259,6 +1246,10 @@ func (a *Aggregator) backgroundAggregation() {
|
||||
return
|
||||
}
|
||||
}
|
||||
typesLimit := Commitment
|
||||
if a.commitments {
|
||||
typesLimit = AccountHistory
|
||||
}
|
||||
for fType := FirstType; fType < typesLimit; fType++ {
|
||||
var err error
|
||||
if err = aggTask.changes[fType].closeFiles(); err != nil {
|
||||
@ -2902,8 +2893,13 @@ func (w *Writer) aggregateUpto(blockFrom, blockTo uint64) error {
|
||||
t = time.Now()
|
||||
// At this point, all the changes are gathered in 4 B-trees (accounts, code, storage and commitment) and removed from the database
|
||||
// What follows can be done in the 1st background goroutine
|
||||
for fType := FirstType; fType < typesLimit; fType++ {
|
||||
if fType < NumberOfStateTypes {
|
||||
w.a.updateArch(aggTask.bt[fType], fType, uint32(aggTask.blockTo))
|
||||
}
|
||||
w.a.addLocked(fType, &byEndBlockItem{startBlock: aggTask.blockFrom, endBlock: aggTask.blockTo, tree: aggTask.bt[fType]})
|
||||
}
|
||||
w.a.aggChannel <- &aggTask
|
||||
<-w.a.aggBackCh // Waiting for the B-tree based items have been added
|
||||
handoverTime := time.Since(t)
|
||||
if handoverTime > time.Second {
|
||||
log.Info("Long handover to background aggregation", "from", blockFrom, "to", blockTo, "composition", aggTime, "handover", time.Since(t))
|
||||
|
Loading…
Reference in New Issue
Block a user