e3: result size atomic change after processResultQueue (#6975)

This commit is contained in:
Alex Sharov 2023-02-28 16:35:13 +07:00 committed by GitHub
parent edcd271f72
commit 570ff33e88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -259,15 +259,15 @@ func ExecV3(ctx context.Context,
resultsSize.Add(added)
}
var processedTxNum, conflicts, processedBlockNum uint64
if err := func() (err error) {
processedResultSize, processedTxNum, conflicts, processedBlockNum, err := func() (processedResultSize int64, processedTxNum, conflicts, processedBlockNum uint64, err error) {
rwsLock.Lock()
defer rwsLock.Unlock()
processedTxNum, conflicts, processedBlockNum, err = processResultQueue(rws, outputTxNum.Load(), rs, agg, tx, triggerCount, resultsSize, notifyReceived, applyWorker)
return err
}(); err != nil {
return processResultQueue(rws, outputTxNum.Load(), rs, agg, tx, triggerCount, notifyReceived, applyWorker)
}()
if err != nil {
return err
}
resultsSize.Add(-processedResultSize)
repeatCount.Add(conflicts)
if processedBlockNum > lastBlockNum {
outputBlockNum.Set(processedBlockNum)
@ -375,10 +375,11 @@ func ExecV3(ctx context.Context,
}
}
applyWorker.ResetTx(tx)
processedTxNum, conflicts, processedBlockNum, err := processResultQueue(rws, outputTxNum.Load(), rs, agg, tx, triggerCount, resultsSize, func() {}, applyWorker)
processedResultSize, processedTxNum, conflicts, processedBlockNum, err := processResultQueue(rws, outputTxNum.Load(), rs, agg, tx, triggerCount, func() {}, applyWorker)
if err != nil {
return err
}
resultsSize.Add(-processedResultSize)
repeatCount.Add(conflicts)
if processedBlockNum > 0 {
outputBlockNum.Set(processedBlockNum)
@ -778,12 +779,12 @@ func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, bl
return b, nil
}
func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.AggregatorV3, applyTx kv.Tx, triggerCount *atomic2.Uint64, resultsSize *atomic2.Int64, onSuccess func(), applyWorker *exec3.Worker) (outputTxNum, conflicts, processedBlockNum uint64, err error) {
func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.AggregatorV3, applyTx kv.Tx, triggerCount *atomic2.Uint64, onSuccess func(), applyWorker *exec3.Worker) (resultSize int64, outputTxNum, conflicts, processedBlockNum uint64, err error) {
var i int
outputTxNum = outputTxNumIn
for rws.Len() > 0 && (*rws)[0].TxNum == outputTxNum {
txTask := heap.Pop(rws).(*exec22.TxTask)
resultsSize.Add(-txTask.ResultsSize)
resultSize += txTask.ResultsSize
if txTask.Error != nil || !rs.ReadsValid(txTask.ReadLists) {
conflicts++
@ -796,24 +797,24 @@ func processResultQueue(rws *exec22.TxTaskQueue, outputTxNumIn uint64, rs *state
// resolve first conflict right here: it's faster and conflict-free
applyWorker.RunTxTask(txTask)
if txTask.Error != nil {
return outputTxNum, conflicts, processedBlockNum, txTask.Error
return resultSize, outputTxNum, conflicts, processedBlockNum, txTask.Error
}
i++
}
if err := rs.ApplyState(applyTx, txTask, agg); err != nil {
return outputTxNum, conflicts, processedBlockNum, fmt.Errorf("StateV3.Apply: %w", err)
return resultSize, outputTxNum, conflicts, processedBlockNum, fmt.Errorf("StateV3.Apply: %w", err)
}
triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum))
outputTxNum++
onSuccess()
if err := rs.ApplyHistory(txTask, agg); err != nil {
return outputTxNum, conflicts, processedBlockNum, fmt.Errorf("StateV3.Apply: %w", err)
return resultSize, outputTxNum, conflicts, processedBlockNum, fmt.Errorf("StateV3.Apply: %w", err)
}
//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
processedBlockNum = txTask.BlockNum
}
return outputTxNum, conflicts, processedBlockNum, nil
return resultSize, outputTxNum, conflicts, processedBlockNum, nil
}
func reconstituteStep(last bool,