This commit is contained in:
alex.sharov 2023-04-19 11:46:50 +07:00
parent c733596f59
commit 72502e18f0
2 changed files with 15 additions and 10 deletions

View File

@ -108,11 +108,11 @@ func (l *KvList) Swap(i, j int) {
// Tasks added by method `ReTry` have higher priority than tasks added by `Add`.
// Method `Add` expecting already-ordered (by priority) tasks - doesn't do any additional sorting of new tasks.
type QueueWithRetry struct {
closed bool
newTasks chan *TxTask
retires TxTaskQueue
retiresLock sync.Mutex
capacity int
newTasksClosed bool
newTasks chan *TxTask
retires TxTaskQueue
retiresLock sync.Mutex
capacity int
}
func NewQueueWithRetry(capacity int) *QueueWithRetry {
@ -146,7 +146,7 @@ func (q *QueueWithRetry) ReTry(t *TxTask) {
q.retiresLock.Lock()
heap.Push(&q.retires, t)
q.retiresLock.Unlock()
if q.closed {
if q.newTasksClosed {
return
}
select {
@ -220,15 +220,19 @@ func (q *QueueWithRetry) popNoWait() (task *TxTask, ok bool) {
return task, task != nil
}
// Close safe to call multiple times
func (q *QueueWithRetry) Close() {
if q.closed {
// NewTasksFinish closing `newTasks` channel
func (q *QueueWithRetry) NewTasksFinish() {
if q.newTasksClosed {
return
}
q.closed = true
q.newTasksClosed = true
close(q.newTasks)
}
// Close safe to call multiple times
func (q *QueueWithRetry) Close() {
}
// ResultsQueue thread-safe priority-queue of execution results
type ResultsQueue struct {
limit int

View File

@ -672,6 +672,7 @@ Loop:
}
if parallel {
in.NewTasksFinish()
if err := rwLoopG.Wait(); err != nil {
return err
}