From 69f3e1e99a3bd17826674d95f88f05aecf634234 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 25 Aug 2022 15:32:05 +0700 Subject: [PATCH] erigon22: step toward /tests --- cmd/state/exec22/txNums.go | 6 ++- eth/stagedsync/exec22.go | 4 +- eth/stagedsync/stage_bodies.go | 4 +- eth/stagedsync/stage_execute.go | 84 +++++++++------------------------ go.mod | 2 +- go.sum | 4 +- turbo/stages/mock_sentry.go | 5 ++ 7 files changed, 38 insertions(+), 71 deletions(-) diff --git a/cmd/state/exec22/txNums.go b/cmd/state/exec22/txNums.go index 64d94135c..5f22eb063 100644 --- a/cmd/state/exec22/txNums.go +++ b/cmd/state/exec22/txNums.go @@ -30,8 +30,12 @@ func (s *TxNums) Append(blockNum, maxTxnNum uint64) { panic(err) } s.nums = append(s.nums, maxTxnNum) + //fmt.Printf("append: %d, %d, %d\n", blockNum, maxTxnNum, len(s.nums)) +} +func (s *TxNums) Unwind(unwindTo uint64) { + s.nums = s.nums[:unwindTo] + //fmt.Printf("unwind: %d, %d\n", unwindTo, s.nums) } -func (s *TxNums) Unwind(unwindTo uint64) { s.nums = s.nums[:unwindTo] } func (s *TxNums) Find(endTxNumMinimax uint64) (ok bool, blockNum uint64) { blockNum = uint64(sort.Search(len(s.nums), func(i int) bool { return s.nums[i] > endTxNumMinimax diff --git a/eth/stagedsync/exec22.go b/eth/stagedsync/exec22.go index 3805ab45a..37d168ee8 100644 --- a/eth/stagedsync/exec22.go +++ b/eth/stagedsync/exec22.go @@ -226,11 +226,11 @@ loop: for blockNum = block; blockNum <= maxBlockNum; blockNum++ { atomic.StoreUint64(&inputBlockNum, blockNum) rules := chainConfig.Rules(blockNum) - if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil { + if header, err = blockReader.HeaderByNumber(ctx, applyTx, blockNum); err != nil { return err } blockHash := header.Hash() - b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum) + b, _, err := blockReader.BlockWithSenders(ctx, applyTx, blockHash, blockNum) if err != nil { return err } diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 5a4c0617c..1d029da40 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -198,12 +198,12 @@ Loop: } // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) - ok, lastBlockNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) + ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) if err != nil { return fmt.Errorf("WriteRawBodyIfNotExists: %w", err) } if cfg.historyV2 && ok { - cfg.txNums.Append(blockHeight, lastBlockNum) + cfg.txNums.Append(blockHeight, lastTxnNum) } if blockHeight > bodyProgress { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 4e16733d8..a110be32b 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -8,7 +8,6 @@ import ( "math/big" "os" "os/signal" - "path" "runtime" "syscall" "time" @@ -16,7 +15,6 @@ import ( "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/cmp" - "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" @@ -292,57 +290,28 @@ func ExecBlock22(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx cont return nil } -func UnwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { - if u.UnwindPoint >= s.BlockNumber { - return nil - } - useExternalTx := tx != nil - if !useExternalTx { - tx, err = cfg.db.BeginRw(context.Background()) - if err != nil { - return err - } - defer tx.Rollback() - } - logPrefix := u.LogPrefix() - log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint) - - //rs := state.NewState22() - aggDir := path.Join(cfg.dirs.DataDir, "agg22") - dir.MustExist(aggDir) - agg, err := libstate.NewAggregator22(aggDir, AggregationStep) - if err != nil { - return err - } - defer agg.Close() - - prevStageProgress, err := senderStageProgress(tx, cfg.db) - if err != nil { - return err - } - - agg.SetLogPrefix(logPrefix) - agg.SetTxNum(cfg.txNums.MaxOf(prevStageProgress)) +func unwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg) (err error) { + cfg.agg.SetLogPrefix(s.LogPrefix()) rs := state.NewState22() // unwind all txs of u.UnwindPoint block. 1 txn in begin/end of block - system txs - if err := rs.Unwind(ctx, tx, cfg.txNums.MaxOf(u.UnwindPoint), agg, cfg.accumulator); err != nil { + if err := rs.Unwind(ctx, tx, cfg.txNums.MaxOf(u.UnwindPoint), cfg.agg, cfg.accumulator); err != nil { return fmt.Errorf("State22.Unwind: %w", err) } if err := rs.Flush(tx); err != nil { return fmt.Errorf("State22.Flush: %w", err) } - /* - if err := rawdb.TruncateReceipts(tx, u.UnwindPoint+1); err != nil { - return fmt.Errorf("truncate receipts: %w", err) - } - if err := rawdb.TruncateBorReceipts(tx, u.UnwindPoint+1); err != nil { - return fmt.Errorf("truncate bor receipts: %w", err) - } - if err := rawdb.DeleteNewerEpochs(tx, u.UnwindPoint+1); err != nil { - return fmt.Errorf("delete newer epochs: %w", err) - } + if err := rawdb.TruncateReceipts(tx, u.UnwindPoint+1); err != nil { + return fmt.Errorf("truncate receipts: %w", err) + } + if err := rawdb.TruncateBorReceipts(tx, u.UnwindPoint+1); err != nil { + return fmt.Errorf("truncate bor receipts: %w", err) + } + if err := rawdb.DeleteNewerEpochs(tx, u.UnwindPoint+1); err != nil { + return fmt.Errorf("delete newer epochs: %w", err) + } + /* // Truncate CallTraceSet keyStart := dbutils.EncodeBlockNumber(u.UnwindPoint + 1) c, err := tx.RwCursorDupSort(kv.CallTraceSet) @@ -360,16 +329,6 @@ func UnwindExec22(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context } } */ - if err = u.Done(tx); err != nil { - return err - } - - if !useExternalTx { - if err = tx.Commit(); err != nil { - return err - } - } - return nil } @@ -607,11 +566,6 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current } func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { - if cfg.exec22 { - return UnwindExec22(u, s, tx, ctx, cfg, initialCycle) - } - - quit := ctx.Done() if u.UnwindPoint >= s.BlockNumber { return nil } @@ -626,7 +580,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context logPrefix := u.LogPrefix() log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint) - if err = unwindExecutionStage(u, s, tx, quit, cfg, initialCycle); err != nil { + if err = unwindExecutionStage(u, s, tx, ctx, cfg, initialCycle); err != nil { return err } if err = u.Done(tx); err != nil { @@ -641,7 +595,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context return nil } -func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, initialCycle bool) error { +func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) error { logPrefix := s.LogPrefix() stateBucket := kv.PlainState storageKeyLength := length.Addr + length.Incarnation + length.Hash @@ -661,9 +615,13 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan accumulator.StartChange(u.UnwindPoint, hash, txs, true) } + if cfg.exec22 { + return unwindExec22(u, s, tx, ctx, cfg) + } + changes := etl.NewCollector(logPrefix, cfg.dirs.Tmp, etl.NewOldestEntryBuffer(etl.BufferOptimalSize)) defer changes.Close() - errRewind := changeset.RewindData(tx, s.BlockNumber, u.UnwindPoint, changes, quit) + errRewind := changeset.RewindData(tx, s.BlockNumber, u.UnwindPoint, changes, ctx.Done()) if errRewind != nil { return fmt.Errorf("getting rewind data: %w", errRewind) } @@ -736,7 +694,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan } return nil - }, etl.TransformArgs{Quit: quit}); err != nil { + }, etl.TransformArgs{Quit: ctx.Done()}); err != nil { return err } diff --git a/go.mod b/go.mod index 944ba2010..b266d30c0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.18 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20220824090247-23c7f503e0fb + github.com/ledgerwatch/erigon-lib v0.0.0-20220825082934-55029084e0c7 github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220809023834-6309df4da4b1 github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index bdace204a..0e16ba04f 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220824090247-23c7f503e0fb h1:2iP9NGTsxq2TJJctXbNT5w1hYVhibP8bV5z1NiipuGU= -github.com/ledgerwatch/erigon-lib v0.0.0-20220824090247-23c7f503e0fb/go.mod h1:LRF+TmrtTi//SEqtlKTBjG63GegA7b4gGUbuCQy76bQ= +github.com/ledgerwatch/erigon-lib v0.0.0-20220825082934-55029084e0c7 h1:lwm3cPefkOnt61ccTDHNETSvTXHqXqq2CKBtRiRUceM= +github.com/ledgerwatch/erigon-lib v0.0.0-20220825082934-55029084e0c7/go.mod h1:LRF+TmrtTi//SEqtlKTBjG63GegA7b4gGUbuCQy76bQ= github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220809023834-6309df4da4b1 h1:qRIJu6cs6fbI8L52DSdPF27j3sOrEriXz1zQSuQvYpA= github.com/ledgerwatch/erigon-snapshot v1.0.1-0.20220809023834-6309df4da4b1/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 0d117c99d..eb9f3c575 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -239,6 +239,11 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey cfg.DeprecatedTxPool.Disable = !withTxPool cfg.DeprecatedTxPool.StartOnInit = true + _ = db.Update(ctx, func(tx kv.RwTx) error { + _, _ = rawdb.HistoryV2.WriteOnce(tx, cfg.HistoryV2) + return nil + }) + allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap) var txNums *exec22.TxNums