e3: agg cancel background jobs (#808)

This commit is contained in:
Alex Sharov 2022-12-29 15:04:04 +07:00 committed by GitHub
parent 3105d7ba29
commit f36515a420
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -60,10 +60,13 @@ type Aggregator22 struct {
working atomic.Bool
workingMerge atomic.Bool
warmupWorking atomic.Bool
ctx context.Context
ctxCancel context.CancelFunc
}
func NewAggregator22(dir, tmpdir string, aggregationStep uint64, db kv.RoDB) (*Aggregator22, error) {
a := &Aggregator22{dir: dir, tmpdir: tmpdir, aggregationStep: aggregationStep, backgroundResult: &BackgroundResult{}, db: db, keepInDB: 2 * aggregationStep}
func NewAggregator22(ctx context.Context, dir, tmpdir string, aggregationStep uint64, db kv.RoDB) (*Aggregator22, error) {
ctx, ctxCancel := context.WithCancel(ctx)
a := &Aggregator22{ctx: ctx, ctxCancel: ctxCancel, dir: dir, tmpdir: tmpdir, aggregationStep: aggregationStep, backgroundResult: &BackgroundResult{}, db: db, keepInDB: 2 * aggregationStep}
return a, nil
}
@ -97,6 +100,7 @@ func (a *Aggregator22) ReopenFiles() error {
}
func (a *Aggregator22) Close() {
a.ctxCancel()
a.closeFiles()
}
@ -975,7 +979,7 @@ func (a *Aggregator22) deleteFiles(outs SelectedStaticFiles22) error {
// we can set it to 0, because no re-org on this blocks are possible
func (a *Aggregator22) KeepInDB(v uint64) { a.keepInDB = v }
func (a *Aggregator22) BuildFilesInBackground(ctx context.Context, db kv.RoDB) error {
func (a *Aggregator22) BuildFilesInBackground(db kv.RoDB) error {
if (a.txNum.Load() + 1) <= a.maxTxNum.Load()+a.aggregationStep+a.keepInDB { // Leave one step worth in the DB
return nil
}
@ -1004,7 +1008,7 @@ func (a *Aggregator22) BuildFilesInBackground(ctx context.Context, db kv.RoDB) e
// - to remove old data from db as early as possible
// - during files build, may happen commit of new data. on each loop step getting latest id in db
for step < lastIdInDB(db, a.accounts.indexKeysTable)/a.aggregationStep {
if err := a.buildFilesInBackground(ctx, step, db); err != nil {
if err := a.buildFilesInBackground(a.ctx, step, db); err != nil {
log.Warn("buildFilesInBackground", "err", err)
break
}
@ -1017,7 +1021,7 @@ func (a *Aggregator22) BuildFilesInBackground(ctx context.Context, db kv.RoDB) e
defer a.workingMerge.Store(true)
go func() {
defer a.workingMerge.Store(false)
if err := a.MergeLoop(ctx, 1); err != nil {
if err := a.MergeLoop(a.ctx, 1); err != nil {
log.Warn("merge", "err", err)
}
}()