mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
erigon22: step toward /tests
This commit is contained in:
parent
79d02e24cb
commit
69f3e1e99a
@ -30,8 +30,12 @@ func (s *TxNums) Append(blockNum, maxTxnNum uint64) {
|
||||
panic(err)
|
||||
}
|
||||
s.nums = append(s.nums, maxTxnNum)
|
||||
//fmt.Printf("append: %d, %d, %d\n", blockNum, maxTxnNum, len(s.nums))
|
||||
}
|
||||
func (s *TxNums) Unwind(unwindTo uint64) {
|
||||
s.nums = s.nums[:unwindTo]
|
||||
//fmt.Printf("unwind: %d, %d\n", unwindTo, s.nums)
|
||||
}
|
||||
func (s *TxNums) Unwind(unwindTo uint64) { s.nums = s.nums[:unwindTo] }
|
||||
func (s *TxNums) Find(endTxNumMinimax uint64) (ok bool, blockNum uint64) {
|
||||
blockNum = uint64(sort.Search(len(s.nums), func(i int) bool {
|
||||
return s.nums[i] > endTxNumMinimax
|
||||
|
@ -226,11 +226,11 @@ loop:
|
||||
for blockNum = block; blockNum <= maxBlockNum; blockNum++ {
|
||||
atomic.StoreUint64(&inputBlockNum, blockNum)
|
||||
rules := chainConfig.Rules(blockNum)
|
||||
if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil {
|
||||
if header, err = blockReader.HeaderByNumber(ctx, applyTx, blockNum); err != nil {
|
||||
return err
|
||||
}
|
||||
blockHash := header.Hash()
|
||||
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum)
|
||||
b, _, err := blockReader.BlockWithSenders(ctx, applyTx, blockHash, blockNum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -198,12 +198,12 @@ Loop:
|
||||
}
|
||||
|
||||
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
|
||||
ok, lastBlockNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
|
||||
ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
|
||||
if err != nil {
|
||||
return fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
|
||||
}
|
||||
if cfg.historyV2 && ok {
|
||||
cfg.txNums.Append(blockHeight, lastBlockNum)
|
||||
cfg.txNums.Append(blockHeight, lastTxnNum)
|
||||
}
|
||||
|
||||
if blockHeight > bodyProgress {
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"math/big"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -16,7 +15,6 @@ import (
|
||||
"github.com/c2h5oh/datasize"
|
||||
"github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/common/cmp"
|
||||
"github.com/ledgerwatch/erigon-lib/common/dir"
|
||||
"github.com/ledgerwatch/erigon-lib/common/length"
|
||||
"github.com/ledgerwatch/erigon-lib/etl"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
@ -292,57 +290,28 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont
|
||||
return nil
|
||||
}
|
||||
|
||||
func UnwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
|
||||
if u.UnwindPoint >= s.BlockNumber {
|
||||
return nil
|
||||
}
|
||||
useExternalTx := tx != nil
|
||||
if !useExternalTx {
|
||||
tx, err = cfg.db.BeginRw(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
}
|
||||
logPrefix := u.LogPrefix()
|
||||
log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint)
|
||||
|
||||
//rs := state.NewState22()
|
||||
aggDir := path.Join(cfg.dirs.DataDir, "agg22")
|
||||
dir.MustExist(aggDir)
|
||||
agg, err := libstate.NewAggregator22(aggDir, AggregationStep)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer agg.Close()
|
||||
|
||||
prevStageProgress, err := senderStageProgress(tx, cfg.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agg.SetLogPrefix(logPrefix)
|
||||
agg.SetTxNum(cfg.txNums.MaxOf(prevStageProgress))
|
||||
func unwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg) (err error) {
|
||||
cfg.agg.SetLogPrefix(s.LogPrefix())
|
||||
rs := state.NewState22()
|
||||
// unwind all txs of u.UnwindPoint block. 1 txn in begin/end of block - system txs
|
||||
if err := rs.Unwind(ctx, tx, cfg.txNums.MaxOf(u.UnwindPoint), agg, cfg.accumulator); err != nil {
|
||||
if err := rs.Unwind(ctx, tx, cfg.txNums.MaxOf(u.UnwindPoint), cfg.agg, cfg.accumulator); err != nil {
|
||||
return fmt.Errorf("State22.Unwind: %w", err)
|
||||
}
|
||||
if err := rs.Flush(tx); err != nil {
|
||||
return fmt.Errorf("State22.Flush: %w", err)
|
||||
}
|
||||
|
||||
/*
|
||||
if err := rawdb.TruncateReceipts(tx, u.UnwindPoint+1); err != nil {
|
||||
return fmt.Errorf("truncate receipts: %w", err)
|
||||
}
|
||||
if err := rawdb.TruncateBorReceipts(tx, u.UnwindPoint+1); err != nil {
|
||||
return fmt.Errorf("truncate bor receipts: %w", err)
|
||||
}
|
||||
if err := rawdb.DeleteNewerEpochs(tx, u.UnwindPoint+1); err != nil {
|
||||
return fmt.Errorf("delete newer epochs: %w", err)
|
||||
}
|
||||
if err := rawdb.TruncateReceipts(tx, u.UnwindPoint+1); err != nil {
|
||||
return fmt.Errorf("truncate receipts: %w", err)
|
||||
}
|
||||
if err := rawdb.TruncateBorReceipts(tx, u.UnwindPoint+1); err != nil {
|
||||
return fmt.Errorf("truncate bor receipts: %w", err)
|
||||
}
|
||||
if err := rawdb.DeleteNewerEpochs(tx, u.UnwindPoint+1); err != nil {
|
||||
return fmt.Errorf("delete newer epochs: %w", err)
|
||||
}
|
||||
|
||||
/*
|
||||
// Truncate CallTraceSet
|
||||
keyStart := dbutils.EncodeBlockNumber(u.UnwindPoint + 1)
|
||||
c, err := tx.RwCursorDupSort(kv.CallTraceSet)
|
||||
@ -360,16 +329,6 @@ func UnwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context
|
||||
}
|
||||
}
|
||||
*/
|
||||
if err = u.Done(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !useExternalTx {
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -607,11 +566,6 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current
|
||||
}
|
||||
|
||||
func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) {
|
||||
if cfg.exec22 {
|
||||
return UnwindExec22(u, s, tx, ctx, cfg, initialCycle)
|
||||
}
|
||||
|
||||
quit := ctx.Done()
|
||||
if u.UnwindPoint >= s.BlockNumber {
|
||||
return nil
|
||||
}
|
||||
@ -626,7 +580,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context
|
||||
logPrefix := u.LogPrefix()
|
||||
log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint)
|
||||
|
||||
if err = unwindExecutionStage(u, s, tx, quit, cfg, initialCycle); err != nil {
|
||||
if err = unwindExecutionStage(u, s, tx, ctx, cfg, initialCycle); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = u.Done(tx); err != nil {
|
||||
@ -641,7 +595,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context
|
||||
return nil
|
||||
}
|
||||
|
||||
func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, initialCycle bool) error {
|
||||
func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) error {
|
||||
logPrefix := s.LogPrefix()
|
||||
stateBucket := kv.PlainState
|
||||
storageKeyLength := length.Addr + length.Incarnation + length.Hash
|
||||
@ -661,9 +615,13 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan
|
||||
accumulator.StartChange(u.UnwindPoint, hash, txs, true)
|
||||
}
|
||||
|
||||
if cfg.exec22 {
|
||||
return unwindExec22(u, s, tx, ctx, cfg)
|
||||
}
|
||||
|
||||
changes := etl.NewCollector(logPrefix, cfg.dirs.Tmp, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
|
||||
defer changes.Close()
|
||||
errRewind := changeset.RewindData(tx, s.BlockNumber, u.UnwindPoint, changes, quit)
|
||||
errRewind := changeset.RewindData(tx, s.BlockNumber, u.UnwindPoint, changes, ctx.Done())
|
||||
if errRewind != nil {
|
||||
return fmt.Errorf("getting rewind data: %w", errRewind)
|
||||
}
|
||||
@ -736,7 +694,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan
|
||||
}
|
||||
return nil
|
||||
|
||||
}, etl.TransformArgs{Quit: quit}); err != nil {
|
||||
}, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20220824090247-23c7f503e0fb
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20220825082934-55029084e0c7
|
||||
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220809023834-6309df4da4b1
|
||||
github.com/ledgerwatch/log/v3 v3.4.1
|
||||
github.com/ledgerwatch/secp256k1 v1.0.0
|
||||
|
4
go.sum
4
go.sum
@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
|
||||
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
|
||||
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20220824090247-23c7f503e0fb h1:2iP9NGTsxq2TJJctXbNT5w1hYVhibP8bV5z1NiipuGU=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20220824090247-23c7f503e0fb/go.mod h1:LRF+TmrtTi//SEqtlKTBjG63GegA7b4gGUbuCQy76bQ=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20220825082934-55029084e0c7 h1:lwm3cPefkOnt61ccTDHNETSvTXHqXqq2CKBtRiRUceM=
|
||||
github.com/ledgerwatch/erigon-lib v0.0.0-20220825082934-55029084e0c7/go.mod h1:LRF+TmrtTi//SEqtlKTBjG63GegA7b4gGUbuCQy76bQ=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220809023834-6309df4da4b1 h1:qRIJu6cs6fbI8L52DSdPF27j3sOrEriXz1zQSuQvYpA=
|
||||
github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220809023834-6309df4da4b1/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
|
||||
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
|
||||
|
@ -239,6 +239,11 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
|
||||
cfg.DeprecatedTxPool.Disable = !withTxPool
|
||||
cfg.DeprecatedTxPool.StartOnInit = true
|
||||
|
||||
_ = db.Update(ctx, func(tx kv.RwTx) error {
|
||||
_, _ = rawdb.HistoryV2.WriteOnce(tx, cfg.HistoryV2)
|
||||
return nil
|
||||
})
|
||||
|
||||
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap)
|
||||
|
||||
var txNums *exec22.TxNums
|
||||
|
Loading…
Reference in New Issue
Block a user