cumulative index stage: ctrl+C support

This commit is contained in:
alex.sharov 2023-04-19 13:14:07 +07:00
parent d5a06a2d4f
commit 1fb3b6f574
2 changed files with 11 additions and 20 deletions

View File

@ -111,11 +111,11 @@ func (l *KvList) Swap(i, j int) {
// Tasks added by method `ReTry` have higher priority than tasks added by `Add`.
// Method `Add` expecting already-ordered (by priority) tasks - doesn't do any additional sorting of new tasks.
type QueueWithRetry struct {
newTasksClosed bool
newTasks chan *TxTask
retires TxTaskQueue
retiresLock sync.Mutex
capacity int
closed bool
newTasks chan *TxTask
retires TxTaskQueue
retiresLock sync.Mutex
capacity int
}
func NewQueueWithRetry(capacity int) *QueueWithRetry {
@ -157,7 +157,7 @@ func (q *QueueWithRetry) ReTry(t *TxTask) {
q.retiresLock.Lock()
heap.Push(&q.retires, t)
q.retiresLock.Unlock()
if q.newTasksClosed {
if q.closed {
return
}
select {
@ -232,18 +232,13 @@ func (q *QueueWithRetry) popNoWait() (task *TxTask, ok bool) {
return task, task != nil
}
// NewTasksFinish closing `newTasks` channel
func (q *QueueWithRetry) NewTasksFinish() {
if q.newTasksClosed {
return
}
q.newTasksClosed = true
log.Warn("[dbg] closing newTasks")
close(q.newTasks)
}
// Close safe to call multiple times
func (q *QueueWithRetry) Close() {
if q.closed {
return
}
q.closed = true
close(q.newTasks)
}
// ResultsQueue thread-safe priority-queue of execution results

View File

@ -275,8 +275,6 @@ func ExecV3(ctx context.Context,
if parallel {
// `rwLoop` lives longer than `applyLoop`
rwLoop := func(ctx context.Context) error {
defer stopWorkers()
tx, err := chainDb.BeginRw(ctx)
if err != nil {
return err
@ -429,7 +427,6 @@ func ExecV3(ctx context.Context,
if err = tx.Commit(); err != nil {
return err
}
rws.Close()
return nil
}
@ -683,7 +680,6 @@ Loop:
}
if parallel {
in.NewTasksFinish()
if err := rwLoopG.Wait(); err != nil {
return err
}