e3: agg atomic (#671)

This commit is contained in:
Alex Sharov 2022-10-09 20:16:26 +07:00 committed by GitHub
parent 77d3a90936
commit 1ce5610eea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -45,7 +45,7 @@ type Aggregator22 struct {
txNum uint64
logPrefix string
rwTx kv.RwTx
maxTxNum uint64
maxTxNum atomic.Uint64
backgroundResult *BackgroundResult
working atomic.Bool
@ -281,7 +281,7 @@ func (a *Aggregator22) buildFilesInBackground(step uint64, collation Agg22Collat
a.integrateFiles(sf, step*a.aggregationStep, (step+1)*a.aggregationStep)
log.Info("[snapshots] history build done", "step", fmt.Sprintf("%d-%d", step, step+1))
maxSpan := uint64(32) * a.aggregationStep
for r := a.findMergeRange(a.maxTxNum, maxSpan); r.any(); r = a.findMergeRange(a.maxTxNum, maxSpan) {
for r := a.findMergeRange(a.maxTxNum.Load(), maxSpan); r.any(); r = a.findMergeRange(a.maxTxNum.Load(), maxSpan) {
outs := a.staticFilesInRange(r)
defer func() {
if closeAll {
@ -452,10 +452,10 @@ func (a *Aggregator22) prune(txFrom, txTo uint64) error {
}
func (a *Aggregator22) LogStats(tx kv.Tx, tx2block func(endTxNumMinimax uint64) uint64) {
if a.maxTxNum == 0 {
if a.maxTxNum.Load() == 0 {
return
}
histBlockNumProgress := tx2block(a.maxTxNum)
histBlockNumProgress := tx2block(a.maxTxNum.Load())
str := make([]string, 0, a.accounts.InvertedIndex.files.Len())
a.accounts.InvertedIndex.files.Ascend(func(it *filesItem) bool {
bn := tx2block(it.endTxNum)
@ -482,13 +482,13 @@ func (a *Aggregator22) LogStats(tx kv.Tx, tx2block func(endTxNumMinimax uint64)
common2.ReadMemStats(&m)
log.Info("[Snapshots] History Stat",
"blocks", fmt.Sprintf("%dk", (histBlockNumProgress+1)/1000),
"txs", fmt.Sprintf("%dk", a.maxTxNum/1000),
"txs", fmt.Sprintf("%dk", a.maxTxNum.Load()/1000),
"txNum2blockNum", strings.Join(str, ","),
"first_history_idx_in_db", firstHistoryIndexBlockInDB,
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
}
func (a *Aggregator22) EndTxNumMinimax() uint64 { return a.maxTxNum }
func (a *Aggregator22) EndTxNumMinimax() uint64 { return a.maxTxNum.Load() }
func (a *Aggregator22) recalcMaxTxNum() {
min := a.accounts.endTxNumMinimax()
if txNum := a.storage.endTxNumMinimax(); txNum < min {
@ -509,7 +509,7 @@ func (a *Aggregator22) recalcMaxTxNum() {
if txNum := a.tracesTo.endTxNumMinimax(); txNum < min {
min = txNum
}
a.maxTxNum = min
a.maxTxNum.Store(min)
}
type Ranges22 struct {
@ -754,15 +754,15 @@ func (a *Aggregator22) ReadyToFinishTx() bool {
}
func (a *Aggregator22) FinishTx() error {
if (a.txNum + 1) <= a.maxTxNum+2*a.aggregationStep { // Leave one step worth in the DB
if (a.txNum + 1) <= a.maxTxNum.Load()+2*a.aggregationStep { // Leave one step worth in the DB
return nil
}
step := a.maxTxNum / a.aggregationStep
step := a.maxTxNum.Load() / a.aggregationStep
if a.working.Load() {
return nil
}
if err := a.prune(0, a.maxTxNum); err != nil {
if err := a.prune(0, a.maxTxNum.Load()); err != nil {
return err
}