E3: some clean (#6209)

This commit is contained in:
Alex Sharov 2022-12-05 10:23:49 +07:00 committed by GitHub
parent 1398703bc5
commit 061c4ef744
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 4 deletions

View File

@ -184,6 +184,8 @@ func (rs *State22) RegisterSender(txTask *exec22.TxTask) bool {
}
func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
rs.txsDone.Add(1)
rs.triggerLock.Lock()
defer rs.triggerLock.Unlock()
count := uint64(0)
@ -199,7 +201,6 @@ func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
delete(rs.senderTxNums, *sender)
}
}
rs.txsDone.Add(1)
return count
}

View File

@ -552,7 +552,7 @@ Loop:
if err := rs.ApplyState(applyTx, txTask, agg); err != nil {
panic(fmt.Errorf("State22.Apply: %w", err))
}
rs.CommitTxNum(txTask.Sender, txTask.TxNum)
triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
outputTxNum.Inc()
outputBlockNum.Store(txTask.BlockNum)
if err := rs.ApplyHistory(applyTx, txTask, agg); err != nil {
@ -668,8 +668,9 @@ func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, bl
}
func processResultQueue(rws *exec22.TxTaskQueue, outputTxNum *atomic2.Uint64, rs *state.State22, agg *state2.Aggregator22, applyTx kv.Tx, triggerCount, outputBlockNum, repeatCount *atomic2.Uint64, resultsSize *atomic2.Int64, onSuccess func()) {
var txTask *exec22.TxTask
for rws.Len() > 0 && (*rws)[0].TxNum == outputTxNum.Load() {
txTask := heap.Pop(rws).(*exec22.TxTask)
txTask = heap.Pop(rws).(*exec22.TxTask)
resultsSize.Add(-txTask.ResultsSize)
if txTask.Error != nil || !rs.ReadsValid(txTask.ReadLists) {
rs.AddWork(txTask)
@ -683,13 +684,15 @@ func processResultQueue(rws *exec22.TxTaskQueue, outputTxNum *atomic2.Uint64, rs
}
triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
outputTxNum.Inc()
outputBlockNum.Store(txTask.BlockNum)
onSuccess()
if err := rs.ApplyHistory(applyTx, txTask, agg); err != nil {
panic(fmt.Errorf("State22.Apply: %w", err))
}
//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
}
if txTask != nil {
outputBlockNum.Store(txTask.BlockNum)
}
}
func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB,