erigon22: unwind code #587

This commit is contained in:
Alex Sharov 2022-08-15 10:27:08 +07:00 committed by GitHub
parent e160c1ad9c
commit 4945162dd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,7 +22,6 @@ import (
math2 "math" math2 "math"
"github.com/RoaringBitmap/roaring/roaring64" "github.com/RoaringBitmap/roaring/roaring64"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
) )
@ -37,6 +36,7 @@ type Aggregator22 struct {
tracesFrom *InvertedIndex tracesFrom *InvertedIndex
tracesTo *InvertedIndex tracesTo *InvertedIndex
txNum uint64 txNum uint64
logPrefix string
rwTx kv.RwTx rwTx kv.RwTx
} }
@ -103,6 +103,8 @@ func (a *Aggregator22) Close() {
} }
} }
func (a *Aggregator22) SetLogPrefix(v string) { a.logPrefix = v }
func (a *Aggregator22) SetTx(tx kv.RwTx) { func (a *Aggregator22) SetTx(tx kv.RwTx) {
a.rwTx = tx a.rwTx = tx
a.accounts.SetTx(tx) a.accounts.SetTx(tx)
@ -285,11 +287,11 @@ func (a *Aggregator22) Unwind(ctx context.Context, txUnwindTo uint64, stateLoad
return nil return nil
} }
changes := etl.NewCollector("", "", etl.NewOldestEntryBuffer(etl.BufferOptimalSize)) stateChanges := etl.NewCollector(a.logPrefix, "", etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer changes.Close() defer stateChanges.Close()
if err := a.accounts.pruneF(step, txUnwindTo, math2.MaxUint64, func(txNum uint64, k, v []byte) error { if err := a.accounts.pruneF(step, txUnwindTo, math2.MaxUint64, func(txNum uint64, k, v []byte) error {
if err := changes.Collect(libcommon.Copy(k), libcommon.Copy(v)); err != nil { if err := stateChanges.Collect(k, v); err != nil {
return err return err
} }
return nil return nil
@ -297,7 +299,7 @@ func (a *Aggregator22) Unwind(ctx context.Context, txUnwindTo uint64, stateLoad
return err return err
} }
if err := a.storage.pruneF(step, txUnwindTo, math2.MaxUint64, func(txNu uint64, k, v []byte) error { if err := a.storage.pruneF(step, txUnwindTo, math2.MaxUint64, func(txNu uint64, k, v []byte) error {
if err := changes.Collect(libcommon.Copy(k), libcommon.Copy(v)); err != nil { if err := stateChanges.Collect(k, v); err != nil {
return err return err
} }
return nil return nil
@ -305,13 +307,25 @@ func (a *Aggregator22) Unwind(ctx context.Context, txUnwindTo uint64, stateLoad
return err return err
} }
if err := changes.Load(a.rwTx, kv.PlainState, stateLoad, etl.TransformArgs{Quit: ctx.Done()}); err != nil { if err := stateChanges.Load(a.rwTx, kv.PlainState, stateLoad, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err return err
} }
if err := a.code.prune(step, txUnwindTo, math2.MaxUint64); err != nil { codeChanges := etl.NewCollector(a.logPrefix, "", etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer codeChanges.Close()
if err := a.code.pruneF(step, txUnwindTo, math2.MaxUint64, func(txNum uint64, k, v []byte) error {
if err := codeChanges.Collect(k, v); err != nil {
return err return err
} }
return nil
}); err != nil {
return err
}
if err := codeChanges.Load(a.rwTx, kv.PlainContractCode, etl.IdentityLoadFunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}
if err := a.logAddrs.prune(txUnwindTo, math2.MaxUint64); err != nil { if err := a.logAddrs.prune(txUnwindTo, math2.MaxUint64); err != nil {
return err return err
} }
@ -689,6 +703,7 @@ func (a *Aggregator22) AddStoragePrev(addr []byte, loc []byte, prev []byte) erro
return nil return nil
} }
// AddCodePrev - addr+inc => code
func (a *Aggregator22) AddCodePrev(addr []byte, prev []byte) error { func (a *Aggregator22) AddCodePrev(addr []byte, prev []byte) error {
if err := a.code.AddPrevValue(addr, nil, prev); err != nil { if err := a.code.AddPrevValue(addr, nil, prev); err != nil {
return err return err