From 061c4ef7440a36081277a2dad90ed0bc8343cfde Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 5 Dec 2022 10:23:49 +0700 Subject: [PATCH] E3: some clean (#6209) --- core/state/rw22.go | 3 ++- eth/stagedsync/exec3.go | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/state/rw22.go b/core/state/rw22.go index 071eda01d..5b0d20564 100644 --- a/core/state/rw22.go +++ b/core/state/rw22.go @@ -184,6 +184,8 @@ func (rs *State22) RegisterSender(txTask *exec22.TxTask) bool { } func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 { + rs.txsDone.Add(1) + rs.triggerLock.Lock() defer rs.triggerLock.Unlock() count := uint64(0) @@ -199,7 +201,6 @@ func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 { delete(rs.senderTxNums, *sender) } } - rs.txsDone.Add(1) return count } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 9c36b477f..28745dd19 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -552,7 +552,7 @@ Loop: if err := rs.ApplyState(applyTx, txTask, agg); err != nil { panic(fmt.Errorf("State22.Apply: %w", err)) } - rs.CommitTxNum(txTask.Sender, txTask.TxNum) + triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum)) outputTxNum.Inc() outputBlockNum.Store(txTask.BlockNum) if err := rs.ApplyHistory(applyTx, txTask, agg); err != nil { @@ -668,8 +668,9 @@ func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, bl } func processResultQueue(rws *exec22.TxTaskQueue, outputTxNum *atomic2.Uint64, rs *state.State22, agg *state2.Aggregator22, applyTx kv.Tx, triggerCount, outputBlockNum, repeatCount *atomic2.Uint64, resultsSize *atomic2.Int64, onSuccess func()) { + var txTask *exec22.TxTask for rws.Len() > 0 && (*rws)[0].TxNum == outputTxNum.Load() { - txTask := heap.Pop(rws).(*exec22.TxTask) + txTask = heap.Pop(rws).(*exec22.TxTask) resultsSize.Add(-txTask.ResultsSize) if txTask.Error != nil || !rs.ReadsValid(txTask.ReadLists) { rs.AddWork(txTask) @@ -683,13 +684,15 @@ func processResultQueue(rws *exec22.TxTaskQueue, outputTxNum *atomic2.Uint64, rs } triggerCount.Add(rs.CommitTxNum(txTask.Sender, txTask.TxNum)) outputTxNum.Inc() - outputBlockNum.Store(txTask.BlockNum) onSuccess() if err := rs.ApplyHistory(applyTx, txTask, agg); err != nil { panic(fmt.Errorf("State22.Apply: %w", err)) } //fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) } + if txTask != nil { + outputBlockNum.Store(txTask.BlockNum) + } } func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, workerCount int, batchSize datasize.ByteSize, chainDb kv.RwDB,