From 1ce5610eea39651b94517377b1c49dc7b25a6fed Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sun, 9 Oct 2022 20:16:26 +0700 Subject: [PATCH] e3: agg atomic (#671) --- state/aggregator22.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/state/aggregator22.go b/state/aggregator22.go index 4a9efbaa7..add19f958 100644 --- a/state/aggregator22.go +++ b/state/aggregator22.go @@ -45,7 +45,7 @@ type Aggregator22 struct { txNum uint64 logPrefix string rwTx kv.RwTx - maxTxNum uint64 + maxTxNum atomic.Uint64 backgroundResult *BackgroundResult working atomic.Bool @@ -281,7 +281,7 @@ func (a *Aggregator22) buildFilesInBackground(step uint64, collation Agg22Collat a.integrateFiles(sf, step*a.aggregationStep, (step+1)*a.aggregationStep) log.Info("[snapshots] history build done", "step", fmt.Sprintf("%d-%d", step, step+1)) maxSpan := uint64(32) * a.aggregationStep - for r := a.findMergeRange(a.maxTxNum, maxSpan); r.any(); r = a.findMergeRange(a.maxTxNum, maxSpan) { + for r := a.findMergeRange(a.maxTxNum.Load(), maxSpan); r.any(); r = a.findMergeRange(a.maxTxNum.Load(), maxSpan) { outs := a.staticFilesInRange(r) defer func() { if closeAll { @@ -452,10 +452,10 @@ func (a *Aggregator22) prune(txFrom, txTo uint64) error { } func (a *Aggregator22) LogStats(tx kv.Tx, tx2block func(endTxNumMinimax uint64) uint64) { - if a.maxTxNum == 0 { + if a.maxTxNum.Load() == 0 { return } - histBlockNumProgress := tx2block(a.maxTxNum) + histBlockNumProgress := tx2block(a.maxTxNum.Load()) str := make([]string, 0, a.accounts.InvertedIndex.files.Len()) a.accounts.InvertedIndex.files.Ascend(func(it *filesItem) bool { bn := tx2block(it.endTxNum) @@ -482,13 +482,13 @@ func (a *Aggregator22) LogStats(tx kv.Tx, tx2block func(endTxNumMinimax uint64) common2.ReadMemStats(&m) log.Info("[Snapshots] History Stat", "blocks", fmt.Sprintf("%dk", (histBlockNumProgress+1)/1000), - "txs", fmt.Sprintf("%dk", a.maxTxNum/1000), + "txs", fmt.Sprintf("%dk", a.maxTxNum.Load()/1000), "txNum2blockNum", strings.Join(str, ","), "first_history_idx_in_db", firstHistoryIndexBlockInDB, "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) } -func (a *Aggregator22) EndTxNumMinimax() uint64 { return a.maxTxNum } +func (a *Aggregator22) EndTxNumMinimax() uint64 { return a.maxTxNum.Load() } func (a *Aggregator22) recalcMaxTxNum() { min := a.accounts.endTxNumMinimax() if txNum := a.storage.endTxNumMinimax(); txNum < min { @@ -509,7 +509,7 @@ func (a *Aggregator22) recalcMaxTxNum() { if txNum := a.tracesTo.endTxNumMinimax(); txNum < min { min = txNum } - a.maxTxNum = min + a.maxTxNum.Store(min) } type Ranges22 struct { @@ -754,15 +754,15 @@ func (a *Aggregator22) ReadyToFinishTx() bool { } func (a *Aggregator22) FinishTx() error { - if (a.txNum + 1) <= a.maxTxNum+2*a.aggregationStep { // Leave one step worth in the DB + if (a.txNum + 1) <= a.maxTxNum.Load()+2*a.aggregationStep { // Leave one step worth in the DB return nil } - step := a.maxTxNum / a.aggregationStep + step := a.maxTxNum.Load() / a.aggregationStep if a.working.Load() { return nil } - if err := a.prune(0, a.maxTxNum); err != nil { + if err := a.prune(0, a.maxTxNum.Load()); err != nil { return err }