e3: close input chan when no work left (#7340)

This commit is contained in:
Alex Sharov 2023-04-19 11:47:53 +07:00 committed by GitHub
parent c733596f59
commit a5b9f2d774
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 10 deletions

View File

@ -108,11 +108,11 @@ func (l *KvList) Swap(i, j int) {
// Tasks added by method `ReTry` have higher priority than tasks added by `Add`.
// Method `Add` expecting already-ordered (by priority) tasks - doesn't do any additional sorting of new tasks.
type QueueWithRetry struct {
closed bool
newTasks chan *TxTask
retires TxTaskQueue
retiresLock sync.Mutex
capacity int
newTasksClosed bool
newTasks chan *TxTask
retires TxTaskQueue
retiresLock sync.Mutex
capacity int
}
func NewQueueWithRetry(capacity int) *QueueWithRetry {
@ -146,7 +146,7 @@ func (q *QueueWithRetry) ReTry(t *TxTask) {
q.retiresLock.Lock()
heap.Push(&q.retires, t)
q.retiresLock.Unlock()
if q.closed {
if q.newTasksClosed {
return
}
select {
@ -220,15 +220,19 @@ func (q *QueueWithRetry) popNoWait() (task *TxTask, ok bool) {
return task, task != nil
}
// Close safe to call multiple times
func (q *QueueWithRetry) Close() {
if q.closed {
// NewTasksFinish closing `newTasks` channel
func (q *QueueWithRetry) NewTasksFinish() {
if q.newTasksClosed {
return
}
q.closed = true
q.newTasksClosed = true
close(q.newTasks)
}
// Close safe to call multiple times
func (q *QueueWithRetry) Close() {
}
// ResultsQueue thread-safe priority-queue of execution results
type ResultsQueue struct {
limit int

View File

@ -672,6 +672,7 @@ Loop:
}
if parallel {
in.NewTasksFinish()
if err := rwLoopG.Wait(); err != nil {
return err
}