From 09c104803a7c0948bd7eca3de03c7cf3d9dd3a07 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Thu, 28 Jul 2022 12:16:37 +0100 Subject: [PATCH] Split erigon2.2 and erigon2.3 prototypes (#4811) * Split erigon2.2 and erigon2.3 prototypes * Renaming * Interruptible and resumable erigon22 * Always regenerate trie * Introduce aggregator * Fixes * cleanup * Fix lint * Update to erigon-lib main Co-authored-by: Alexey Sharp --- cmd/state/commands/erigon22.go | 1020 ++++++++++++---------- cmd/state/commands/erigon23.go | 564 ++++++++++++ cmd/state/commands/replay_tx.go | 6 +- cmd/state/commands/state_recon.go | 12 +- cmd/state/commands/state_recon_1.go | 546 ------------ core/state/history_reader_nostate.go | 4 +- core/state/{recon_state_1.go => rw22.go} | 338 +++++-- core/state/state_recon_writer.go | 4 +- go.mod | 2 +- go.sum | 4 +- 10 files changed, 1393 insertions(+), 1107 deletions(-) create mode 100644 cmd/state/commands/erigon23.go delete mode 100644 cmd/state/commands/state_recon_1.go rename core/state/{recon_state_1.go => rw22.go} (58%) diff --git a/cmd/state/commands/erigon22.go b/cmd/state/commands/erigon22.go index 459318446..1d821960b 100644 --- a/cmd/state/commands/erigon22.go +++ b/cmd/state/commands/erigon22.go @@ -1,63 +1,247 @@ package commands import ( + "container/heap" "context" "errors" "fmt" - "math/bits" "os" "os/signal" "path" "path/filepath" "runtime" + "sync" + "sync/atomic" "syscall" "time" - "github.com/holiman/uint256" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" libstate "github.com/ledgerwatch/erigon-lib/state" - "github.com/ledgerwatch/erigon/turbo/services" - "github.com/ledgerwatch/log/v3" - "github.com/spf13/cobra" - + "github.com/ledgerwatch/erigon/cmd/sentry/sentry" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + datadir2 "github.com/ledgerwatch/erigon/node/nodecfg/datadir" + "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/snapshotsync" + stages2 "github.com/ledgerwatch/erigon/turbo/stages" + "github.com/ledgerwatch/log/v3" + "github.com/spf13/cobra" + "golang.org/x/sync/semaphore" ) -const ( - AggregationStep = 3_125_000 /* number of transactions in smallest static file */ +var ( + reset bool ) func init() { - withBlock(erigon22Cmd) + erigon22Cmd.Flags().BoolVar(&reset, "reset", false, "Resets the state database and static files") withDataDir(erigon22Cmd) - withChain(erigon22Cmd) - rootCmd.AddCommand(erigon22Cmd) } var erigon22Cmd = &cobra.Command{ Use: "erigon22", - Short: "Exerimental command to re-execute blocks from beginning using erigon2 state representation and histoty (ugrade 2)", + Short: "Experimental command to re-execute blocks from beginning using erigon2 history (upgrade 2)", RunE: func(cmd *cobra.Command, args []string) error { logger := log.New() - return Erigon22(genesis, chainConfig, logger) + return Erigon22(genesis, logger) }, } -func Erigon22(genesis
*core.Genesis, chainConfig *params.ChainConfig, logger log.Logger) error { +type Worker22 struct { + lock sync.Locker + db kv.RoDB + tx kv.Tx + wg *sync.WaitGroup + rs *state.State22 + blockReader services.FullBlockReader + allSnapshots *snapshotsync.RoSnapshots + stateWriter *state.StateWriter22 + stateReader *state.StateReader22 + getHeader func(hash common.Hash, number uint64) *types.Header + ctx context.Context + engine consensus.Engine + txNums []uint64 + chainConfig *params.ChainConfig + logger log.Logger + genesis *core.Genesis + resultCh chan state.TxTask +} + +func NewWorker22(lock sync.Locker, db kv.RoDB, wg *sync.WaitGroup, rs *state.State22, + blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots, + txNums []uint64, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, + resultCh chan state.TxTask, +) *Worker22 { + return &Worker22{ + lock: lock, + db: db, + wg: wg, + rs: rs, + blockReader: blockReader, + allSnapshots: allSnapshots, + ctx: context.Background(), + stateWriter: state.NewStateWriter22(rs), + stateReader: state.NewStateReader22(rs), + txNums: txNums, + chainConfig: chainConfig, + logger: logger, + genesis: genesis, + resultCh: resultCh, + } +} + +func (rw *Worker22) ResetTx() { + if rw.tx != nil { + rw.tx.Rollback() + rw.tx = nil + } +} + +func (rw *Worker22) run() { + defer rw.wg.Done() + rw.getHeader = func(hash common.Hash, number uint64) *types.Header { + h, err := rw.blockReader.Header(rw.ctx, nil, hash, number) + if err != nil { + panic(err) + } + return h + } + rw.engine = initConsensusEngine(rw.chainConfig, rw.logger, rw.allSnapshots) + for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() { + rw.runTxTask(&txTask) + rw.resultCh <- txTask // Needs to happen outside of the lock + } +} + +func (rw *Worker22) runTxTask(txTask *state.TxTask) { + rw.lock.Lock() + defer rw.lock.Unlock() + if rw.tx == nil { + var err error + if rw.tx, err = rw.db.BeginRo(rw.ctx); err != nil { + panic(err) + } + rw.stateReader.SetTx(rw.tx) + } + txTask.Error = nil + rw.stateReader.SetTxNum(txTask.TxNum) + rw.stateWriter.SetTxNum(txTask.TxNum) + rw.stateReader.ResetReadSet() + rw.stateWriter.ResetWriteSet() + ibs := state.New(rw.stateReader) + daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1 + var err error + if txTask.BlockNum == 0 && txTask.TxIndex == -1 { + //fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum) + // Genesis block + _, ibs, err = rw.genesis.ToBlock() + if err != nil { + panic(err) + } + } else if daoForkTx { + //fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum) + misc.ApplyDAOHardFork(ibs) + ibs.SoftFinalise() + } else if txTask.TxIndex == -1 { + // Block initialisation + } else if txTask.Final { + if txTask.BlockNum > 0 { + //fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum) + // End of block transaction in a block + if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, nil, nil, nil); err != nil { + panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err)) + } + } + } else { + //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + txHash := txTask.Tx.Hash() + gp := new(core.GasPool).AddGas(txTask.Tx.GetGas()) + vmConfig := vm.Config{NoReceipts:
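
The locking protocol around Worker22 is worth spelling out: every worker holds the read side of a shared sync.RWMutex while it executes a task (runTxTask above), so the apply side can take the write lock to pause all workers at once, which is what the commit path later in this patch does. A minimal, self-contained sketch of that pattern, with a plain channel and a bare Task type standing in for State22.Schedule and state.TxTask:

package main

import (
	"fmt"
	"sync"
)

// Task stands in for state.TxTask; only the scheduling field is kept.
type Task struct{ TxNum uint64 }

func main() {
	var lock sync.RWMutex
	var wg sync.WaitGroup
	tasks := make(chan Task, 8)   // stand-in for rs.Schedule()
	results := make(chan Task, 8) // stand-in for resultCh

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				lock.RLock() // many workers may execute concurrently
				// ... speculative execution of t happens here ...
				lock.RUnlock()
				results <- t // sent outside the lock, as in Worker22.run
			}
		}()
	}

	go func() {
		for i := uint64(0); i < 16; i++ {
			tasks <- Task{TxNum: i}
		}
		close(tasks)
	}()
	go func() { wg.Wait(); close(results) }()

	for t := range results {
		lock.Lock() // write lock quiesces every worker, as before a commit
		fmt.Println("collected", t.TxNum)
		lock.Unlock()
	}
}
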
true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)} + contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil } + ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex) + getHashFn := core.GetHashFn(txTask.Header, rw.getHeader) + blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM) + msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, txTask.Rules) + if err != nil { + panic(err) + } + txContext := core.NewEVMTxContext(msg) + vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig) + if _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */); err != nil { + txTask.Error = err + //fmt.Printf("error=%v\n", err) + } + // Update the state with pending changes + ibs.SoftFinalise() + } + // Prepare read set, write set and balanceIncrease set and send for serialisation + if txTask.Error == nil { + txTask.BalanceIncreaseSet = ibs.BalanceIncreaseSet() + //for addr, bal := range txTask.BalanceIncreaseSet { + // fmt.Printf("[%x]=>[%d]\n", addr, &bal) + //} + if err = ibs.MakeWriteSet(txTask.Rules, rw.stateWriter); err != nil { + panic(err) + } + txTask.ReadLists = rw.stateReader.ReadSet() + txTask.WriteLists = rw.stateWriter.WriteSet() + txTask.AccountPrevs, txTask.AccountDels, txTask.StoragePrevs, txTask.CodePrevs = rw.stateWriter.PrevAndDels() + size := (20 + 32) * len(txTask.BalanceIncreaseSet) + for _, list := range txTask.ReadLists { + for _, b := range list.Keys { + size += len(b) + } + for _, b := range list.Vals { + size += len(b) + } + } + for _, list := range txTask.WriteLists { + for _, b := range list.Keys { + size += len(b) + } + for _, b := range list.Vals { + size += len(b) + } + } + txTask.ResultsSize = int64(size) + } +} + +func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.State22, agg *libstate.Aggregator22, applyTx kv.Tx, + triggerCount *uint64, outputBlockNum *uint64, repeatCount *uint64, resultsSize *int64) { + for rws.Len() > 0 && (*rws)[0].TxNum == *outputTxNum { + txTask := heap.Pop(rws).(state.TxTask) + atomic.AddInt64(resultsSize, -txTask.ResultsSize) + if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) { + if err := rs.Apply(txTask.Rules.IsSpuriousDragon, applyTx, txTask, agg); err != nil { + panic(err) + } + *triggerCount += rs.CommitTxNum(txTask.Sender, txTask.TxNum) + *outputTxNum++ + *outputBlockNum = txTask.BlockNum + //fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + } else { + rs.AddWork(txTask) + *repeatCount++ + //fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + } + } +} + +func Erigon22(genesis *core.Genesis, logger log.Logger) error { sigs := make(chan os.Signal, 1) interruptCh := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) @@ -66,94 +250,24 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log <-sigs interruptCh <- true }() - - historyDb, err := kv2.NewMDBX(logger).Path(path.Join(datadir, "chaindata")).Open() - if err != nil { - return fmt.Errorf("opening chaindata as read only: %v", err) - } - defer historyDb.Close() - ctx := context.Background() - historyTx, err1 := historyDb.BeginRo(ctx) - if err1 != nil { - return err1 - } - defer historyTx.Rollback() - stateDbPath := path.Join(datadir, "statedb") - if _, err = os.Stat(stateDbPath); err != nil { - if 
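
The ordering trick in processResultQueue is that state.TxTaskQueue is a min-heap keyed by TxNum: workers finish out of order, but results are only applied while the smallest buffered TxNum equals the next expected one; everything else waits in the heap. A self-contained sketch of that mechanism (the heap type here is a stand-in for the one in core/state):

package main

import (
	"container/heap"
	"fmt"
)

type result struct{ TxNum uint64 }

// minHeap mirrors what state.TxTaskQueue provides: ordering by TxNum.
type minHeap []result

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].TxNum < h[j].TxNum }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(result)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	var rws minHeap
	heap.Init(&rws)
	var outputTxNum uint64
	for _, n := range []uint64{2, 0, 3, 1} { // results arriving out of order
		heap.Push(&rws, result{TxNum: n})
		// Apply only while the minimum is the next expected txNum.
		for rws.Len() > 0 && rws[0].TxNum == outputTxNum {
			fmt.Println("apply", heap.Pop(&rws).(result).TxNum)
			outputTxNum++
		}
	}
}
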
!errors.Is(err, os.ErrNotExist) { + reconDbPath := path.Join(datadir, "db22") + var err error + if reset { + if _, err = os.Stat(reconDbPath); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + } else if err = os.RemoveAll(reconDbPath); err != nil { return err } - } else if err = os.RemoveAll(stateDbPath); err != nil { + } + limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1)) + db, err := kv2.NewMDBX(logger).Path(reconDbPath).RoTxsLimiter(limiter).Open() + if err != nil { return err } - db, err2 := kv2.NewMDBX(logger).Path(stateDbPath).WriteMap().Open() - if err2 != nil { - return err2 - } - defer db.Close() - - aggPath := filepath.Join(datadir, "erigon22") - - var rwTx kv.RwTx - defer func() { - if rwTx != nil { - rwTx.Rollback() - } - }() - if rwTx, err = db.BeginRw(ctx); err != nil { - return err - } - - agg, err3 := libstate.NewAggregator(aggPath, AggregationStep) - if err3 != nil { - return fmt.Errorf("create aggregator: %w", err3) - } - defer agg.Close() - startTxNum := agg.EndTxNumMinimax() - fmt.Printf("Max txNum in files: %d\n", startTxNum) - - interrupt := false - if startTxNum == 0 { - _, genesisIbs, err := genesis.ToBlock() - if err != nil { - return err - } - agg.SetTx(rwTx) - agg.SetTxNum(0) - if err = genesisIbs.CommitBlock(¶ms.Rules{}, &WriterWrapper22{w: agg}); err != nil { - return fmt.Errorf("cannot write state: %w", err) - } - if err = agg.FinishTx(); err != nil { - return err - } - } - - logger.Info("Initialised chain configuration", "config", chainConfig) - - var ( - blockNum uint64 - trace bool - vmConfig vm.Config - - txNum uint64 = 2 // Consider that each block contains at least first system tx and enclosing transactions, except for Clique consensus engine - ) - - logEvery := time.NewTicker(logInterval) - defer logEvery.Stop() - - statx := &stat22{ - prevBlock: blockNum, - prevTime: time.Now(), - } - - go func() { - for range logEvery.C { - aStats := agg.Stats() - statx.delta(aStats, blockNum).print(aStats, logger) - } - }() - + startTime := time.Now() var blockReader services.FullBlockReader var allSnapshots *snapshotsync.RoSnapshots allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) @@ -162,403 +276,367 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log return fmt.Errorf("reopen snapshot segments: %w", err) } blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) - engine := initConsensusEngine(chainConfig, logger, allSnapshots) - - getHeader := func(hash common.Hash, number uint64) *types.Header { - h, err := blockReader.Header(ctx, historyTx, hash, number) - if err != nil { - panic(err) - } - return h - } - readWrapper := &ReaderWrapper22{ac: agg.MakeContext(), roTx: rwTx} - writeWrapper := &WriterWrapper22{w: agg} - - for !interrupt { - blockNum++ - trace = traceBlock > 0 && blockNum == uint64(traceBlock) - blockHash, err := blockReader.CanonicalHash(ctx, historyTx, blockNum) - if err != nil { - return err - } - - b, _, err := blockReader.BlockWithSenders(ctx, historyTx, blockHash, blockNum) - if err != nil { - return err - } - if b == nil { - log.Info("history: block is nil", "block", blockNum) - break - } - agg.SetTx(rwTx) - agg.SetTxNum(txNum) - - if txNum, _, err = processBlock22(startTxNum, trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil { - return fmt.Errorf("processing block %d: %w", blockNum, err) - } - - // Check for interrupts - select { - case interrupt = 
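
The RoTxsLimiter above caps the number of simultaneously open read-only MDBX transactions at NumCPU+1. The same gating can be wrapped around any begin/rollback pair with golang.org/x/sync/semaphore; a sketch under that assumption (the beginRo closure is hypothetical, not the erigon-lib API):

package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	// One permit per read transaction, matching the RoTxsLimiter setting.
	limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1))

	beginRo := func() (release func(), err error) {
		if err := limiter.Acquire(ctx, 1); err != nil {
			return nil, err // context cancelled while waiting for a permit
		}
		// ... the real read-only transaction would be opened here ...
		return func() { limiter.Release(1) }, nil
	}

	release, err := beginRo()
	if err != nil {
		panic(err)
	}
	defer release() // in the real limiter, Rollback returns the permit
	fmt.Println("read tx open; permits bound reader concurrency")
}
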
<-interruptCh: - log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next time start with --block %d", blockNum)) - default: - } - // Commit transaction only when interrupted or just before computing commitment (so it can be re-done) - commit := interrupt - if !commit && (blockNum+1)%uint64(commitmentFrequency) == 0 { - var spaceDirty uint64 - if spaceDirty, _, err = rwTx.(*mdbx.MdbxTx).SpaceDirty(); err != nil { - return fmt.Errorf("retrieving spaceDirty: %w", err) - } - if spaceDirty >= dirtySpaceThreshold { - log.Info("Initiated tx commit", "block", blockNum, "space dirty", libcommon.ByteCount(spaceDirty)) - commit = true - } - } - if commit { - if err = rwTx.Commit(); err != nil { + // Compute mapping blockNum -> last TxNum in that block + maxBlockNum := allSnapshots.BlocksAvailable() + 1 + txNums := make([]uint64, maxBlockNum) + if err = allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error { + for _, b := range bs { + if err = b.Iterate(func(blockNum, baseTxNum, txAmount uint64) { + txNums[blockNum] = baseTxNum + txAmount + }); err != nil { return err } - if !interrupt { - if rwTx, err = db.BeginRw(ctx); err != nil { - return err - } - } - agg.SetTx(rwTx) - readWrapper.roTx = rwTx } + return nil + }); err != nil { + return fmt.Errorf("build txNum => blockNum mapping: %w", err) } + workerCount := runtime.NumCPU() + workCh := make(chan state.TxTask, 128) - return nil -} - -type stat22 struct { - blockNum uint64 - hits uint64 - misses uint64 - prevBlock uint64 - prevMisses uint64 - prevHits uint64 - hitMissRatio float64 - speed float64 - prevTime time.Time - mem runtime.MemStats -} - -func (s *stat22) print(aStats libstate.FilesStats, logger log.Logger) { - totalFiles := 0 - totalDatSize := 0 - totalIdxSize := 0 - - logger.Info("Progress", "block", s.blockNum, "blk/s", s.speed, "state files", totalFiles, - "total dat", libcommon.ByteCount(uint64(totalDatSize)), "total idx", libcommon.ByteCount(uint64(totalIdxSize)), - "hit ratio", s.hitMissRatio, "hits+misses", s.hits+s.misses, - "alloc", libcommon.ByteCount(s.mem.Alloc), "sys", libcommon.ByteCount(s.mem.Sys), + engine := initConsensusEngine(chainConfig, logger, allSnapshots) + sentryControlServer, err := sentry.NewMultiClient( + db, + "", + chainConfig, + common.Hash{}, + engine, + 1, + nil, + ethconfig.Defaults.Sync, + blockReader, + false, ) -} - -func (s *stat22) delta(aStats libstate.FilesStats, blockNum uint64) *stat22 { - currentTime := time.Now() - libcommon.ReadMemStats(&s.mem) - - interval := currentTime.Sub(s.prevTime).Seconds() - s.blockNum = blockNum - s.speed = float64(s.blockNum-s.prevBlock) / interval - s.prevBlock = blockNum - s.prevTime = currentTime - - total := s.hits + s.misses - if total > 0 { - s.hitMissRatio = float64(s.hits) / float64(total) + if err != nil { + return err } - return s -} - -func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig, - engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config, -) (uint64, types.Receipts, error) { - defer blockExecutionTimer.UpdateDuration(time.Now()) - - header := block.Header() - vmConfig.Debug = true - gp := new(core.GasPool).AddGas(block.GasLimit()) - usedGas := new(uint64) - var receipts types.Receipts - rules := chainConfig.Rules(block.NumberU64()) - txNum := txNumStart - ww.w.SetTxNum(txNum) - rw.blockNum = block.NumberU64() - ww.blockNum = block.NumberU64() - - daoFork := txNum >= 
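
The txNums slice built above records, for every block, the txNum one past that block's last (virtual) transaction, i.e. baseTxNum + txAmount. The first txNum of a block is then txNums[blockNum-1], and going from a txNum back to its block is a binary search, which is how replayTxNum further down in this patch uses the same mapping. A small sketch of both directions, with hypothetical per-block counts:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// txNums[i] = 1 + last txNum of block i. Hypothetical values: block 0
	// holds 2 virtual txs, block 1 holds 5, block 2 holds 3.
	txNums := []uint64{2, 7, 10}

	// Forward: first txNum of block bn.
	firstTxNum := func(bn uint64) uint64 {
		if bn == 0 {
			return 0
		}
		return txNums[bn-1]
	}

	// Backward: block containing txNum, as in replayTxNum.
	blockOf := func(txNum uint64) uint64 {
		return uint64(sort.Search(len(txNums), func(i int) bool {
			return txNums[i] > txNum
		}))
	}

	fmt.Println(firstTxNum(1)) // 2
	fmt.Println(blockOf(8))    // 2
}
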
startTxNum && chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 - if daoFork { - ibs := state.New(rw) - // TODO Actually add tracing to the DAO related accounts - misc.ApplyDAOHardFork(ibs) - if err := ibs.FinalizeTx(rules, ww); err != nil { - return 0, nil, err + cfg := ethconfig.Defaults + cfg.DeprecatedTxPool.Disable = true + cfg.Dirs = datadir2.New(datadir) + cfg.Snapshot = allSnapshots.Cfg() + stagedSync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg, sentryControlServer, datadir, &stagedsync.Notifications{}, nil, allSnapshots, nil, nil) + if err != nil { + return err + } + rwTx, err := db.BeginRw(ctx) + if err != nil { + return err + } + defer func() { + if rwTx != nil { + rwTx.Rollback() } - if err := ww.w.FinishTx(); err != nil { - return 0, nil, fmt.Errorf("finish daoFork failed: %w", err) + }() + execStage, err := stagedSync.StageState(stages.Execution, rwTx, db) + if err != nil { + return err + } + if !reset { + block = execStage.BlockNumber + 1 + } + rwTx.Rollback() + + rs := state.NewState22() + aggDir := path.Join(datadir, "agg22") + if reset { + if _, err = os.Stat(aggDir); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + } else if err = os.RemoveAll(aggDir); err != nil { + return err + } + if err = os.MkdirAll(aggDir, 0755); err != nil { + return err } } - - txNum++ // Pre-block transaction - ww.w.SetTxNum(txNum) - getHashFn := core.GetHashFn(header, getHeader) - - for i, tx := range block.Transactions() { - if txNum >= startTxNum { - ibs := state.New(rw) - ibs.Prepare(tx.Hash(), block.Hash(), i) - ct := NewCallTracer() - vmConfig.Tracer = ct - receipt, _, err := core.ApplyTransaction(chainConfig, getHashFn, engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil) - if err != nil { - return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err) + agg, err := libstate.NewAggregator22(aggDir, AggregationStep) + if err != nil { + return err + } + defer agg.Close() + var lock sync.RWMutex + reconWorkers := make([]*Worker22, workerCount) + var wg sync.WaitGroup + resultCh := make(chan state.TxTask, 128) + for i := 0; i < workerCount; i++ { + reconWorkers[i] = NewWorker22(lock.RLocker(), db, &wg, rs, blockReader, allSnapshots, txNums, chainConfig, logger, genesis, resultCh) + } + defer func() { + for i := 0; i < workerCount; i++ { + reconWorkers[i].ResetTx() + } + }() + wg.Add(workerCount) + for i := 0; i < workerCount; i++ { + go reconWorkers[i].run() + } + commitThreshold := uint64(1024 * 1024 * 1024) + resultsThreshold := int64(1024 * 1024 * 1024) + count := uint64(0) + repeatCount := uint64(0) + triggerCount := uint64(0) + prevCount := uint64(0) + prevRepeatCount := uint64(0) + //prevTriggerCount := uint64(0) + resultsSize := int64(0) + prevTime := time.Now() + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + var rws state.TxTaskQueue + var rwsLock sync.Mutex + rwsReceiveCond := sync.NewCond(&rwsLock) + heap.Init(&rws) + var outputTxNum uint64 + if block > 0 { + outputTxNum = txNums[block-1] + } + var inputBlockNum, outputBlockNum uint64 + var prevOutputBlockNum uint64 = block + // Go-routine gathering results from the workers + var maxTxNum uint64 = txNums[len(txNums)-1] + go func() { + var applyTx kv.RwTx + defer func() { + if applyTx != nil { + applyTx.Rollback() } - for from := range ct.froms { - if err := ww.w.AddTraceFrom(from[:]); err != nil { - return 0, nil, err + }() + if applyTx, err = 
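
Resumability above hinges on two reads: the Execution stage progress gives the next block to execute (execStage.BlockNumber + 1 when not resetting), and the txNums mapping converts that block back into the starting transaction number (outputTxNum = txNums[block-1]). A sketch of that resume arithmetic with a hypothetical progress value:

package main

import "fmt"

func main() {
	// txNums[i] = 1 + last txNum of block i (see the mapping built above).
	txNums := []uint64{2, 7, 10}

	execProgress := uint64(1) // hypothetical stages.Execution progress

	block := execProgress + 1       // next block to execute
	outputTxNum := txNums[block-1]  // first txNum of that block
	fmt.Println(block, outputTxNum) // 2 7
}
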
db.BeginRw(ctx); err != nil { + panic(err) + } + agg.SetTx(applyTx) + defer rs.Finish() + for outputTxNum < atomic.LoadUint64(&maxTxNum) { + select { + case txTask := <-resultCh: + //fmt.Printf("Saved %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + func() { + rwsLock.Lock() + defer rwsLock.Unlock() + atomic.AddInt64(&resultsSize, txTask.ResultsSize) + heap.Push(&rws, txTask) + processResultQueue(&rws, &outputTxNum, rs, agg, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize) + rwsReceiveCond.Signal() + }() + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + sizeEstimate := rs.SizeEstimate() + count = rs.DoneCount() + currentTime := time.Now() + interval := currentTime.Sub(prevTime) + speedTx := float64(count-prevCount) / (float64(interval) / float64(time.Second)) + speedBlock := float64(outputBlockNum-prevOutputBlockNum) / (float64(interval) / float64(time.Second)) + var repeatRatio float64 + if count > prevCount { + repeatRatio = 100.0 * float64(repeatCount-prevRepeatCount) / float64(count-prevCount) } - } - for to := range ct.tos { - if err := ww.w.AddTraceTo(to[:]); err != nil { - return 0, nil, err - } - } - receipts = append(receipts, receipt) - for _, log := range receipt.Logs { - if err = ww.w.AddLogAddr(log.Address[:]); err != nil { - return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err) - } - for _, topic := range log.Topics { - if err = ww.w.AddLogTopic(topic[:]); err != nil { - return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err) + log.Info("Transaction replay", + //"workers", workerCount, + "at block", outputBlockNum, + "input block", atomic.LoadUint64(&inputBlockNum), + "blk/s", fmt.Sprintf("%.1f", speedBlock), + "tx/s", fmt.Sprintf("%.1f", speedTx), + //"repeats", repeatCount-prevRepeatCount, + //"triggered", triggerCount-prevTriggerCount, + "result queue", rws.Len(), + "results size", libcommon.ByteCount(uint64(atomic.LoadInt64(&resultsSize))), + "repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio), + "buffer", libcommon.ByteCount(sizeEstimate), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + prevTime = currentTime + prevCount = count + prevOutputBlockNum = outputBlockNum + prevRepeatCount = repeatCount + //prevTriggerCount = triggerCount + if sizeEstimate >= commitThreshold { + commitStart := time.Now() + log.Info("Committing...") + err := func() error { + rwsLock.Lock() + defer rwsLock.Unlock() + // Drain results (and process) channel because read sets do not carry over + for { + var drained bool + for !drained { + select { + case txTask := <-resultCh: + atomic.AddInt64(&resultsSize, txTask.ResultsSize) + heap.Push(&rws, txTask) + default: + drained = true + } + } + processResultQueue(&rws, &outputTxNum, rs, agg, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize) + if rws.Len() == 0 { + break + } + } + rwsReceiveCond.Signal() + lock.Lock() // This is to prevent workers from starting work on any new txTask + defer lock.Unlock() + // Drain results channel because read sets do not carry over + var drained bool + for !drained { + select { + case txTask := <-resultCh: + rs.AddWork(txTask) + default: + drained = true + } + } + // Drain results queue as well + for rws.Len() > 0 { + txTask := heap.Pop(&rws).(state.TxTask) + atomic.AddInt64(&resultsSize, -txTask.ResultsSize) + rs.AddWork(txTask) + } + if err = applyTx.Commit(); err != nil { + return err + } + for i := 0; i < workerCount; i++ { + 
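
The commit path above must drain both the results channel and the heap before flushing, because read sets were validated against pre-commit state and do not carry over; everything still in flight goes back through rs.AddWork to be re-executed. The non-blocking drain is a select with a default case; condensed into a stand-alone sketch (task is a stand-in type):

package main

import "fmt"

type task struct{ TxNum uint64 }

// drain empties ch without blocking: the default case fires as soon as
// nothing is buffered, which is how the commit path collects in-flight
// results before flushing state.
func drain(ch chan task, requeue func(task)) {
	for {
		select {
		case t := <-ch:
			requeue(t)
		default:
			return
		}
	}
}

func main() {
	resultCh := make(chan task, 4)
	resultCh <- task{TxNum: 5}
	resultCh <- task{TxNum: 6}
	drain(resultCh, func(t task) { fmt.Println("requeue", t.TxNum) })
}
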
reconWorkers[i].ResetTx() + } + rwTx, err = db.BeginRw(ctx) + if err != nil { + return err + } + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + if applyTx, err = db.BeginRw(ctx); err != nil { + return err + } + agg.SetTx(applyTx) + return nil + }() + if err != nil { + panic(err) } + log.Info("Committed", "time", time.Since(commitStart)) } } - if err = ww.w.FinishTx(); err != nil { - return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err) - } - if trace { - fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash()) - } } - txNum++ - ww.w.SetTxNum(txNum) + if err = applyTx.Commit(); err != nil { + panic(err) + } + }() + var inputTxNum uint64 + if block > 0 { + inputTxNum = txNums[block-1] } - - if txNum >= startTxNum && chainConfig.IsByzantium(block.NumberU64()) { - receiptSha := types.DeriveSha(receipts) - if receiptSha != block.ReceiptHash() { - fmt.Printf("mismatched receipt headers for block %d\n", block.NumberU64()) - for j, receipt := range receipts { - fmt.Printf("tx %d, used gas: %d\n", j, receipt.GasUsed) + var header *types.Header + var blockNum uint64 +loop: + for blockNum = block; blockNum < maxBlockNum; blockNum++ { + atomic.StoreUint64(&inputBlockNum, blockNum) + rules := chainConfig.Rules(blockNum) + if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil { + return err + } + blockHash := header.Hash() + b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum) + if err != nil { + return err + } + txs := b.Transactions() + for txIndex := -1; txIndex <= len(txs); txIndex++ { + // Do not oversend, wait for the result heap to go under certain size + func() { + rwsLock.Lock() + defer rwsLock.Unlock() + for rws.Len() > 128 || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold { + rwsReceiveCond.Wait() + } + }() + txTask := state.TxTask{ + Header: header, + BlockNum: blockNum, + Rules: rules, + Block: b, + TxNum: inputTxNum, + TxIndex: txIndex, + BlockHash: blockHash, + Final: txIndex == len(txs), } + if txIndex >= 0 && txIndex < len(txs) { + txTask.Tx = txs[txIndex] + if sender, ok := txs[txIndex].GetSender(); ok { + txTask.Sender = &sender + } + if ok := rs.RegisterSender(txTask); ok { + rs.AddWork(txTask) + } + } else { + rs.AddWork(txTask) + } + inputTxNum++ + } + // Check for interrupts + select { + case <-interruptCh: + log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next run will start with block %d", blockNum+1)) + atomic.StoreUint64(&maxTxNum, inputTxNum) + break loop + default: } } - - if txNum >= startTxNum { - ibs := state.New(rw) - if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil { - return 0, nil, fmt.Errorf("adding coinbase trace: %w", err) - } - for _, uncle := range block.Uncles() { - if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil { - return 0, nil, fmt.Errorf("adding uncle trace: %w", err) - } - } - - // Finalize the block, applying any consensus engine specific extras (e.g. 
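
The dispatch loop above gives each block len(txs) + 2 entries in the transaction-number space: txIndex == -1 is the block-initialisation task, 0..len(txs)-1 are the user transactions, and txIndex == len(txs) is the finalisation task with Final set. That is also why the body-segment mapping advances by txAmount per block. A sketch of the resulting task stream, assuming simple per-block transaction counts:

package main

import "fmt"

func main() {
	blockTxCounts := []int{2, 1} // user txs per block (hypothetical)
	var inputTxNum uint64
	for blockNum, n := range blockTxCounts {
		// txIndex -1 = block initialisation, n = block finalisation.
		for txIndex := -1; txIndex <= n; txIndex++ {
			fmt.Printf("block=%d txIndex=%d txNum=%d final=%v\n",
				blockNum, txIndex, inputTxNum, txIndex == n)
			inputTxNum++
		}
	}
	// Each block advances txNum by n+2, matching baseTxNum+txAmount above.
}
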
block rewards) - if _, _, err := engine.Finalize(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil); err != nil { - return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err) - } - - if err := ibs.CommitBlock(rules, ww); err != nil { - return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err) - } - - if err := ww.w.FinishTx(); err != nil { - return 0, nil, fmt.Errorf("failed to finish tx: %w", err) - } - if trace { - fmt.Printf("FinishTx called for %d block %d\n", txNum, block.NumberU64()) - } + close(workCh) + wg.Wait() + for i := 0; i < workerCount; i++ { + reconWorkers[i].ResetTx() } - - txNum++ // Post-block transaction - ww.w.SetTxNum(txNum) - - return txNum, receipts, nil -} - -// Implements StateReader and StateWriter -type ReaderWrapper22 struct { - roTx kv.Tx - ac *libstate.AggregatorContext - blockNum uint64 -} - -type WriterWrapper22 struct { - blockNum uint64 - w *libstate.Aggregator -} - -func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Account, error) { - enc, err := rw.ac.ReadAccountData(address.Bytes(), rw.roTx) + rwTx, err = db.BeginRw(ctx) if err != nil { - return nil, err - } - if len(enc) == 0 { - return nil, nil - } - - var a accounts.Account - a.Reset() - pos := 0 - nonceBytes := int(enc[pos]) - pos++ - if nonceBytes > 0 { - a.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) - pos += nonceBytes - } - balanceBytes := int(enc[pos]) - pos++ - if balanceBytes > 0 { - a.Balance.SetBytes(enc[pos : pos+balanceBytes]) - pos += balanceBytes - } - codeHashBytes := int(enc[pos]) - pos++ - if codeHashBytes > 0 { - copy(a.CodeHash[:], enc[pos:pos+codeHashBytes]) - pos += codeHashBytes - } - incBytes := int(enc[pos]) - pos++ - if incBytes > 0 { - a.Incarnation = bytesToUint64(enc[pos : pos+incBytes]) - } - return &a, nil -} - -func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { - enc, err := rw.ac.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx) - if err != nil { - return nil, err - } - if enc == nil { - return nil, nil - } - if len(enc) == 1 && enc[0] == 0 { - return nil, nil - } - return enc, nil -} - -func (rw *ReaderWrapper22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { - return rw.ac.ReadAccountCode(address.Bytes(), rw.roTx) -} - -func (rw *ReaderWrapper22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) { - return rw.ac.ReadAccountCodeSize(address.Bytes(), rw.roTx) -} - -func (rw *ReaderWrapper22) ReadAccountIncarnation(address common.Address) (uint64, error) { - return 0, nil -} - -func (ww *WriterWrapper22) UpdateAccountData(address common.Address, original, account *accounts.Account) error { - var l int - l++ - if account.Nonce > 0 { - l += (bits.Len64(account.Nonce) + 7) / 8 - } - l++ - if !account.Balance.IsZero() { - l += account.Balance.ByteLen() - } - l++ - if !account.IsEmptyCodeHash() { - l += 32 - } - l++ - if account.Incarnation > 0 { - l += (bits.Len64(account.Incarnation) + 7) / 8 - } - value := make([]byte, l) - pos := 0 - if account.Nonce == 0 { - value[pos] = 0 - pos++ - } else { - nonceBytes := (bits.Len64(account.Nonce) + 7) / 8 - value[pos] = byte(nonceBytes) - var nonce = account.Nonce - for i := nonceBytes; i > 0; i-- { - value[pos+i] = byte(nonce) - nonce >>= 8 - } - pos += nonceBytes + 1 - } - if account.Balance.IsZero() { - value[pos] = 0 - pos++ - 
} else { - balanceBytes := account.Balance.ByteLen() - value[pos] = byte(balanceBytes) - pos++ - account.Balance.WriteToSlice(value[pos : pos+balanceBytes]) - pos += balanceBytes - } - if account.IsEmptyCodeHash() { - value[pos] = 0 - pos++ - } else { - value[pos] = 32 - pos++ - copy(value[pos:pos+32], account.CodeHash[:]) - pos += 32 - } - if account.Incarnation == 0 { - value[pos] = 0 - } else { - incBytes := (bits.Len64(account.Incarnation) + 7) / 8 - value[pos] = byte(incBytes) - var inc = account.Incarnation - for i := incBytes; i > 0; i-- { - value[pos+i] = byte(inc) - inc >>= 8 - } - } - if err := ww.w.UpdateAccountData(address.Bytes(), value); err != nil { return err } - return nil -} - -func (ww *WriterWrapper22) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error { - if err := ww.w.UpdateAccountCode(address.Bytes(), code); err != nil { + if err = rs.Flush(rwTx); err != nil { return err } + if err = execStage.Update(rwTx, blockNum); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + if rwTx, err = db.BeginRw(ctx); err != nil { + return err + } + log.Info("Transaction replay complete", "duration", time.Since(startTime)) + log.Info("Computing hashed state") + tmpDir := filepath.Join(datadir, "tmp") + if err = rwTx.ClearBucket(kv.HashedAccounts); err != nil { + return err + } + if err = rwTx.ClearBucket(kv.HashedStorage); err != nil { + return err + } + if err = rwTx.ClearBucket(kv.ContractCode); err != nil { + return err + } + if err = stagedsync.PromoteHashedStateCleanly("recon", rwTx, stagedsync.StageHashStateCfg(db, tmpDir), ctx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + if rwTx, err = db.BeginRw(ctx); err != nil { + return err + } + var rootHash common.Hash + if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + if rootHash != header.Root { + log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root)) + } return nil } - -func (ww *WriterWrapper22) DeleteAccount(address common.Address, original *accounts.Account) error { - if err := ww.w.DeleteAccount(address.Bytes()); err != nil { - return err - } - return nil -} - -func (ww *WriterWrapper22) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error { - if err := ww.w.WriteAccountStorage(address.Bytes(), key.Bytes(), value.Bytes()); err != nil { - return err - } - return nil -} - -func (ww *WriterWrapper22) CreateContract(address common.Address) error { - return nil -} diff --git a/cmd/state/commands/erigon23.go b/cmd/state/commands/erigon23.go new file mode 100644 index 000000000..c0974dfe3 --- /dev/null +++ b/cmd/state/commands/erigon23.go @@ -0,0 +1,564 @@ +package commands + +import ( + "context" + "errors" + "fmt" + "math/bits" + "os" + "os/signal" + "path" + "path/filepath" + "runtime" + "syscall" + "time" + + "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" + libstate "github.com/ledgerwatch/erigon-lib/state" + 
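
The account encoding used by the wrappers is four length-prefixed, big-endian fields in a fixed order: nonce, balance, code hash, incarnation; a zero length byte stands for an empty field, and ReadAccountData walks the same layout back. A round-trip sketch of that format (with a uint64 balance standing in for uint256 to stay self-contained):

package main

import (
	"fmt"
	"math/bits"
)

// putUint appends a length byte plus the big-endian bytes of v; zero
// becomes a single 0x00 length byte, as in UpdateAccountData.
func putUint(buf []byte, v uint64) []byte {
	if v == 0 {
		return append(buf, 0)
	}
	n := (bits.Len64(v) + 7) / 8
	buf = append(buf, byte(n))
	for i := n - 1; i >= 0; i-- {
		buf = append(buf, byte(v>>(8*uint(i))))
	}
	return buf
}

// getUint is the inverse walk, mirroring ReadAccountData.
func getUint(buf []byte) (uint64, []byte) {
	n := int(buf[0])
	buf = buf[1:]
	var v uint64
	for i := 0; i < n; i++ {
		v = v<<8 | uint64(buf[i])
	}
	return v, buf[n:]
}

func main() {
	enc := putUint(nil, 42)       // nonce
	enc = putUint(enc, 1_000_000) // balance (uint64 stand-in for uint256)
	enc = append(enc, 0)          // empty code hash
	enc = putUint(enc, 1)         // incarnation

	nonce, rest := getUint(enc)
	balance, rest := getUint(rest)
	rest = rest[1:] // skip the 0x00 empty code hash marker
	inc, _ := getUint(rest)
	fmt.Println(nonce, balance, inc) // 42 1000000 1
}
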
"github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/log/v3" + "github.com/spf13/cobra" + + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/consensus/misc" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/types/accounts" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/turbo/snapshotsync" +) + +const ( + AggregationStep = 3_125_000 /* number of transactions in smallest static file */ +) + +func init() { + withBlock(erigon23Cmd) + withDataDir(erigon23Cmd) + withChain(erigon23Cmd) + + rootCmd.AddCommand(erigon23Cmd) +} + +var erigon23Cmd = &cobra.Command{ + Use: "erigon23", + Short: "Exerimental command to re-execute blocks from beginning using erigon2 state representation and histoty (ugrade 3)", + RunE: func(cmd *cobra.Command, args []string) error { + logger := log.New() + return Erigon23(genesis, chainConfig, logger) + }, +} + +func Erigon23(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.Logger) error { + sigs := make(chan os.Signal, 1) + interruptCh := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigs + interruptCh <- true + }() + + historyDb, err := kv2.NewMDBX(logger).Path(path.Join(datadir, "chaindata")).Open() + if err != nil { + return fmt.Errorf("opening chaindata as read only: %v", err) + } + defer historyDb.Close() + + ctx := context.Background() + historyTx, err1 := historyDb.BeginRo(ctx) + if err1 != nil { + return err1 + } + defer historyTx.Rollback() + stateDbPath := path.Join(datadir, "db23") + if _, err = os.Stat(stateDbPath); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + } else if err = os.RemoveAll(stateDbPath); err != nil { + return err + } + db, err2 := kv2.NewMDBX(logger).Path(stateDbPath).WriteMap().Open() + if err2 != nil { + return err2 + } + defer db.Close() + + aggPath := filepath.Join(datadir, "erigon23") + + var rwTx kv.RwTx + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + if rwTx, err = db.BeginRw(ctx); err != nil { + return err + } + + agg, err3 := libstate.NewAggregator(aggPath, AggregationStep) + if err3 != nil { + return fmt.Errorf("create aggregator: %w", err3) + } + defer agg.Close() + startTxNum := agg.EndTxNumMinimax() + fmt.Printf("Max txNum in files: %d\n", startTxNum) + + interrupt := false + if startTxNum == 0 { + _, genesisIbs, err := genesis.ToBlock() + if err != nil { + return err + } + agg.SetTx(rwTx) + agg.SetTxNum(0) + if err = genesisIbs.CommitBlock(¶ms.Rules{}, &WriterWrapper23{w: agg}); err != nil { + return fmt.Errorf("cannot write state: %w", err) + } + if err = agg.FinishTx(); err != nil { + return err + } + } + + logger.Info("Initialised chain configuration", "config", chainConfig) + + var ( + blockNum uint64 + trace bool + vmConfig vm.Config + + txNum uint64 = 2 // Consider that each block contains at least first system tx and enclosing transactions, except for Clique consensus engine + ) + + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + + statx := &stat23{ + prevBlock: blockNum, + prevTime: time.Now(), + } + + go func() { + for range logEvery.C { + aStats := agg.Stats() + statx.delta(aStats, blockNum).print(aStats, logger) + } + }() + + var blockReader services.FullBlockReader + var 
allSnapshots *snapshotsync.RoSnapshots + allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) + defer allSnapshots.Close() + if err := allSnapshots.ReopenWithDB(db); err != nil { + return fmt.Errorf("reopen snapshot segments: %w", err) + } + blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + engine := initConsensusEngine(chainConfig, logger, allSnapshots) + + getHeader := func(hash common.Hash, number uint64) *types.Header { + h, err := blockReader.Header(ctx, historyTx, hash, number) + if err != nil { + panic(err) + } + return h + } + readWrapper := &ReaderWrapper23{ac: agg.MakeContext(), roTx: rwTx} + writeWrapper := &WriterWrapper23{w: agg} + + for !interrupt { + blockNum++ + trace = traceBlock > 0 && blockNum == uint64(traceBlock) + blockHash, err := blockReader.CanonicalHash(ctx, historyTx, blockNum) + if err != nil { + return err + } + + b, _, err := blockReader.BlockWithSenders(ctx, historyTx, blockHash, blockNum) + if err != nil { + return err + } + if b == nil { + log.Info("history: block is nil", "block", blockNum) + break + } + agg.SetTx(rwTx) + agg.SetTxNum(txNum) + + if txNum, _, err = processBlock23(startTxNum, trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil { + return fmt.Errorf("processing block %d: %w", blockNum, err) + } + + // Check for interrupts + select { + case interrupt = <-interruptCh: + log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next time start with --block %d", blockNum)) + default: + } + // Commit transaction only when interrupted or just before computing commitment (so it can be re-done) + commit := interrupt + if !commit && (blockNum+1)%uint64(commitmentFrequency) == 0 { + var spaceDirty uint64 + if spaceDirty, _, err = rwTx.(*mdbx.MdbxTx).SpaceDirty(); err != nil { + return fmt.Errorf("retrieving spaceDirty: %w", err) + } + if spaceDirty >= dirtySpaceThreshold { + log.Info("Initiated tx commit", "block", blockNum, "space dirty", libcommon.ByteCount(spaceDirty)) + commit = true + } + } + if commit { + if err = rwTx.Commit(); err != nil { + return err + } + if !interrupt { + if rwTx, err = db.BeginRw(ctx); err != nil { + return err + } + } + agg.SetTx(rwTx) + readWrapper.roTx = rwTx + } + } + + return nil +} + +type stat23 struct { + blockNum uint64 + hits uint64 + misses uint64 + prevBlock uint64 + prevMisses uint64 + prevHits uint64 + hitMissRatio float64 + speed float64 + prevTime time.Time + mem runtime.MemStats +} + +func (s *stat23) print(aStats libstate.FilesStats, logger log.Logger) { + totalFiles := 0 + totalDatSize := 0 + totalIdxSize := 0 + + logger.Info("Progress", "block", s.blockNum, "blk/s", s.speed, "state files", totalFiles, + "total dat", libcommon.ByteCount(uint64(totalDatSize)), "total idx", libcommon.ByteCount(uint64(totalIdxSize)), + "hit ratio", s.hitMissRatio, "hits+misses", s.hits+s.misses, + "alloc", libcommon.ByteCount(s.mem.Alloc), "sys", libcommon.ByteCount(s.mem.Sys), + ) +} + +func (s *stat23) delta(aStats libstate.FilesStats, blockNum uint64) *stat23 { + currentTime := time.Now() + libcommon.ReadMemStats(&s.mem) + + interval := currentTime.Sub(s.prevTime).Seconds() + s.blockNum = blockNum + s.speed = float64(s.blockNum-s.prevBlock) / interval + s.prevBlock = blockNum + s.prevTime = currentTime + + total := s.hits + s.misses + if total > 0 { + s.hitMissRatio = float64(s.hits) / float64(total) + } + return s +} + +func processBlock23(startTxNum uint64, trace bool, txNumStart uint64, 
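
The commit policy in this loop is deliberately not per-block: every commitmentFrequency blocks it asks the MDBX transaction for its accumulated dirty space (SpaceDirty) and commits only once a threshold is crossed, keeping write transactions bounded without committing constantly. A schematic of that gating with placeholder numbers where the code reads real MDBX counters:

package main

import "fmt"

func main() {
	const commitmentFrequency = 25        // blocks between checks (hypothetical)
	const dirtySpaceThreshold = 256 << 20 // 256 MiB, placeholder

	spaceDirty := func() uint64 { return 300 << 20 } // stand-in for MdbxTx.SpaceDirty

	for blockNum := uint64(1); blockNum <= 100; blockNum++ {
		interrupt := false // would come from the signal channel
		commit := interrupt
		if !commit && (blockNum+1)%commitmentFrequency == 0 {
			if spaceDirty() >= dirtySpaceThreshold {
				commit = true
			}
		}
		if commit {
			fmt.Println("commit at block", blockNum)
			// ... Commit(), reopen rwTx, agg.SetTx(rwTx), refresh readWrapper ...
			break
		}
	}
}
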
rw *ReaderWrapper23, ww *WriterWrapper23, chainConfig *params.ChainConfig, + engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config, +) (uint64, types.Receipts, error) { + defer blockExecutionTimer.UpdateDuration(time.Now()) + + header := block.Header() + vmConfig.Debug = true + gp := new(core.GasPool).AddGas(block.GasLimit()) + usedGas := new(uint64) + var receipts types.Receipts + rules := chainConfig.Rules(block.NumberU64()) + txNum := txNumStart + ww.w.SetTxNum(txNum) + rw.blockNum = block.NumberU64() + ww.blockNum = block.NumberU64() + + daoFork := txNum >= startTxNum && chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 + if daoFork { + ibs := state.New(rw) + // TODO Actually add tracing to the DAO related accounts + misc.ApplyDAOHardFork(ibs) + if err := ibs.FinalizeTx(rules, ww); err != nil { + return 0, nil, err + } + if err := ww.w.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("finish daoFork failed: %w", err) + } + } + + txNum++ // Pre-block transaction + ww.w.SetTxNum(txNum) + getHashFn := core.GetHashFn(header, getHeader) + + for i, tx := range block.Transactions() { + if txNum >= startTxNum { + ibs := state.New(rw) + ibs.Prepare(tx.Hash(), block.Hash(), i) + ct := NewCallTracer() + vmConfig.Tracer = ct + receipt, _, err := core.ApplyTransaction(chainConfig, getHashFn, engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil) + if err != nil { + return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err) + } + for from := range ct.froms { + if err := ww.w.AddTraceFrom(from[:]); err != nil { + return 0, nil, err + } + } + for to := range ct.tos { + if err := ww.w.AddTraceTo(to[:]); err != nil { + return 0, nil, err + } + } + receipts = append(receipts, receipt) + for _, log := range receipt.Logs { + if err = ww.w.AddLogAddr(log.Address[:]); err != nil { + return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err) + } + for _, topic := range log.Topics { + if err = ww.w.AddLogTopic(topic[:]); err != nil { + return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err) + } + } + } + if err = ww.w.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err) + } + if trace { + fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash()) + } + } + txNum++ + ww.w.SetTxNum(txNum) + } + + if txNum >= startTxNum && chainConfig.IsByzantium(block.NumberU64()) { + receiptSha := types.DeriveSha(receipts) + if receiptSha != block.ReceiptHash() { + fmt.Printf("mismatched receipt headers for block %d\n", block.NumberU64()) + for j, receipt := range receipts { + fmt.Printf("tx %d, used gas: %d\n", j, receipt.GasUsed) + } + } + } + + if txNum >= startTxNum { + ibs := state.New(rw) + if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil { + return 0, nil, fmt.Errorf("adding coinbase trace: %w", err) + } + for _, uncle := range block.Uncles() { + if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil { + return 0, nil, fmt.Errorf("adding uncle trace: %w", err) + } + } + + // Finalize the block, applying any consensus engine specific extras (e.g. 
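
The froms/tos sets consumed by AddTraceFrom/AddTraceTo come from the vm.Config.Tracer installed per transaction: it records every sender and recipient observed while the EVM runs, and those addresses become the trace indices. A minimal, hand-rolled tracer of that shape (the real CallTracer lives elsewhere in cmd/state; this is only its accumulator logic):

package main

import "fmt"

type address = [20]byte

// callTracer keeps the set of from/to addresses seen during execution,
// mirroring the accumulator consumed by AddTraceFrom / AddTraceTo.
type callTracer struct {
	froms map[address]struct{}
	tos   map[address]struct{}
}

func newCallTracer() *callTracer {
	return &callTracer{froms: map[address]struct{}{}, tos: map[address]struct{}{}}
}

// captureCall would be invoked from the EVM tracer hooks on each call frame.
func (ct *callTracer) captureCall(from, to address) {
	ct.froms[from] = struct{}{}
	ct.tos[to] = struct{}{}
}

func main() {
	ct := newCallTracer()
	ct.captureCall(address{1}, address{2})
	ct.captureCall(address{1}, address{3}) // duplicate senders collapse
	fmt.Println(len(ct.froms), len(ct.tos)) // 1 2
}
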
block rewards) + if _, _, err := engine.Finalize(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil); err != nil { + return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err) + } + + if err := ibs.CommitBlock(rules, ww); err != nil { + return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err) + } + + if err := ww.w.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("failed to finish tx: %w", err) + } + if trace { + fmt.Printf("FinishTx called for %d block %d\n", txNum, block.NumberU64()) + } + } + + txNum++ // Post-block transaction + ww.w.SetTxNum(txNum) + + return txNum, receipts, nil +} + +// Implements StateReader and StateWriter +type ReaderWrapper23 struct { + roTx kv.Tx + ac *libstate.AggregatorContext + blockNum uint64 +} + +type WriterWrapper23 struct { + blockNum uint64 + w *libstate.Aggregator +} + +func (rw *ReaderWrapper23) ReadAccountData(address common.Address) (*accounts.Account, error) { + enc, err := rw.ac.ReadAccountData(address.Bytes(), rw.roTx) + if err != nil { + return nil, err + } + if len(enc) == 0 { + return nil, nil + } + + var a accounts.Account + a.Reset() + pos := 0 + nonceBytes := int(enc[pos]) + pos++ + if nonceBytes > 0 { + a.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) + pos += nonceBytes + } + balanceBytes := int(enc[pos]) + pos++ + if balanceBytes > 0 { + a.Balance.SetBytes(enc[pos : pos+balanceBytes]) + pos += balanceBytes + } + codeHashBytes := int(enc[pos]) + pos++ + if codeHashBytes > 0 { + copy(a.CodeHash[:], enc[pos:pos+codeHashBytes]) + pos += codeHashBytes + } + incBytes := int(enc[pos]) + pos++ + if incBytes > 0 { + a.Incarnation = bytesToUint64(enc[pos : pos+incBytes]) + } + return &a, nil +} + +func (rw *ReaderWrapper23) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { + enc, err := rw.ac.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx) + if err != nil { + return nil, err + } + if enc == nil { + return nil, nil + } + if len(enc) == 1 && enc[0] == 0 { + return nil, nil + } + return enc, nil +} + +func (rw *ReaderWrapper23) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { + return rw.ac.ReadAccountCode(address.Bytes(), rw.roTx) +} + +func (rw *ReaderWrapper23) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) { + return rw.ac.ReadAccountCodeSize(address.Bytes(), rw.roTx) +} + +func (rw *ReaderWrapper23) ReadAccountIncarnation(address common.Address) (uint64, error) { + return 0, nil +} + +func (ww *WriterWrapper23) UpdateAccountData(address common.Address, original, account *accounts.Account) error { + var l int + l++ + if account.Nonce > 0 { + l += (bits.Len64(account.Nonce) + 7) / 8 + } + l++ + if !account.Balance.IsZero() { + l += account.Balance.ByteLen() + } + l++ + if !account.IsEmptyCodeHash() { + l += 32 + } + l++ + if account.Incarnation > 0 { + l += (bits.Len64(account.Incarnation) + 7) / 8 + } + value := make([]byte, l) + pos := 0 + if account.Nonce == 0 { + value[pos] = 0 + pos++ + } else { + nonceBytes := (bits.Len64(account.Nonce) + 7) / 8 + value[pos] = byte(nonceBytes) + var nonce = account.Nonce + for i := nonceBytes; i > 0; i-- { + value[pos+i] = byte(nonce) + nonce >>= 8 + } + pos += nonceBytes + 1 + } + if account.Balance.IsZero() { + value[pos] = 0 + pos++ + } else { + balanceBytes := account.Balance.ByteLen() + value[pos] = byte(balanceBytes) + pos++ + 
account.Balance.WriteToSlice(value[pos : pos+balanceBytes]) + pos += balanceBytes + } + if account.IsEmptyCodeHash() { + value[pos] = 0 + pos++ + } else { + value[pos] = 32 + pos++ + copy(value[pos:pos+32], account.CodeHash[:]) + pos += 32 + } + if account.Incarnation == 0 { + value[pos] = 0 + } else { + incBytes := (bits.Len64(account.Incarnation) + 7) / 8 + value[pos] = byte(incBytes) + var inc = account.Incarnation + for i := incBytes; i > 0; i-- { + value[pos+i] = byte(inc) + inc >>= 8 + } + } + if err := ww.w.UpdateAccountData(address.Bytes(), value); err != nil { + return err + } + return nil +} + +func (ww *WriterWrapper23) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error { + if err := ww.w.UpdateAccountCode(address.Bytes(), code); err != nil { + return err + } + return nil +} + +func (ww *WriterWrapper23) DeleteAccount(address common.Address, original *accounts.Account) error { + if err := ww.w.DeleteAccount(address.Bytes()); err != nil { + return err + } + return nil +} + +func (ww *WriterWrapper23) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error { + if err := ww.w.WriteAccountStorage(address.Bytes(), key.Bytes(), value.Bytes()); err != nil { + return err + } + return nil +} + +func (ww *WriterWrapper23) CreateContract(address common.Address) error { + return nil +} diff --git a/cmd/state/commands/replay_tx.go b/cmd/state/commands/replay_tx.go index 5eba47e33..417f5b71b 100644 --- a/cmd/state/commands/replay_tx.go +++ b/cmd/state/commands/replay_tx.go @@ -103,8 +103,8 @@ func ReplayTx(genesis *core.Genesis) error { txNum = txnum } fmt.Printf("txNum = %d\n", txNum) - aggPath := filepath.Join(datadir, "erigon23") - agg, err := libstate.NewAggregator(aggPath, AggregationStep) + aggPath := filepath.Join(datadir, "agg22") + agg, err := libstate.NewAggregator22(aggPath, AggregationStep) if err != nil { return fmt.Errorf("create history: %w", err) } @@ -119,7 +119,7 @@ func ReplayTx(genesis *core.Genesis) error { } func replayTxNum(ctx context.Context, allSnapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, - txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.AggregatorContext, + txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.Aggregator22Context, ) error { bn := uint64(sort.Search(len(txNums), func(i int) bool { return txNums[i] > txNum diff --git a/cmd/state/commands/state_recon.go b/cmd/state/commands/state_recon.go index aff2a98b3..336e0fcd0 100644 --- a/cmd/state/commands/state_recon.go +++ b/cmd/state/commands/state_recon.go @@ -72,7 +72,7 @@ type ReconWorker struct { } func NewReconWorker(lock sync.Locker, wg *sync.WaitGroup, rs *state.ReconState, - a *libstate.Aggregator, blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots, + a *libstate.Aggregator22, blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, ) *ReconWorker { ac := a.MakeContext() @@ -122,7 +122,7 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) { daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1 var err error if txTask.BlockNum == 0 && txTask.TxIndex == -1 { - //fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txNum, blockNum) + //fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, 
txTask.BlockNum) // Genesis block _, ibs, err = rw.genesis.ToBlock() if err != nil { @@ -180,7 +180,7 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) { type FillWorker struct { txNum uint64 doneCount *uint64 - ac *libstate.AggregatorContext + ac *libstate.Aggregator22Context fromKey, toKey []byte currentKey []byte bitmap roaring64.Bitmap @@ -188,7 +188,7 @@ type FillWorker struct { progress uint64 } -func NewFillWorker(txNum uint64, doneCount *uint64, a *libstate.Aggregator, fromKey, toKey []byte) *FillWorker { +func NewFillWorker(txNum uint64, doneCount *uint64, a *libstate.Aggregator22, fromKey, toKey []byte) *FillWorker { fw := &FillWorker{ txNum: txNum, doneCount: doneCount, @@ -355,8 +355,8 @@ func Recon(genesis *core.Genesis, logger log.Logger) error { interruptCh <- true }() ctx := context.Background() - aggPath := filepath.Join(datadir, "erigon23") - agg, err := libstate.NewAggregator(aggPath, AggregationStep) + aggPath := filepath.Join(datadir, "agg22") + agg, err := libstate.NewAggregator22(aggPath, AggregationStep) if err != nil { return fmt.Errorf("create history: %w", err) } diff --git a/cmd/state/commands/state_recon_1.go b/cmd/state/commands/state_recon_1.go deleted file mode 100644 index c72b17277..000000000 --- a/cmd/state/commands/state_recon_1.go +++ /dev/null @@ -1,546 +0,0 @@ -package commands - -import ( - "container/heap" - "context" - "errors" - "fmt" - "os" - "os/signal" - "path" - "path/filepath" - "runtime" - "sync" - "sync/atomic" - "syscall" - "time" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/kv" - kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/consensus" - "github.com/ledgerwatch/erigon/consensus/misc" - "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/core/state" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/core/vm" - "github.com/ledgerwatch/erigon/eth/ethconfig" - "github.com/ledgerwatch/erigon/eth/stagedsync" - "github.com/ledgerwatch/erigon/params" - "github.com/ledgerwatch/erigon/turbo/services" - "github.com/ledgerwatch/erigon/turbo/snapshotsync" - "github.com/ledgerwatch/log/v3" - "github.com/spf13/cobra" - "golang.org/x/sync/semaphore" -) - -func init() { - withBlock(recon1Cmd) - withDataDir(recon1Cmd) - rootCmd.AddCommand(recon1Cmd) -} - -var recon1Cmd = &cobra.Command{ - Use: "recon1", - Short: "Exerimental command to reconstitute the state at given block", - RunE: func(cmd *cobra.Command, args []string) error { - logger := log.New() - return Recon1(genesis, logger) - }, -} - -type ReconWorker1 struct { - lock sync.Locker - wg *sync.WaitGroup - rs *state.ReconState1 - blockReader services.FullBlockReader - allSnapshots *snapshotsync.RoSnapshots - stateWriter *state.StateReconWriter1 - stateReader *state.StateReconReader1 - getHeader func(hash common.Hash, number uint64) *types.Header - ctx context.Context - engine consensus.Engine - txNums []uint64 - chainConfig *params.ChainConfig - logger log.Logger - genesis *core.Genesis - resultCh chan state.TxTask -} - -func NewReconWorker1(lock sync.Locker, wg *sync.WaitGroup, rs *state.ReconState1, - blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots, - txNums []uint64, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, - resultCh chan state.TxTask, -) *ReconWorker1 { - return &ReconWorker1{ - lock: lock, - wg: wg, - rs: rs, - blockReader: blockReader, - allSnapshots: 
allSnapshots, - ctx: context.Background(), - stateWriter: state.NewStateReconWriter1(rs), - stateReader: state.NewStateReconReader1(rs), - txNums: txNums, - chainConfig: chainConfig, - logger: logger, - genesis: genesis, - resultCh: resultCh, - } -} - -func (rw *ReconWorker1) SetTx(tx kv.Tx) { - rw.stateReader.SetTx(tx) -} - -func (rw *ReconWorker1) run() { - defer rw.wg.Done() - rw.getHeader = func(hash common.Hash, number uint64) *types.Header { - h, err := rw.blockReader.Header(rw.ctx, nil, hash, number) - if err != nil { - panic(err) - } - return h - } - rw.engine = initConsensusEngine(rw.chainConfig, rw.logger, rw.allSnapshots) - for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() { - rw.runTxTask(&txTask) - rw.resultCh <- txTask // Needs to have outside of the lock - } -} - -func (rw *ReconWorker1) runTxTask(txTask *state.TxTask) { - rw.lock.Lock() - defer rw.lock.Unlock() - txTask.Error = nil - rw.stateReader.SetTxNum(txTask.TxNum) - rw.stateWriter.SetTxNum(txTask.TxNum) - rw.stateReader.ResetReadSet() - rw.stateWriter.ResetWriteSet() - ibs := state.New(rw.stateReader) - daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1 - var err error - if txTask.BlockNum == 0 && txTask.TxIndex == -1 { - //fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum) - // Genesis block - _, ibs, err = rw.genesis.ToBlock() - if err != nil { - panic(err) - } - } else if daoForkTx { - //fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum) - misc.ApplyDAOHardFork(ibs) - ibs.SoftFinalise() - } else if txTask.TxIndex == -1 { - // Block initialisation - } else if txTask.Final { - if txTask.BlockNum > 0 { - //fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum) - // End of block transaction in a block - if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, nil, nil, nil); err != nil { - panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err)) - } - } - } else { - //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) - txHash := txTask.Tx.Hash() - gp := new(core.GasPool).AddGas(txTask.Tx.GetGas()) - vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)} - contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil } - ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex) - getHashFn := core.GetHashFn(txTask.Header, rw.getHeader) - blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM) - msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, txTask.Rules) - if err != nil { - panic(err) - } - txContext := core.NewEVMTxContext(msg) - vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig) - if _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */); err != nil { - txTask.Error = err - //fmt.Printf("error=%v\n", err) - } - // Update the state with pending changes - ibs.SoftFinalise() - } - // Prepare read set, write set and balanceIncrease set and send for serialisation - if txTask.Error == nil { - txTask.BalanceIncreaseSet = ibs.BalanceIncreaseSet() - //for addr, bal := range txTask.BalanceIncreaseSet { - // 
fmt.Printf("[%x]=>[%d]\n", addr, &bal) - //} - if err = ibs.MakeWriteSet(txTask.Rules, rw.stateWriter); err != nil { - panic(err) - } - txTask.ReadLists = rw.stateReader.ReadSet() - txTask.WriteLists = rw.stateWriter.WriteSet() - size := (20 + 32) * len(txTask.BalanceIncreaseSet) - for _, list := range txTask.ReadLists { - for _, b := range list.Keys { - size += len(b) - } - for _, b := range list.Vals { - size += len(b) - } - } - for _, list := range txTask.WriteLists { - for _, b := range list.Keys { - size += len(b) - } - for _, b := range list.Vals { - size += len(b) - } - } - txTask.ResultsSize = int64(size) - } -} - -func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.ReconState1, applyTx kv.Tx, - triggerCount *uint64, outputBlockNum *uint64, repeatCount *uint64, resultsSize *int64) { - for rws.Len() > 0 && (*rws)[0].TxNum == *outputTxNum { - txTask := heap.Pop(rws).(state.TxTask) - atomic.AddInt64(resultsSize, -txTask.ResultsSize) - if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) { - if err := rs.Apply(txTask.Rules.IsSpuriousDragon, applyTx, txTask); err != nil { - panic(err) - } - *triggerCount += rs.CommitTxNum(txTask.Sender, txTask.TxNum) - *outputTxNum++ - *outputBlockNum = txTask.BlockNum - //fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) - } else { - rs.AddWork(txTask) - *repeatCount++ - //fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) - } - } -} - -func Recon1(genesis *core.Genesis, logger log.Logger) error { - sigs := make(chan os.Signal, 1) - interruptCh := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - go func() { - <-sigs - interruptCh <- true - }() - ctx := context.Background() - reconDbPath := path.Join(datadir, "recon1db") - var err error - if _, err = os.Stat(reconDbPath); err != nil { - if !errors.Is(err, os.ErrNotExist) { - return err - } - } else if err = os.RemoveAll(reconDbPath); err != nil { - return err - } - limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1)) - db, err := kv2.NewMDBX(logger).Path(reconDbPath).RoTxsLimiter(limiter).Open() - if err != nil { - return err - } - startTime := time.Now() - var blockReader services.FullBlockReader - var allSnapshots *snapshotsync.RoSnapshots - allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) - defer allSnapshots.Close() - if err := allSnapshots.ReopenWithDB(db); err != nil { - return fmt.Errorf("reopen snapshot segments: %w", err) - } - blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) - // Compute mapping blockNum -> last TxNum in that block - txNums := make([]uint64, allSnapshots.BlocksAvailable()+1) - if err = allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error { - for _, b := range bs { - if err = b.Iterate(func(blockNum, baseTxNum, txAmount uint64) { - txNums[blockNum] = baseTxNum + txAmount - }); err != nil { - return err - } - } - return nil - }); err != nil { - return fmt.Errorf("build txNum => blockNum mapping: %w", err) - } - blockNum := block + 1 - txNum := txNums[blockNum-1] - fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum) - workerCount := runtime.NumCPU() - workCh := make(chan state.TxTask, 128) - rs := state.NewReconState1() - var lock sync.RWMutex - reconWorkers := make([]*ReconWorker1, workerCount) - var applyTx kv.Tx - defer func() { - if applyTx != nil { - applyTx.Rollback() - } - }() - if applyTx, 
-
-func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.ReconState1, applyTx kv.Tx,
-	triggerCount *uint64, outputBlockNum *uint64, repeatCount *uint64, resultsSize *int64) {
-	for rws.Len() > 0 && (*rws)[0].TxNum == *outputTxNum {
-		txTask := heap.Pop(rws).(state.TxTask)
-		atomic.AddInt64(resultsSize, -txTask.ResultsSize)
-		if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) {
-			if err := rs.Apply(txTask.Rules.IsSpuriousDragon, applyTx, txTask); err != nil {
-				panic(err)
-			}
-			*triggerCount += rs.CommitTxNum(txTask.Sender, txTask.TxNum)
-			*outputTxNum++
-			*outputBlockNum = txTask.BlockNum
-			//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
-		} else {
-			rs.AddWork(txTask)
-			*repeatCount++
-			//fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
-		}
-	}
-}
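
processResultQueue is the serialisation point of the pipeline: results may arrive in any order, but state is applied strictly in TxNum order, and any task whose read set has been invalidated by a concurrent commit is recycled rather than applied. A toy illustration of the in-order rule (hypothetical TxNums, details elided):

// Results complete out of order: 5, 3, 4. Nothing is applied until the head
// of the min-heap equals outputTxNum; then 3, 4, 5 drain in a single pass.
heap.Push(&rws, state.TxTask{TxNum: 5})
heap.Push(&rws, state.TxTask{TxNum: 3})
heap.Push(&rws, state.TxTask{TxNum: 4})
for rws.Len() > 0 && rws[0].TxNum == outputTxNum { // outputTxNum == 3 here
	txTask := heap.Pop(&rws).(state.TxTask)
	// apply txTask, or rs.AddWork(txTask) if its reads turned out stale
	outputTxNum++
}
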
-
-func Recon1(genesis *core.Genesis, logger log.Logger) error {
-	sigs := make(chan os.Signal, 1)
-	interruptCh := make(chan bool, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
-
-	go func() {
-		<-sigs
-		interruptCh <- true
-	}()
-	ctx := context.Background()
-	reconDbPath := path.Join(datadir, "recon1db")
-	var err error
-	if _, err = os.Stat(reconDbPath); err != nil {
-		if !errors.Is(err, os.ErrNotExist) {
-			return err
-		}
-	} else if err = os.RemoveAll(reconDbPath); err != nil {
-		return err
-	}
-	limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1))
-	db, err := kv2.NewMDBX(logger).Path(reconDbPath).RoTxsLimiter(limiter).Open()
-	if err != nil {
-		return err
-	}
-	startTime := time.Now()
-	var blockReader services.FullBlockReader
-	var allSnapshots *snapshotsync.RoSnapshots
-	allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
-	defer allSnapshots.Close()
-	if err := allSnapshots.ReopenWithDB(db); err != nil {
-		return fmt.Errorf("reopen snapshot segments: %w", err)
-	}
-	blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
-	// Compute mapping blockNum -> last TxNum in that block
-	txNums := make([]uint64, allSnapshots.BlocksAvailable()+1)
-	if err = allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error {
-		for _, b := range bs {
-			if err = b.Iterate(func(blockNum, baseTxNum, txAmount uint64) {
-				txNums[blockNum] = baseTxNum + txAmount
-			}); err != nil {
-				return err
-			}
-		}
-		return nil
-	}); err != nil {
-		return fmt.Errorf("build txNum => blockNum mapping: %w", err)
-	}
-	blockNum := block + 1
-	txNum := txNums[blockNum-1]
-	fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum)
-	workerCount := runtime.NumCPU()
-	workCh := make(chan state.TxTask, 128)
-	rs := state.NewReconState1()
-	var lock sync.RWMutex
-	reconWorkers := make([]*ReconWorker1, workerCount)
-	var applyTx kv.Tx
-	defer func() {
-		if applyTx != nil {
-			applyTx.Rollback()
-		}
-	}()
-	if applyTx, err = db.BeginRo(ctx); err != nil {
-		return err
-	}
-	roTxs := make([]kv.Tx, workerCount)
-	defer func() {
-		for i := 0; i < workerCount; i++ {
-			if roTxs[i] != nil {
-				roTxs[i].Rollback()
-			}
-		}
-	}()
-	for i := 0; i < workerCount; i++ {
-		roTxs[i], err = db.BeginRo(ctx)
-		if err != nil {
-			return err
-		}
-	}
-	var wg sync.WaitGroup
-	resultCh := make(chan state.TxTask, 128)
-	for i := 0; i < workerCount; i++ {
-		reconWorkers[i] = NewReconWorker1(lock.RLocker(), &wg, rs, blockReader, allSnapshots, txNums, chainConfig, logger, genesis, resultCh)
-		reconWorkers[i].SetTx(roTxs[i])
-	}
-	wg.Add(workerCount)
-	for i := 0; i < workerCount; i++ {
-		go reconWorkers[i].run()
-	}
-	commitThreshold := uint64(1024 * 1024 * 1024)
-	resultsThreshold := int64(1024 * 1024 * 1024)
-	count := uint64(0)
-	repeatCount := uint64(0)
-	triggerCount := uint64(0)
-	total := txNum
-	prevCount := uint64(0)
-	prevRepeatCount := uint64(0)
-	//prevTriggerCount := uint64(0)
-	resultsSize := int64(0)
-	prevTime := time.Now()
-	logEvery := time.NewTicker(logInterval)
-	defer logEvery.Stop()
-	var rws state.TxTaskQueue
-	var rwsLock sync.Mutex
-	rwsReceiveCond := sync.NewCond(&rwsLock)
-	heap.Init(&rws)
-	var outputTxNum uint64
-	var inputBlockNum, outputBlockNum uint64
-	var prevOutputBlockNum uint64
-	// Go-routine gathering results from the workers
-	go func() {
-		defer rs.Finish()
-		for outputTxNum < txNum {
-			select {
-			case txTask := <-resultCh:
-				//fmt.Printf("Saved %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
-				func() {
-					rwsLock.Lock()
-					defer rwsLock.Unlock()
-					atomic.AddInt64(&resultsSize, txTask.ResultsSize)
-					heap.Push(&rws, txTask)
-					processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
-					rwsReceiveCond.Signal()
-				}()
-			case <-logEvery.C:
-				var m runtime.MemStats
-				libcommon.ReadMemStats(&m)
-				sizeEstimate := rs.SizeEstimate()
-				count = rs.DoneCount()
-				currentTime := time.Now()
-				interval := currentTime.Sub(prevTime)
-				speedTx := float64(count-prevCount) / (float64(interval) / float64(time.Second))
-				speedBlock := float64(outputBlockNum-prevOutputBlockNum) / (float64(interval) / float64(time.Second))
-				progress := 100.0 * float64(count) / float64(total)
-				var repeatRatio float64
-				if count > prevCount {
-					repeatRatio = 100.0 * float64(repeatCount-prevRepeatCount) / float64(count-prevCount)
-				}
-				log.Info("Transaction replay",
-					//"workers", workerCount,
-					"at block", outputBlockNum,
-					"input block", atomic.LoadUint64(&inputBlockNum),
-					"progress", fmt.Sprintf("%.2f%%", progress),
-					"blk/s", fmt.Sprintf("%.1f", speedBlock),
-					"tx/s", fmt.Sprintf("%.1f", speedTx),
-					//"repeats", repeatCount-prevRepeatCount,
-					//"triggered", triggerCount-prevTriggerCount,
-					"result queue", rws.Len(),
-					"results size", libcommon.ByteCount(uint64(atomic.LoadInt64(&resultsSize))),
-					"repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio),
-					"buffer", libcommon.ByteCount(sizeEstimate),
-					"alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys),
-				)
-				prevTime = currentTime
-				prevCount = count
-				prevOutputBlockNum = outputBlockNum
-				prevRepeatCount = repeatCount
-				//prevTriggerCount = triggerCount
-				if sizeEstimate >= commitThreshold {
-					commitStart := time.Now()
-					log.Info("Committing...")
-					err := func() error {
-						rwsLock.Lock()
-						defer rwsLock.Unlock()
-						// Drain results (and process) channel because read sets do not carry over
-						for {
-							var drained bool
-							for !drained {
-								select {
-								case txTask := <-resultCh:
-									atomic.AddInt64(&resultsSize, txTask.ResultsSize)
-									heap.Push(&rws, txTask)
-								default:
-									drained = true
-								}
-							}
-							processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
-							if rws.Len() == 0 {
-								break
-							}
-						}
-						rwsReceiveCond.Signal()
-						lock.Lock() // This is to prevent workers from starting work on any new txTask
-						defer lock.Unlock()
-						// Drain results channel because read sets do not carry over
-						var drained bool
-						for !drained {
-							select {
-							case txTask := <-resultCh:
-								rs.AddWork(txTask)
-							default:
-								drained = true
-							}
-						}
-						// Drain results queue as well
-						for rws.Len() > 0 {
-							txTask := heap.Pop(&rws).(state.TxTask)
-							atomic.AddInt64(&resultsSize, -txTask.ResultsSize)
-							rs.AddWork(txTask)
-						}
-						applyTx.Rollback()
-						for i := 0; i < workerCount; i++ {
-							roTxs[i].Rollback()
-						}
-						rwTx, err := db.BeginRw(ctx)
-						if err != nil {
-							return err
-						}
-						if err = rs.Flush(rwTx); err != nil {
-							return err
-						}
-						if err = rwTx.Commit(); err != nil {
-							return err
-						}
-						if applyTx, err = db.BeginRo(ctx); err != nil {
-							return err
-						}
-						for i := 0; i < workerCount; i++ {
-							if roTxs[i], err = db.BeginRo(ctx); err != nil {
-								return err
-							}
-							reconWorkers[i].SetTx(roTxs[i])
-						}
-						return nil
-					}()
-					if err != nil {
-						panic(err)
-					}
-					log.Info("Committed", "time", time.Since(commitStart))
-				}
-			}
-		}
-	}()
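
The commit closure above is subtle: read sets are only meaningful against the overlay they were produced on, so before flushing to MDBX every in-flight result must be either applied or thrown back into the work queue, and every read transaction reopened afterwards. A condensed sketch of that protocol, reusing names from the surrounding function with error handling elided; this restates the closure, it is not new behaviour:

func commit() error {
	// 1. Apply whatever is already eligible in TxNum order.
	processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
	// 2. Block workers so no task starts against the soon-to-be-stale overlay.
	lock.Lock()
	defer lock.Unlock()
	// 3. Recycle unapplied results; their read sets will not survive the flush.
	for rws.Len() > 0 {
		rs.AddWork(heap.Pop(&rws).(state.TxTask))
	}
	// 4. Flush the overlay in one RW transaction, then reopen all RO txs.
	rwTx, _ := db.BeginRw(ctx)
	if err := rs.Flush(rwTx); err != nil {
		return err
	}
	return rwTx.Commit()
}
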
-	var inputTxNum uint64
-	var header *types.Header
-	for blockNum := uint64(0); blockNum <= block; blockNum++ {
-		atomic.StoreUint64(&inputBlockNum, blockNum)
-		rules := chainConfig.Rules(blockNum)
-		if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil {
-			return err
-		}
-		blockHash := header.Hash()
-		b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum)
-		if err != nil {
-			return err
-		}
-		txs := b.Transactions()
-		for txIndex := -1; txIndex <= len(txs); txIndex++ {
-			// Do not oversend, wait for the result heap to go under certain size
-			func() {
-				rwsLock.Lock()
-				defer rwsLock.Unlock()
-				for rws.Len() > 128 || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold {
-					rwsReceiveCond.Wait()
-				}
-			}()
-			txTask := state.TxTask{
-				Header:    header,
-				BlockNum:  blockNum,
-				Rules:     rules,
-				Block:     b,
-				TxNum:     inputTxNum,
-				TxIndex:   txIndex,
-				BlockHash: blockHash,
-				Final:     txIndex == len(txs),
-			}
-			if txIndex >= 0 && txIndex < len(txs) {
-				txTask.Tx = txs[txIndex]
-				if sender, ok := txs[txIndex].GetSender(); ok {
-					txTask.Sender = &sender
-				}
-				if ok := rs.RegisterSender(txTask); ok {
-					rs.AddWork(txTask)
-				}
-			} else {
-				rs.AddWork(txTask)
-			}
-			inputTxNum++
-		}
-	}
-	close(workCh)
-	wg.Wait()
-	applyTx.Rollback()
-	for i := 0; i < workerCount; i++ {
-		roTxs[i].Rollback()
-	}
-	rwTx, err := db.BeginRw(ctx)
-	if err != nil {
-		return err
-	}
-	defer func() {
-		if rwTx != nil {
-			rwTx.Rollback()
-		}
-	}()
-	if err = rs.Flush(rwTx); err != nil {
-		return err
-	}
-	if err = rwTx.Commit(); err != nil {
-		return err
-	}
-	if rwTx, err = db.BeginRw(ctx); err != nil {
-		return err
-	}
-	log.Info("Transaction replay complete", "duration", time.Since(startTime))
-	log.Info("Computing hashed state")
-	tmpDir := filepath.Join(datadir, "tmp")
-	if err = stagedsync.PromoteHashedStateCleanly("recon", rwTx, stagedsync.StageHashStateCfg(db, tmpDir), ctx); err != nil {
-		return err
-	}
-	if err = rwTx.Commit(); err != nil {
-		return err
-	}
-	if rwTx, err = db.BeginRw(ctx); err != nil {
-		return err
-	}
-	var rootHash common.Hash
-	if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil {
-		return err
-	}
-	if err = rwTx.Commit(); err != nil {
-		return err
-	}
-	if rootHash != header.Root {
-		log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root))
-	}
-	return nil
-}
diff --git a/core/state/history_reader_nostate.go b/core/state/history_reader_nostate.go
index 4b489f42b..c07734929 100644
--- a/core/state/history_reader_nostate.go
+++ b/core/state/history_reader_nostate.go
@@ -19,7 +19,7 @@ func (r *RequiredStateError) Error() string {
 }
 
 type HistoryReaderNoState struct {
-	ac        *libstate.AggregatorContext
+	ac        *libstate.Aggregator22Context
 	tx        kv.Tx
 	txNum     uint64
 	trace     bool
@@ -29,7 +29,7 @@ type HistoryReaderNoState struct {
 	composite []byte
 }
 
-func NewHistoryReaderNoState(ac *libstate.AggregatorContext, rs *ReconState) *HistoryReaderNoState {
+func NewHistoryReaderNoState(ac *libstate.Aggregator22Context, rs *ReconState) *HistoryReaderNoState {
 	return &HistoryReaderNoState{ac: ac, rs: rs}
 }
diff --git a/core/state/recon_state_1.go b/core/state/rw22.go
similarity index 58%
rename from core/state/recon_state_1.go
rename to core/state/rw22.go
index 249e454c5..1f75b73d4 100644
--- a/core/state/recon_state_1.go
+++ b/core/state/rw22.go
@@ -5,6 +5,7 @@ import (
 	"container/heap"
 	"encoding/binary"
 	"fmt"
+	"math/bits"
 	"sort"
 	"sync"
 	"unsafe"
@@ -13,6 +14,7 @@ import (
 	"github.com/holiman/uint256"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	libstate "github.com/ledgerwatch/erigon-lib/state"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/core/types"
@@ -37,6 +39,10 @@ type TxTask struct {
 	BalanceIncreaseSet map[common.Address]uint256.Int
 	ReadLists          map[string]*KvList
 	WriteLists         map[string]*KvList
+	AccountPrevs       map[string][]byte
+	AccountDels        map[string]*accounts.Account
+	StoragePrevs       map[string][]byte
+	CodePrevs          map[string][]byte
 	ResultsSize        int64
 	Error              error
 }
@@ -67,7 +73,7 @@ func (h *TxTaskQueue) Pop() interface{} {
 
 const CodeSizeTable = "CodeSize"
 
-type ReconState1 struct {
+type State22 struct {
 	lock        sync.RWMutex
 	receiveWork *sync.Cond
 	triggers    map[uint64]TxTask
@@ -75,65 +81,65 @@ type ReconState1 struct {
 	triggerLock  sync.RWMutex
 	queue        TxTaskQueue
 	queueLock    sync.Mutex
-	changes      map[string]*btree.BTreeG[ReconStateItem1]
+	changes      map[string]*btree.BTreeG[StateItem]
 	sizeEstimate uint64
 	txsDone      uint64
 	finished     bool
 }
 
-type ReconStateItem1 struct {
+type StateItem struct {
 	key []byte
 	val []byte
 }
 
-func reconStateItem1Less(i, j ReconStateItem1) bool {
+func stateItemLess(i, j StateItem) bool {
 	return bytes.Compare(i.key, j.key) < 0
 }
 
-func NewReconState1() *ReconState1 {
-	rs := &ReconState1{
+func NewState22() *State22 {
+	rs := &State22{
 		triggers:     map[uint64]TxTask{},
 		senderTxNums: map[common.Address]uint64{},
-		changes:      map[string]*btree.BTreeG[ReconStateItem1]{},
+		changes:      map[string]*btree.BTreeG[StateItem]{},
 	}
 	rs.receiveWork = sync.NewCond(&rs.queueLock)
 	return rs
 }
 
-func (rs *ReconState1) put(table string, key, val []byte) {
+func (rs *State22) put(table string, key, val []byte) {
 	t, ok := rs.changes[table]
 	if !ok {
-		t = btree.NewG[ReconStateItem1](32, reconStateItem1Less)
+		t = btree.NewG[StateItem](32, stateItemLess)
 		rs.changes[table] = t
 	}
-	item := ReconStateItem1{key: libcommon.Copy(key), val: libcommon.Copy(val)}
+	item := StateItem{key: key, val: val}
 	t.ReplaceOrInsert(item)
 	rs.sizeEstimate += uint64(unsafe.Sizeof(item)) + uint64(len(key)) + uint64(len(val))
 }
 
-func (rs *ReconState1) Get(table string, key []byte) []byte {
+func (rs *State22) Get(table string, key []byte) []byte {
 	rs.lock.RLock()
 	defer rs.lock.RUnlock()
 	return rs.get(table, key)
 }
 
-func (rs *ReconState1) get(table string, key []byte) []byte {
+func (rs *State22) get(table string, key []byte) []byte {
 	t, ok := rs.changes[table]
 	if !ok {
 		return nil
 	}
-	if i, ok := t.Get(ReconStateItem1{key: key}); ok {
+	if i, ok := t.Get(StateItem{key: key}); ok {
 		return i.val
 	}
 	return nil
 }
 
-func (rs *ReconState1) Flush(rwTx kv.RwTx) error {
+func (rs *State22) Flush(rwTx kv.RwTx) error {
 	rs.lock.Lock()
 	defer rs.lock.Unlock()
 	for table, t := range rs.changes {
 		var err error
-		t.Ascend(func(item ReconStateItem1) bool {
+		t.Ascend(func(item StateItem) bool {
 			if len(item.val) == 0 {
 				if err = rwTx.Delete(table, item.key); err != nil {
 					return false
@@ -156,7 +162,7 @@ func (rs *ReconState1) Flush(rwTx kv.RwTx) error {
 	return nil
 }
 
-func (rs *ReconState1) Schedule() (TxTask, bool) {
+func (rs *State22) Schedule() (TxTask, bool) {
 	rs.queueLock.Lock()
 	defer rs.queueLock.Unlock()
 	for !rs.finished && rs.queue.Len() == 0 {
@@ -168,7 +174,7 @@
 	return TxTask{}, false
 }
 
-func (rs *ReconState1) RegisterSender(txTask TxTask) bool {
+func (rs *State22) RegisterSender(txTask TxTask) bool {
 	rs.triggerLock.Lock()
 	defer rs.triggerLock.Unlock()
 	lastTxNum, deferral := rs.senderTxNums[*txTask.Sender]
@@ -183,7 +189,7 @@
 	return !deferral
 }
 
-func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
+func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
 	rs.queueLock.Lock()
 	defer rs.queueLock.Unlock()
 	rs.triggerLock.Lock()
@@ -205,7 +211,7 @@ func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) uint64
 	return count
 }
 
-func (rs *ReconState1) AddWork(txTask TxTask) {
+func (rs *State22) AddWork(txTask TxTask) {
 	txTask.BalanceIncreaseSet = nil
 	txTask.ReadLists = nil
 	txTask.WriteLists = nil
@@ -216,72 +222,229 @@
 	rs.receiveWork.Signal()
 }
 
-func (rs *ReconState1) Finish() {
+func (rs *State22) Finish() {
 	rs.queueLock.Lock()
 	defer rs.queueLock.Unlock()
 	rs.finished = true
 	rs.receiveWork.Broadcast()
 }
 
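
State22 keeps all pending writes in per-table B-trees and is always consulted before the database; every reader later in this file follows the same two-step pattern. Note also that put no longer copies keys and values (the libcommon.Copy calls are dropped in this rename), so callers now hand over ownership of the slices. The read-through pattern, with variable names illustrative:

// Read-through: the in-memory overlay wins; MDBX is only consulted on a miss.
enc := rs.Get(kv.PlainState, addr.Bytes())
if enc == nil {
	var err error
	if enc, err = roTx.GetOne(kv.PlainState, addr.Bytes()); err != nil {
		return err
	}
}
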
-func (rs *ReconState1) Apply(emptyRemoval bool, roTx kv.Tx, txTask TxTask) error {
-	rs.lock.Lock()
-	defer rs.lock.Unlock()
-	if txTask.WriteLists != nil {
-		for table, list := range txTask.WriteLists {
-			for i, key := range list.Keys {
-				val := list.Vals[i]
-				rs.put(table, key, val)
-			}
+func serialise2(a *accounts.Account) []byte {
+	var l int
+	l++
+	if a.Nonce > 0 {
+		l += (bits.Len64(a.Nonce) + 7) / 8
+	}
+	l++
+	if !a.Balance.IsZero() {
+		l += a.Balance.ByteLen()
+	}
+	l++
+	if !a.IsEmptyCodeHash() {
+		l += 32
+	}
+	l++
+	if a.Incarnation > 0 {
+		l += (bits.Len64(a.Incarnation) + 7) / 8
+	}
+	value := make([]byte, l)
+	pos := 0
+	if a.Nonce == 0 {
+		value[pos] = 0
+		pos++
+	} else {
+		nonceBytes := (bits.Len64(a.Nonce) + 7) / 8
+		value[pos] = byte(nonceBytes)
+		var nonce = a.Nonce
+		for i := nonceBytes; i > 0; i-- {
+			value[pos+i] = byte(nonce)
+			nonce >>= 8
+		}
+		pos += nonceBytes + 1
+	}
+	if a.Balance.IsZero() {
+		value[pos] = 0
+		pos++
+	} else {
+		balanceBytes := a.Balance.ByteLen()
+		value[pos] = byte(balanceBytes)
+		pos++
+		a.Balance.WriteToSlice(value[pos : pos+balanceBytes])
+		pos += balanceBytes
+	}
+	if a.IsEmptyCodeHash() {
+		value[pos] = 0
+		pos++
+	} else {
+		value[pos] = 32
+		pos++
+		copy(value[pos:pos+32], a.CodeHash[:])
+		pos += 32
+	}
+	if a.Incarnation == 0 {
+		value[pos] = 0
+	} else {
+		incBytes := (bits.Len64(a.Incarnation) + 7) / 8
+		value[pos] = byte(incBytes)
+		var inc = a.Incarnation
+		for i := incBytes; i > 0; i-- {
+			value[pos+i] = byte(inc)
+			inc >>= 8
 		}
 	}
+	return value
+}
+
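
serialise2 is the compact history encoding of an account: four fields (nonce, balance, code hash, incarnation), each prefixed by a one-byte length, with zero values collapsing to a single 0x00. A worked example with illustrative values, assuming accounts.Account treats the zero code hash as empty:

// Account{Nonce: 2, Balance: 1000, empty code hash, Incarnation: 0}
// encodes to 7 bytes:
//   0x01 0x02        nonce: length 1, big-endian 0x02
//   0x02 0x03 0xe8   balance: length 2, big-endian 1000
//   0x00             code hash: empty, length 0
//   0x00             incarnation: zero, length 0
a := accounts.Account{Nonce: 2}
a.Balance.SetUint64(1000)
enc := serialise2(&a) // -> []byte{1, 2, 2, 3, 232, 0, 0}
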
+func (rs *State22) Apply(emptyRemoval bool, roTx kv.Tx, txTask TxTask, agg *libstate.Aggregator22) error {
+	rs.lock.Lock()
+	defer rs.lock.Unlock()
+	agg.SetTxNum(txTask.TxNum)
 	for addr, increase := range txTask.BalanceIncreaseSet {
-		//if increase.IsZero() {
-		//	continue
-		//}
-		enc := rs.get(kv.PlainState, addr.Bytes())
-		if enc == nil {
+		enc0 := rs.get(kv.PlainState, addr.Bytes())
+		if enc0 == nil {
 			var err error
-			enc, err = roTx.GetOne(kv.PlainState, addr.Bytes())
+			enc0, err = roTx.GetOne(kv.PlainState, addr.Bytes())
 			if err != nil {
 				return err
 			}
 		}
 		var a accounts.Account
-		if err := a.DecodeForStorage(enc); err != nil {
+		if err := a.DecodeForStorage(enc0); err != nil {
 			return err
 		}
+		if len(enc0) > 0 {
+			// Need to convert before balance increase
+			enc0 = serialise2(&a)
+		}
 		a.Balance.Add(&a.Balance, &increase)
+		var enc1 []byte
 		if emptyRemoval && a.Nonce == 0 && a.Balance.IsZero() && a.IsEmptyCodeHash() {
-			enc = []byte{}
+			enc1 = []byte{}
 		} else {
 			l := a.EncodingLengthForStorage()
-			enc = make([]byte, l)
-			a.EncodeForStorage(enc)
+			enc1 = make([]byte, l)
+			a.EncodeForStorage(enc1)
+		}
+		rs.put(kv.PlainState, addr.Bytes(), enc1)
+		if err := agg.AddAccountPrev(addr.Bytes(), enc0); err != nil {
+			return err
+		}
+	}
+	for addrS, original := range txTask.AccountDels {
+		addr := []byte(addrS)
+		addr1 := make([]byte, len(addr)+8)
+		copy(addr1, addr)
+		binary.BigEndian.PutUint64(addr1[len(addr):], original.Incarnation)
+		prev := serialise2(original)
+		if err := agg.AddAccountPrev(addr, prev); err != nil {
+			return err
+		}
+		codePrev := rs.get(kv.Code, original.CodeHash.Bytes())
+		if codePrev == nil {
+			var err error
+			codePrev, err = roTx.GetOne(kv.Code, original.CodeHash.Bytes())
+			if err != nil {
+				return err
+			}
+		}
+		if err := agg.AddCodePrev(addr, codePrev); err != nil {
+			return err
+		}
+		// Iterate over storage
+		cursor, err := roTx.Cursor(kv.PlainState)
+		if err != nil {
+			return err
+		}
+		defer cursor.Close()
+		var k, v []byte
+		var e error
+		if k, v, e = cursor.Seek(addr1); e != nil {
+			return e
+		}
+		if !bytes.HasPrefix(k, addr1) {
+			k = nil
+		}
+		rs.changes[kv.PlainState].AscendGreaterOrEqual(StateItem{key: addr1}, func(item StateItem) bool {
+			if !bytes.HasPrefix(item.key, addr1) {
+				return false
+			}
+			for ; e == nil && k != nil && bytes.Compare(k, item.key) <= 0; k, v, e = cursor.Next() {
+				if !bytes.HasPrefix(k, addr1) {
+					k = nil
+				}
+				if !bytes.Equal(k, item.key) {
+					if e = agg.AddStoragePrev(addr, libcommon.Copy(k[28:]), libcommon.Copy(v)); e != nil {
+						return false
+					}
+				}
+			}
+			if e != nil {
+				return false
+			}
+			if e = agg.AddStoragePrev(addr, item.key[28:], item.val); e != nil {
+				return false
+			}
+			return true
+		})
+		for ; e == nil && k != nil && bytes.HasPrefix(k, addr1); k, v, e = cursor.Next() {
+			if e = agg.AddStoragePrev(addr, libcommon.Copy(k[28:]), libcommon.Copy(v)); e != nil {
+				return e
+			}
+		}
+		if e != nil {
+			return e
+		}
+	}
+	for addrS, enc0 := range txTask.AccountPrevs {
+		if err := agg.AddAccountPrev([]byte(addrS), enc0); err != nil {
+			return err
+		}
+	}
+	for compositeS, val := range txTask.StoragePrevs {
+		composite := []byte(compositeS)
+		if err := agg.AddStoragePrev(composite[:20], composite[28:], val); err != nil {
+			return err
+		}
+	}
+	for addrS, val := range txTask.CodePrevs {
+		if err := agg.AddCodePrev([]byte(addrS), val); err != nil {
+			return err
+		}
+	}
+	if err := agg.FinishTx(); err != nil {
+		return err
+	}
+	if txTask.WriteLists == nil {
+		return nil
+	}
+	for table, list := range txTask.WriteLists {
+		for i, key := range list.Keys {
+			val := list.Vals[i]
+			rs.put(table, key, val)
 		}
-		rs.put(kv.PlainState, addr.Bytes(), enc)
 	}
 	return nil
 }
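
Two details in Apply deserve a note: previous account values are re-encoded with serialise2 before the balance increase is applied, so history stores pre-state in the compact format rather than the DecodeForStorage layout; and emptyRemoval implements the EIP-161 (Spurious Dragon) rule, which is why processResultQueue passes txTask.Rules.IsSpuriousDragon. The deletion predicate, isolated:

// EIP-161: an account left with zero nonce, zero balance and no code is
// deleted (written as an empty value) instead of being stored back.
removeEmpty := emptyRemoval && a.Nonce == 0 && a.Balance.IsZero() && a.IsEmptyCodeHash()
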
 
-func (rs *ReconState1) DoneCount() uint64 {
+func (rs *State22) DoneCount() uint64 {
 	rs.lock.RLock()
 	defer rs.lock.RUnlock()
 	return rs.txsDone
 }
 
-func (rs *ReconState1) SizeEstimate() uint64 {
+func (rs *State22) SizeEstimate() uint64 {
 	rs.lock.RLock()
 	defer rs.lock.RUnlock()
 	return rs.sizeEstimate
 }
 
-func (rs *ReconState1) ReadsValid(readLists map[string]*KvList) bool {
+func (rs *State22) ReadsValid(readLists map[string]*KvList) bool {
 	rs.lock.RLock()
 	defer rs.lock.RUnlock()
 	//fmt.Printf("ValidReads\n")
 	for table, list := range readLists {
 		//fmt.Printf("Table %s\n", table)
-		var t *btree.BTreeG[ReconStateItem1]
+		var t *btree.BTreeG[StateItem]
 		var ok bool
 		if table == CodeSizeTable {
 			t, ok = rs.changes[kv.Code]
@@ -293,7 +456,7 @@ func (rs *ReconState1) ReadsValid(readLists map[string]*KvList) bool {
 		}
 		for i, key := range list.Keys {
 			val := list.Vals[i]
-			if item, ok := t.Get(ReconStateItem1{key: key}); ok {
+			if item, ok := t.Get(StateItem{key: key}); ok {
 				//fmt.Printf("key [%x] => [%x] vs [%x]\n", key, val, rereadVal)
 				if table == CodeSizeTable {
 					if binary.BigEndian.Uint64(val) != uint64(len(item.val)) {
@@ -328,14 +491,18 @@ func (l *KvList) Swap(i, j int) {
 	l.Vals[i], l.Vals[j] = l.Vals[j], l.Vals[i]
 }
 
-type StateReconWriter1 struct {
-	rs         *ReconState1
-	txNum      uint64
-	writeLists map[string]*KvList
+type StateWriter22 struct {
+	rs           *State22
+	txNum        uint64
+	writeLists   map[string]*KvList
+	accountPrevs map[string][]byte
+	accountDels  map[string]*accounts.Account
+	storagePrevs map[string][]byte
+	codePrevs    map[string][]byte
 }
 
-func NewStateReconWriter1(rs *ReconState1) *StateReconWriter1 {
-	return &StateReconWriter1{
+func NewStateWriter22(rs *State22) *StateWriter22 {
+	return &StateWriter22{
 		rs: rs,
 		writeLists: map[string]*KvList{
 			kv.PlainState: {},
@@ -343,39 +510,56 @@
 			kv.PlainContractCode: {},
 			kv.IncarnationMap:    {},
 		},
+		accountPrevs: map[string][]byte{},
+		accountDels:  map[string]*accounts.Account{},
+		storagePrevs: map[string][]byte{},
+		codePrevs:    map[string][]byte{},
 	}
 }
 
-func (w *StateReconWriter1) SetTxNum(txNum uint64) {
+func (w *StateWriter22) SetTxNum(txNum uint64) {
 	w.txNum = txNum
 }
 
-func (w *StateReconWriter1) ResetWriteSet() {
+func (w *StateWriter22) ResetWriteSet() {
 	w.writeLists = map[string]*KvList{
 		kv.PlainState:        {},
 		kv.Code:              {},
 		kv.PlainContractCode: {},
 		kv.IncarnationMap:    {},
 	}
+	w.accountPrevs = map[string][]byte{}
+	w.accountDels = map[string]*accounts.Account{}
+	w.storagePrevs = map[string][]byte{}
+	w.codePrevs = map[string][]byte{}
 }
 
-func (w *StateReconWriter1) WriteSet() map[string]*KvList {
+func (w *StateWriter22) WriteSet() map[string]*KvList {
 	for _, list := range w.writeLists {
 		sort.Sort(list)
 	}
 	return w.writeLists
 }
 
-func (w *StateReconWriter1) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
+func (w *StateWriter22) PrevAndDels() (map[string][]byte, map[string]*accounts.Account, map[string][]byte, map[string][]byte) {
+	return w.accountPrevs, w.accountDels, w.storagePrevs, w.codePrevs
+}
+
+func (w *StateWriter22) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
 	value := make([]byte, account.EncodingLengthForStorage())
 	account.EncodeForStorage(value)
 	//fmt.Printf("account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x} txNum: %d\n", address, &account.Balance, account.Nonce, account.Root, account.CodeHash, w.txNum)
 	w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes())
 	w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value)
+	var prev []byte
+	if original.Initialised {
+		prev = serialise2(original)
+	}
+	w.accountPrevs[string(address.Bytes())] = prev
 	return nil
 }
 
-func (w *StateReconWriter1) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
+func (w *StateWriter22) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
 	w.writeLists[kv.Code].Keys = append(w.writeLists[kv.Code].Keys, codeHash.Bytes())
 	w.writeLists[kv.Code].Vals = append(w.writeLists[kv.Code].Vals, code)
 	if len(code) > 0 {
@@ -383,10 +567,11 @@
 		w.writeLists[kv.PlainContractCode].Keys = append(w.writeLists[kv.PlainContractCode].Keys, dbutils.PlainGenerateStoragePrefix(address[:], incarnation))
 		w.writeLists[kv.PlainContractCode].Vals = append(w.writeLists[kv.PlainContractCode].Vals, codeHash.Bytes())
 	}
+	w.codePrevs[string(address.Bytes())] = nil
 	return nil
 }
 
-func (w *StateReconWriter1) DeleteAccount(address common.Address, original *accounts.Account) error {
+func (w *StateWriter22) DeleteAccount(address common.Address, original *accounts.Account) error {
 	w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes())
 	w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, []byte{})
 	if original.Incarnation > 0 {
@@ -395,36 +580,41 @@
 		w.writeLists[kv.IncarnationMap].Keys = append(w.writeLists[kv.IncarnationMap].Keys, address.Bytes())
 		w.writeLists[kv.IncarnationMap].Vals = append(w.writeLists[kv.IncarnationMap].Vals, b[:])
 	}
+	if original.Initialised {
+		w.accountDels[string(address.Bytes())] = original
+	}
 	return nil
 }
 
-func (w *StateReconWriter1) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
+func (w *StateWriter22) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
 	if *original == *value {
 		return nil
 	}
-	w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()))
+	composite := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes())
+	w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, composite)
 	w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value.Bytes())
 	//fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, v, w.txNum)
+	w.storagePrevs[string(composite)] = original.Bytes()
 	return nil
 }
 
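
The four new maps mirror the four Add*Prev channels into the aggregator: the writer records pre-state while a worker executes, and the serialisation thread later hands it to State22.Apply through the new TxTask fields. A plausible wiring sketch; the actual hook-up lives in cmd/state/commands/erigon22.go, which is not part of this section of the patch:

// After MakeWriteSet has run against a StateWriter22:
txTask.ReadLists = stateReader.ReadSet()
txTask.WriteLists = stateWriter.WriteSet()
txTask.AccountPrevs, txTask.AccountDels, txTask.StoragePrevs, txTask.CodePrevs = stateWriter.PrevAndDels()
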
-func (w *StateReconWriter1) CreateContract(address common.Address) error {
+func (w *StateWriter22) CreateContract(address common.Address) error {
 	return nil
 }
 
-type StateReconReader1 struct {
+type StateReader22 struct {
 	tx         kv.Tx
 	txNum      uint64
 	trace      bool
-	rs         *ReconState1
+	rs         *State22
 	readError  bool
 	stateTxNum uint64
 	composite  []byte
 	readLists  map[string]*KvList
 }
 
-func NewStateReconReader1(rs *ReconState1) *StateReconReader1 {
-	return &StateReconReader1{
+func NewStateReader22(rs *State22) *StateReader22 {
+	return &StateReader22{
 		rs: rs,
 		readLists: map[string]*KvList{
 			kv.PlainState: {},
@@ -435,15 +625,15 @@
 		},
 	}
 }
 
-func (r *StateReconReader1) SetTxNum(txNum uint64) {
+func (r *StateReader22) SetTxNum(txNum uint64) {
 	r.txNum = txNum
 }
 
-func (r *StateReconReader1) SetTx(tx kv.Tx) {
+func (r *StateReader22) SetTx(tx kv.Tx) {
 	r.tx = tx
 }
 
-func (r *StateReconReader1) ResetReadSet() {
+func (r *StateReader22) ResetReadSet() {
 	r.readLists = map[string]*KvList{
 		kv.PlainState: {},
 		kv.Code:       {},
@@ -452,18 +642,18 @@
 	}
 }
 
-func (r *StateReconReader1) ReadSet() map[string]*KvList {
+func (r *StateReader22) ReadSet() map[string]*KvList {
 	for _, list := range r.readLists {
 		sort.Sort(list)
 	}
 	return r.readLists
 }
 
-func (r *StateReconReader1) SetTrace(trace bool) {
+func (r *StateReader22) SetTrace(trace bool) {
 	r.trace = trace
 }
 
-func (r *StateReconReader1) ReadAccountData(address common.Address) (*accounts.Account, error) {
+func (r *StateReader22) ReadAccountData(address common.Address) (*accounts.Account, error) {
 	enc := r.rs.Get(kv.PlainState, address.Bytes())
 	if enc == nil {
 		var err error
@@ -487,7 +677,7 @@
 	return &a, nil
 }
 
-func (r *StateReconReader1) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
+func (r *StateReader22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
 	if cap(r.composite) < 20+8+32 {
 		r.composite = make([]byte, 20+8+32)
 	} else if len(r.composite) != 20+8+32 {
@@ -520,7 +710,7 @@
 	return enc, nil
 }
 
-func (r *StateReconReader1) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
+func (r *StateReader22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
 	enc := r.rs.Get(kv.Code, codeHash.Bytes())
 	if enc == nil {
 		var err error
@@ -537,7 +727,7 @@
 	return enc, nil
 }
 
-func (r *StateReconReader1) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
+func (r *StateReader22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
 	enc := r.rs.Get(kv.Code, codeHash.Bytes())
 	if enc == nil {
 		var err error
@@ -557,7 +747,7 @@
 	return size, nil
 }
 
-func (r *StateReconReader1) ReadAccountIncarnation(address common.Address) (uint64, error) {
+func (r *StateReader22) ReadAccountIncarnation(address common.Address) (uint64, error) {
 	enc := r.rs.Get(kv.IncarnationMap, address.Bytes())
 	if enc == nil {
 		var err error
diff --git a/core/state/state_recon_writer.go b/core/state/state_recon_writer.go
index 0d1c8e8e1..4363c25c1 100644
--- a/core/state/state_recon_writer.go
+++ b/core/state/state_recon_writer.go
@@ -188,12 +188,12 @@ func (rs *ReconState) SizeEstimate() uint64 {
 }
 
 type StateReconWriter struct {
-	ac    *libstate.AggregatorContext
+	ac    *libstate.Aggregator22Context
 	rs    *ReconState
 	txNum uint64
 }
 
-func NewStateReconWriter(ac *libstate.AggregatorContext, rs *ReconState) *StateReconWriter {
+func NewStateReconWriter(ac *libstate.Aggregator22Context, rs *ReconState) *StateReconWriter {
 	return &StateReconWriter{
 		ac: ac,
 		rs: rs,
diff --git a/go.mod b/go.mod
index 843d60a39..a9b33bea1 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
 go 1.18
 
 require (
-	github.com/ledgerwatch/erigon-lib v0.0.0-20220727050920-4a04d9284002
+	github.com/ledgerwatch/erigon-lib v0.0.0-20220728074713-fadc9b21d1dd
 	github.com/ledgerwatch/erigon-snapshot v1.0.0
 	github.com/ledgerwatch/log/v3 v3.4.1
 	github.com/ledgerwatch/secp256k1 v1.0.0
diff --git a/go.sum b/go.sum
index 66e3dae85..783d7f8dc 100644
--- a/go.sum
+++ b/go.sum
@@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220727050920-4a04d9284002 h1:37VrYyM+RBV6CCxnb3i0RW4euw//PO31ld+KqRi8EUs=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220727050920-4a04d9284002/go.mod h1:19wwSb5qbagorz9a4QN9FzNqSPjmOJkwa5TezGjloks=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220728074713-fadc9b21d1dd h1:N8Erhb3TNWyXBFYZD6tthmDaIhwWN4bVoATuoxJuSpA=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220728074713-fadc9b21d1dd/go.mod h1:19wwSb5qbagorz9a4QN9FzNqSPjmOJkwa5TezGjloks=
 github.com/ledgerwatch/erigon-snapshot v1.0.0 h1:bp/7xoPdM5lK7LFdqEMH008RZmqxMZV0RUVEQiWs7v4=
 github.com/ledgerwatch/erigon-snapshot v1.0.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
 github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=