diff --git a/cmd/state/exec22/txtask.go b/cmd/state/exec22/txtask.go index 6103f446b..247f5874f 100644 --- a/cmd/state/exec22/txtask.go +++ b/cmd/state/exec22/txtask.go @@ -108,11 +108,11 @@ func (l *KvList) Swap(i, j int) { // Tasks added by method `ReTry` have higher priority than tasks added by `Add`. // Method `Add` expecting already-ordered (by priority) tasks - doesn't do any additional sorting of new tasks. type QueueWithRetry struct { - closed bool - newTasks chan *TxTask - retires TxTaskQueue - retiresLock sync.Mutex - capacity int + newTasksClosed bool + newTasks chan *TxTask + retires TxTaskQueue + retiresLock sync.Mutex + capacity int } func NewQueueWithRetry(capacity int) *QueueWithRetry { @@ -146,7 +146,7 @@ func (q *QueueWithRetry) ReTry(t *TxTask) { q.retiresLock.Lock() heap.Push(&q.retires, t) q.retiresLock.Unlock() - if q.closed { + if q.newTasksClosed { return } select { @@ -220,15 +220,19 @@ func (q *QueueWithRetry) popNoWait() (task *TxTask, ok bool) { return task, task != nil } -// Close safe to call multiple times -func (q *QueueWithRetry) Close() { - if q.closed { +// NewTasksFinish closing `newTasks` channel +func (q *QueueWithRetry) NewTasksFinish() { + if q.newTasksClosed { return } - q.closed = true + q.newTasksClosed = true close(q.newTasks) } +// Close safe to call multiple times +func (q *QueueWithRetry) Close() { +} + // ResultsQueue thread-safe priority-queue of execution results type ResultsQueue struct { limit int diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index b8f065160..cc84935dd 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -672,6 +672,7 @@ Loop: } if parallel { + in.NewTasksFinish() if err := rwLoopG.Wait(); err != nil { return err }