From 81d106bc9df901cf9c5a2cbae6aa695437ecd367 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Sat, 23 Jul 2022 18:39:08 +0100 Subject: [PATCH] Experiment in parallel execution (#4652) * Restructure tx execution * fixes * Fixes and traces * Tracing * More tracing * Drain the result channel * Intermediate * more efficient parallel exec * Sorted buffer * Fix results size * fix for the recon * Fix compilation * Sort keys in Write and Read sets, fix compilation in rpcdaemon22 * Update to latest erigon-lib * Update to erigon-lib * Remove go.mod replace * Update erigon-lib * Update to erigon-lib main * Fix lint Co-authored-by: Alexey Sharp Co-authored-by: Alex Sharp --- cmd/rpcdaemon22/commands/eth_receipts.go | 12 +- cmd/rpcdaemon22/commands/trace_filtering.go | 7 +- cmd/state/commands/erigon22.go | 12 +- cmd/state/commands/history22.go | 2 +- cmd/state/commands/replay_tx.go | 195 +++++++++++ cmd/state/commands/state_recon.go | 61 ++-- cmd/state/commands/state_recon_1.go | 242 ++++++++++---- core/state/history_reader_22.go | 15 +- core/state/intra_block_state.go | 2 +- core/state/recon_state_1.go | 341 +++++++++++++------- go.mod | 2 +- go.sum | 4 +- 12 files changed, 654 insertions(+), 241 deletions(-) create mode 100644 cmd/state/commands/replay_tx.go diff --git a/cmd/rpcdaemon22/commands/eth_receipts.go b/cmd/rpcdaemon22/commands/eth_receipts.go index 1a72299a2..0dd4d4d6e 100644 --- a/cmd/rpcdaemon22/commands/eth_receipts.go +++ b/cmd/rpcdaemon22/commands/eth_receipts.go @@ -128,7 +128,9 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([ txNumbers := roaring64.New() txNumbers.AddRange(fromTxNum, toTxNum) // [min,max) - topicsBitmap, err := getTopicsBitmap(api._agg, tx, crit.Topics, fromTxNum, toTxNum) + ac := api._agg.MakeContext() + + topicsBitmap, err := getTopicsBitmap(ac, tx, crit.Topics, fromTxNum, toTxNum) if err != nil { return nil, err } @@ -139,7 +141,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([ var addrBitmap *roaring64.Bitmap for _, addr := range crit.Addresses { var bitmapForORing roaring64.Bitmap - it := api._agg.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, nil) + it := ac.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, nil) for it.HasNext() { bitmapForORing.Add(it.Next()) } @@ -162,7 +164,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([ var lastHeader *types.Header var lastSigner *types.Signer var lastRules *params.Rules - stateReader := state.NewHistoryReader22(api._agg, nil /* ReadIndices */) + stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */) iter := txNumbers.Iterator() for iter.HasNext() { txNum := iter.Next() @@ -233,12 +235,12 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([ // {{}, {B}} matches any topic in first position AND B in second position // {{A}, {B}} matches topic A in first position AND B in second position // {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position -func getTopicsBitmap(a *libstate.Aggregator, c kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) { +func getTopicsBitmap(ac *libstate.AggregatorContext, c kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) { var result *roaring64.Bitmap for _, sub := range topics { var bitmapForORing roaring64.Bitmap for _, topic := range sub { - it := a.LogTopicIterator(topic.Bytes(), from, to, nil) + it := ac.LogTopicIterator(topic.Bytes(), from, to, nil) for it.HasNext() { bitmapForORing.Add(it.Next()) } diff --git a/cmd/rpcdaemon22/commands/trace_filtering.go b/cmd/rpcdaemon22/commands/trace_filtering.go index 7dd353474..fc4568116 100644 --- a/cmd/rpcdaemon22/commands/trace_filtering.go +++ b/cmd/rpcdaemon22/commands/trace_filtering.go @@ -253,10 +253,11 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str allTxs roaring64.Bitmap txsTo roaring64.Bitmap ) + ac := api._agg.MakeContext() for _, addr := range req.FromAddress { if addr != nil { - it := api._agg.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, nil) + it := ac.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, nil) for it.HasNext() { allTxs.Add(it.Next()) } @@ -266,7 +267,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str for _, addr := range req.ToAddress { if addr != nil { - it := api._agg.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, nil) + it := ac.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, nil) for it.HasNext() { txsTo.Add(it.Next()) } @@ -319,7 +320,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str var lastHeader *types.Header var lastSigner *types.Signer var lastRules *params.Rules - stateReader := state.NewHistoryReader22(api._agg, nil /* ReadIndices */) + stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */) noop := state.NewNoopWriter() for it.HasNext() { txNum := uint64(it.Next()) diff --git a/cmd/state/commands/erigon22.go b/cmd/state/commands/erigon22.go index 0bcc8538c..3e0d82021 100644 --- a/cmd/state/commands/erigon22.go +++ b/cmd/state/commands/erigon22.go @@ -171,7 +171,7 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log } return h } - readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx} + readWrapper := &ReaderWrapper22{ac: agg.MakeContext(), roTx: rwTx} writeWrapper := &WriterWrapper22{w: agg} for !interrupt { @@ -396,7 +396,7 @@ func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *Reader // Implements StateReader and StateWriter type ReaderWrapper22 struct { roTx kv.Tx - r *libstate.Aggregator + ac *libstate.AggregatorContext blockNum uint64 } @@ -406,7 +406,7 @@ type WriterWrapper22 struct { } func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Account, error) { - enc, err := rw.r.ReadAccountData(address.Bytes(), rw.roTx) + enc, err := rw.ac.ReadAccountData(address.Bytes(), rw.roTx) if err != nil { return nil, err } @@ -444,7 +444,7 @@ func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Ac } func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { - enc, err := rw.r.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx) + enc, err := rw.ac.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx) if err != nil { return nil, err } @@ -458,11 +458,11 @@ func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnatio } func (rw *ReaderWrapper22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { - return rw.r.ReadAccountCode(address.Bytes(), rw.roTx) + return rw.ac.ReadAccountCode(address.Bytes(), rw.roTx) } func (rw *ReaderWrapper22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) { - return rw.r.ReadAccountCodeSize(address.Bytes(), rw.roTx) + return rw.ac.ReadAccountCodeSize(address.Bytes(), rw.roTx) } func (rw *ReaderWrapper22) ReadAccountIncarnation(address common.Address) (uint64, error) { diff --git a/cmd/state/commands/history22.go b/cmd/state/commands/history22.go index 02890f1f3..d1976cd7f 100644 --- a/cmd/state/commands/history22.go +++ b/cmd/state/commands/history22.go @@ -136,6 +136,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error { return fmt.Errorf("reopen snapshot segments: %w", err) } blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + readWrapper := state.NewHistoryReader22(h.MakeContext(), ri) for !interrupt { select { @@ -169,7 +170,6 @@ func History22(genesis *core.Genesis, logger log.Logger) error { txNum += uint64(len(b.Transactions())) + 2 // Pre and Post block transaction continue } - readWrapper := state.NewHistoryReader22(h, ri) if traceBlock != 0 { readWrapper.SetTrace(blockNum == uint64(traceBlock)) } diff --git a/cmd/state/commands/replay_tx.go b/cmd/state/commands/replay_tx.go new file mode 100644 index 000000000..e1c555259 --- /dev/null +++ b/cmd/state/commands/replay_tx.go @@ -0,0 +1,195 @@ +package commands + +import ( + "context" + "fmt" + "path" + "path/filepath" + "sort" + + "github.com/ledgerwatch/erigon-lib/kv/memdb" + libstate "github.com/ledgerwatch/erigon-lib/state" + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/log/v3" + "github.com/spf13/cobra" +) + +var txhash string +var txnum uint64 + +func init() { + withDataDir(replayTxCmd) + rootCmd.AddCommand(replayTxCmd) + replayTxCmd.Flags().StringVar(&txhash, "txhash", "", "hash of the transaction to replay") + replayTxCmd.Flags().Uint64Var(&txnum, "txnum", 0, "tx num for replay") +} + +var replayTxCmd = &cobra.Command{ + Use: "replaytx", + Short: "Experimental command to replay a given transaction using only history", + RunE: func(cmd *cobra.Command, args []string) error { + return ReplayTx(genesis) + }, +} + +func ReplayTx(genesis *core.Genesis) error { + var blockReader services.FullBlockReader + var allSnapshots *snapshotsync.RoSnapshots + allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) + defer allSnapshots.Close() + if err := allSnapshots.Reopen(); err != nil { + return fmt.Errorf("reopen snapshot segments: %w", err) + } + blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + // Compute mapping blockNum -> last TxNum in that block + txNums := make([]uint64, allSnapshots.BlocksAvailable()+1) + if err := allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error { + for _, b := range bs { + if err := b.Iterate(func(blockNum, baseTxNum, txAmount uint64) { + txNums[blockNum] = baseTxNum + txAmount + }); err != nil { + return err + } + } + return nil + }); err != nil { + return fmt.Errorf("build txNum => blockNum mapping: %w", err) + } + ctx := context.Background() + var txNum uint64 + if txhash != "" { + txnHash := common.HexToHash(txhash) + fmt.Printf("Tx hash = [%x]\n", txnHash) + db := memdb.New() + roTx, err := db.BeginRo(ctx) + if err != nil { + return err + } + defer roTx.Rollback() + bn, ok, err := blockReader.TxnLookup(ctx, roTx, txnHash) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("transaction not found") + } + fmt.Printf("Found in block %d\n", bn) + var header *types.Header + if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil { + return err + } + blockHash := header.Hash() + b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, bn) + if err != nil { + return err + } + txs := b.Transactions() + var txIndex int + for txIndex = 0; txIndex < len(txs); txIndex++ { + if txs[txIndex].Hash() == txnHash { + fmt.Printf("txIndex = %d\n", txIndex) + break + } + } + txNum = txNums[bn-1] + 1 + uint64(txIndex) + } else { + txNum = txnum + } + fmt.Printf("txNum = %d\n", txNum) + aggPath := filepath.Join(datadir, "erigon23") + agg, err := libstate.NewAggregator(aggPath, AggregationStep) + if err != nil { + return fmt.Errorf("create history: %w", err) + } + defer agg.Close() + ac := agg.MakeContext() + workCh := make(chan state.TxTask) + rs := state.NewReconState(workCh) + if err = replayTxNum(ctx, allSnapshots, blockReader, txNum, txNums, rs, ac); err != nil { + return err + } + return nil +} + +func replayTxNum(ctx context.Context, allSnapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader, + txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.AggregatorContext, +) error { + bn := uint64(sort.Search(len(txNums), func(i int) bool { + return txNums[i] > txNum + })) + txIndex := int(txNum - txNums[bn-1] - 1) + fmt.Printf("bn=%d, txIndex=%d\n", bn, txIndex) + var header *types.Header + var err error + if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil { + return err + } + blockHash := header.Hash() + b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, bn) + if err != nil { + return err + } + txn := b.Transactions()[txIndex] + stateWriter := state.NewStateReconWriter(ac, rs) + stateReader := state.NewHistoryReaderNoState(ac, rs) + stateReader.SetTxNum(txNum) + stateWriter.SetTxNum(txNum) + noop := state.NewNoopWriter() + rules := chainConfig.Rules(bn) + for { + stateReader.ResetError() + ibs := state.New(stateReader) + gp := new(core.GasPool).AddGas(txn.GetGas()) + //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData()) + vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(chainConfig, bn)} + contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil } + getHeader := func(hash common.Hash, number uint64) *types.Header { + h, err := blockReader.Header(ctx, nil, hash, number) + if err != nil { + panic(err) + } + return h + } + getHashFn := core.GetHashFn(header, getHeader) + logger := log.New() + engine := initConsensusEngine(chainConfig, logger, allSnapshots) + txnHash := txn.Hash() + blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */, contractHasTEVM) + ibs.Prepare(txnHash, blockHash, txIndex) + msg, err := txn.AsMessage(*types.MakeSigner(chainConfig, bn), header.BaseFee, rules) + if err != nil { + return err + } + txContext := core.NewEVMTxContext(msg) + vmenv := vm.NewEVM(blockContext, txContext, ibs, chainConfig, vmConfig) + + _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */) + if err != nil { + return fmt.Errorf("could not apply tx %d [%x] failed: %w", txIndex, txnHash, err) + } + if err = ibs.FinalizeTx(rules, noop); err != nil { + return err + } + if dependency, ok := stateReader.ReadError(); ok { + fmt.Printf("dependency %d on %d\n", txNum, dependency) + if err = replayTxNum(ctx, allSnapshots, blockReader, dependency, txNums, rs, ac); err != nil { + return err + } + } else { + if err = ibs.CommitBlock(rules, stateWriter); err != nil { + return err + } + break + } + } + rs.CommitTxNum(txNum) + fmt.Printf("commited %d\n", txNum) + return nil +} diff --git a/cmd/state/commands/state_recon.go b/cmd/state/commands/state_recon.go index db5ea0462..12a214838 100644 --- a/cmd/state/commands/state_recon.go +++ b/cmd/state/commands/state_recon.go @@ -131,9 +131,7 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) { } else if daoForkTx { //fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txNum, blockNum) misc.ApplyDAOHardFork(ibs) - if err := ibs.FinalizeTx(rules, noop); err != nil { - panic(err) - } + ibs.SoftFinalise() } else if txTask.Final { if txTask.BlockNum > 0 { //fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txNum, blockNum) @@ -147,16 +145,25 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) { } else { txHash := txTask.Tx.Hash() gp := new(core.GasPool).AddGas(txTask.Tx.GetGas()) - //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData()) - usedGas := new(uint64) vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)} contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil } - ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex) getHashFn := core.GetHashFn(txTask.Header, rw.getHeader) - _, _, err = core.ApplyTransaction(rw.chainConfig, getHashFn, rw.engine, nil, gp, ibs, noop, txTask.Header, txTask.Tx, usedGas, vmConfig, contractHasTEVM) + blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM) + ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex) + msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, rules) + if err != nil { + panic(err) + } + txContext := core.NewEVMTxContext(msg) + vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig) + //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, evm=%p\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex, vmenv) + _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */) if err != nil { panic(fmt.Errorf("could not apply tx %d [%x] failed: %w", txTask.TxIndex, txHash, err)) } + if err = ibs.FinalizeTx(rules, noop); err != nil { + panic(err) + } } if dependency, ok := rw.stateReader.ReadError(); ok { //fmt.Printf("rollback %d\n", txNum) @@ -259,12 +266,8 @@ func (fw *FillWorker) fillStorage(plainStateCollector *etl.Collector) { fw.currentKey = key compositeKey := dbutils.PlainGenerateCompositeStorageKey(key[:20], state.FirstContractIncarnation, key[20:]) if len(val) > 0 { - if len(val) > 1 || val[0] != 0 { - if err := plainStateCollector.Collect(compositeKey, val); err != nil { - panic(err) - } - } else { - fmt.Printf("Storage [%x] => [%x]\n", compositeKey, val) + if err := plainStateCollector.Collect(compositeKey, val); err != nil { + panic(err) } //fmt.Printf("Storage [%x] => [%x]\n", compositeKey, val) } @@ -283,19 +286,15 @@ func (fw *FillWorker) fillCode(codeCollector, plainContractCollector *etl.Collec fw.currentKey = key compositeKey := dbutils.PlainGenerateStoragePrefix(key, state.FirstContractIncarnation) if len(val) > 0 { - if len(val) > 1 || val[0] != 0 { - codeHash, err := common.HashData(val) - if err != nil { - panic(err) - } - if err = codeCollector.Collect(codeHash[:], val); err != nil { - panic(err) - } - if err = plainContractCollector.Collect(compositeKey, codeHash[:]); err != nil { - panic(err) - } - } else { - fmt.Printf("Code [%x] => [%x]\n", compositeKey, val) + codeHash, err := common.HashData(val) + if err != nil { + panic(err) + } + if err = codeCollector.Collect(codeHash[:], val); err != nil { + panic(err) + } + if err = plainContractCollector.Collect(compositeKey, codeHash[:]); err != nil { + panic(err) } //fmt.Printf("Code [%x] => [%x]\n", compositeKey, val) } @@ -600,9 +599,9 @@ func Recon(genesis *core.Genesis, logger log.Logger) error { } }() var inputTxNum uint64 + var header *types.Header for bn := uint64(0); bn < blockNum; bn++ { - header, err := blockReader.HeaderByNumber(ctx, nil, bn) - if err != nil { + if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil { panic(err) } blockHash := header.Hash() @@ -851,11 +850,15 @@ func Recon(genesis *core.Genesis, logger log.Logger) error { if rwTx, err = db.BeginRw(ctx); err != nil { return err } - if _, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil { + var rootHash common.Hash + if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil { return err } if err = rwTx.Commit(); err != nil { return err } + if rootHash != header.Root { + log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root)) + } return nil } diff --git a/cmd/state/commands/state_recon_1.go b/cmd/state/commands/state_recon_1.go index b149406a8..0ae69fe40 100644 --- a/cmd/state/commands/state_recon_1.go +++ b/cmd/state/commands/state_recon_1.go @@ -11,6 +11,7 @@ import ( "path/filepath" "runtime" "sync" + "sync/atomic" "syscall" "time" @@ -31,6 +32,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/log/v3" "github.com/spf13/cobra" + "golang.org/x/sync/semaphore" ) func init() { @@ -103,11 +105,12 @@ func (rw *ReconWorker1) run() { } rw.engine = initConsensusEngine(rw.chainConfig, rw.logger, rw.allSnapshots) for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() { - rw.runTxTask(txTask) + rw.runTxTask(&txTask) + rw.resultCh <- txTask // Needs to have outside of the lock } } -func (rw *ReconWorker1) runTxTask(txTask state.TxTask) { +func (rw *ReconWorker1) runTxTask(txTask *state.TxTask) { rw.lock.Lock() defer rw.lock.Unlock() txTask.Error = nil @@ -115,74 +118,103 @@ func (rw *ReconWorker1) runTxTask(txTask state.TxTask) { rw.stateWriter.SetTxNum(txTask.TxNum) rw.stateReader.ResetReadSet() rw.stateWriter.ResetWriteSet() - rules := rw.chainConfig.Rules(txTask.BlockNum) ibs := state.New(rw.stateReader) daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1 var err error if txTask.BlockNum == 0 && txTask.TxIndex == -1 { - fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum) + //fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum) // Genesis block _, ibs, err = rw.genesis.ToBlock() if err != nil { panic(err) } } else if daoForkTx { - fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum) + //fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum) misc.ApplyDAOHardFork(ibs) ibs.SoftFinalise() } else if txTask.TxIndex == -1 { // Block initialisation } else if txTask.Final { if txTask.BlockNum > 0 { - fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum) + //fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum) // End of block transaction in a block if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, nil, nil, nil); err != nil { panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err)) } } } else { - fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) txHash := txTask.Tx.Hash() gp := new(core.GasPool).AddGas(txTask.Tx.GetGas()) - //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData()) - usedGas := new(uint64) vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)} contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil } ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex) - vmConfig.SkipAnalysis = core.SkipAnalysis(rw.chainConfig, txTask.BlockNum) getHashFn := core.GetHashFn(txTask.Header, rw.getHeader) blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM) - vmenv := vm.NewEVM(blockContext, vm.TxContext{}, ibs, rw.chainConfig, vmConfig) - msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, rules) + msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, txTask.Rules) if err != nil { panic(err) } txContext := core.NewEVMTxContext(msg) - - // Update the evm with the new transaction context. - vmenv.Reset(txContext, ibs) - - result, err := core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */) - if err != nil { + vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig) + if _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */); err != nil { txTask.Error = err + //fmt.Printf("error=%v\n", err) } // Update the state with pending changes ibs.SoftFinalise() - *usedGas += result.UsedGas } // Prepare read set, write set and balanceIncrease set and send for serialisation if txTask.Error == nil { txTask.BalanceIncreaseSet = ibs.BalanceIncreaseSet() - for addr, bal := range txTask.BalanceIncreaseSet { - fmt.Printf("[%x]=>[%d]\n", addr, &bal) - } - if err = ibs.MakeWriteSet(rules, rw.stateWriter); err != nil { + //for addr, bal := range txTask.BalanceIncreaseSet { + // fmt.Printf("[%x]=>[%d]\n", addr, &bal) + //} + if err = ibs.MakeWriteSet(txTask.Rules, rw.stateWriter); err != nil { panic(err) } - txTask.ReadKeys, txTask.ReadVals = rw.stateReader.ReadSet() - txTask.WriteKeys, txTask.WriteVals = rw.stateWriter.WriteSet() + txTask.ReadLists = rw.stateReader.ReadSet() + txTask.WriteLists = rw.stateWriter.WriteSet() + size := (20 + 32) * len(txTask.BalanceIncreaseSet) + for _, list := range txTask.ReadLists { + for _, b := range list.Keys { + size += len(b) + } + for _, b := range list.Vals { + size += len(b) + } + } + for _, list := range txTask.WriteLists { + for _, b := range list.Keys { + size += len(b) + } + for _, b := range list.Vals { + size += len(b) + } + } + txTask.ResultsSize = int64(size) + } +} + +func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.ReconState1, applyTx kv.Tx, + triggerCount *uint64, outputBlockNum *uint64, repeatCount *uint64, resultsSize *int64) { + for rws.Len() > 0 && (*rws)[0].TxNum == *outputTxNum { + txTask := heap.Pop(rws).(state.TxTask) + atomic.AddInt64(resultsSize, -txTask.ResultsSize) + if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) { + if err := rs.Apply(txTask.Rules.IsSpuriousDragon, applyTx, txTask); err != nil { + panic(err) + } + *triggerCount += rs.CommitTxNum(txTask.Sender, txTask.TxNum) + *outputTxNum++ + *outputBlockNum = txTask.BlockNum + //fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + } else { + rs.AddWork(txTask) + *repeatCount++ + //fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + } } - rw.resultCh <- txTask } func Recon1(genesis *core.Genesis, logger log.Logger) error { @@ -204,7 +236,8 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { } else if err = os.RemoveAll(reconDbPath); err != nil { return err } - db, err := kv2.NewMDBX(logger).Path(reconDbPath).WriteMap().Open() + limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1)) + db, err := kv2.NewMDBX(logger).Path(reconDbPath).RoTxsLimiter(limiter).Open() if err != nil { return err } @@ -236,9 +269,18 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum) workerCount := runtime.NumCPU() workCh := make(chan state.TxTask, 128) - rs := state.NewReconState1(workCh) + rs := state.NewReconState1() var lock sync.RWMutex reconWorkers := make([]*ReconWorker1, workerCount) + var applyTx kv.Tx + defer func() { + if applyTx != nil { + applyTx.Rollback() + } + }() + if applyTx, err = db.BeginRo(ctx); err != nil { + return err + } roTxs := make([]kv.Tx, workerCount) defer func() { for i := 0; i < workerCount; i++ { @@ -263,71 +305,118 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { for i := 0; i < workerCount; i++ { go reconWorkers[i].run() } - commitThreshold := uint64(256 * 1024 * 1024) + commitThreshold := uint64(1024 * 1024 * 1024) + resultsThreshold := int64(1024 * 1024 * 1024) count := uint64(0) - rollbackCount := uint64(0) + repeatCount := uint64(0) + triggerCount := uint64(0) total := txNum prevCount := uint64(0) - prevRollbackCount := uint64(0) + prevRepeatCount := uint64(0) + //prevTriggerCount := uint64(0) + resultsSize := int64(0) prevTime := time.Now() logEvery := time.NewTicker(logInterval) defer logEvery.Stop() var rws state.TxTaskQueue + var rwsLock sync.Mutex + rwsReceiveCond := sync.NewCond(&rwsLock) heap.Init(&rws) var outputTxNum uint64 + var inputBlockNum, outputBlockNum uint64 + var prevOutputBlockNum uint64 // Go-routine gathering results from the workers go func() { + defer rs.Finish() for outputTxNum < txNum { select { case txTask := <-resultCh: - if txTask.TxNum == outputTxNum { - // Try to apply without placing on the queue first - if txTask.Error == nil && rs.ReadsValid(txTask.ReadKeys, txTask.ReadVals) { - rs.Apply(txTask.WriteKeys, txTask.WriteVals, txTask.BalanceIncreaseSet) - rs.CommitTxNum(txTask.Sender, txTask.TxNum) - outputTxNum++ - } else { - rs.RollbackTx(txTask) - } - } else { + //fmt.Printf("Saved %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex) + func() { + rwsLock.Lock() + defer rwsLock.Unlock() + atomic.AddInt64(&resultsSize, txTask.ResultsSize) heap.Push(&rws, txTask) - } - for rws.Len() > 0 && rws[0].TxNum == outputTxNum { - txTask = heap.Pop(&rws).(state.TxTask) - if txTask.Error == nil && rs.ReadsValid(txTask.ReadKeys, txTask.ReadVals) { - rs.Apply(txTask.WriteKeys, txTask.WriteVals, txTask.BalanceIncreaseSet) - rs.CommitTxNum(txTask.Sender, txTask.TxNum) - outputTxNum++ - } else { - rs.RollbackTx(txTask) - } - } + processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize) + rwsReceiveCond.Signal() + }() case <-logEvery.C: var m runtime.MemStats libcommon.ReadMemStats(&m) sizeEstimate := rs.SizeEstimate() count = rs.DoneCount() - rollbackCount = rs.RollbackCount() currentTime := time.Now() interval := currentTime.Sub(prevTime) speedTx := float64(count-prevCount) / (float64(interval) / float64(time.Second)) + speedBlock := float64(outputBlockNum-prevOutputBlockNum) / (float64(interval) / float64(time.Second)) progress := 100.0 * float64(count) / float64(total) var repeatRatio float64 if count > prevCount { - repeatRatio = 100.0 * float64(rollbackCount-prevRollbackCount) / float64(count-prevCount) + repeatRatio = 100.0 * float64(repeatCount-prevRepeatCount) / float64(count-prevCount) } - prevTime = currentTime - prevCount = count - prevRollbackCount = rollbackCount - log.Info("Transaction replay", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", progress), "tx/s", fmt.Sprintf("%.1f", speedTx), "repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio), "buffer", libcommon.ByteCount(sizeEstimate), + log.Info("Transaction replay", + //"workers", workerCount, + "at block", outputBlockNum, + "input block", atomic.LoadUint64(&inputBlockNum), + "progress", fmt.Sprintf("%.2f%%", progress), + "blk/s", fmt.Sprintf("%.1f", speedBlock), + "tx/s", fmt.Sprintf("%.1f", speedTx), + //"repeats", repeatCount-prevRepeatCount, + //"triggered", triggerCount-prevTriggerCount, + "result queue", rws.Len(), + "results size", libcommon.ByteCount(uint64(atomic.LoadInt64(&resultsSize))), + "repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio), + "buffer", libcommon.ByteCount(sizeEstimate), "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), ) + prevTime = currentTime + prevCount = count + prevOutputBlockNum = outputBlockNum + prevRepeatCount = repeatCount + //prevTriggerCount = triggerCount if sizeEstimate >= commitThreshold { commitStart := time.Now() log.Info("Committing...") err := func() error { - lock.Lock() + rwsLock.Lock() + defer rwsLock.Unlock() + // Drain results (and process) channel because read sets do not carry over + for { + var drained bool + for !drained { + select { + case txTask := <-resultCh: + atomic.AddInt64(&resultsSize, txTask.ResultsSize) + heap.Push(&rws, txTask) + default: + drained = true + } + } + processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize) + if rws.Len() == 0 { + break + } + } + rwsReceiveCond.Signal() + lock.Lock() // This is to prevent workers from starting work on any new txTask defer lock.Unlock() + // Drain results channel because read sets do not carry over + var drained bool + for !drained { + select { + case txTask := <-resultCh: + rs.AddWork(txTask) + default: + drained = true + } + } + // Drain results queue as well + for rws.Len() > 0 { + txTask := heap.Pop(&rws).(state.TxTask) + atomic.AddInt64(&resultsSize, -txTask.ResultsSize) + rs.AddWork(txTask) + } + applyTx.Rollback() for i := 0; i < workerCount; i++ { roTxs[i].Rollback() } @@ -341,6 +430,9 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { if err = rwTx.Commit(); err != nil { return err } + if applyTx, err = db.BeginRo(ctx); err != nil { + return err + } for i := 0; i < workerCount; i++ { if roTxs[i], err = db.BeginRo(ctx); err != nil { return err @@ -358,21 +450,32 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { } }() var inputTxNum uint64 + var header *types.Header for blockNum := uint64(0); blockNum <= block; blockNum++ { - header, err := blockReader.HeaderByNumber(ctx, nil, blockNum) - if err != nil { - panic(err) + atomic.StoreUint64(&inputBlockNum, blockNum) + rules := chainConfig.Rules(blockNum) + if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil { + return err } blockHash := header.Hash() b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum) if err != nil { - panic(err) + return err } txs := b.Transactions() for txIndex := -1; txIndex <= len(txs); txIndex++ { + // Do not oversend, wait for the result heap to go under certain size + func() { + rwsLock.Lock() + defer rwsLock.Unlock() + for rws.Len() > 128 || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold { + rwsReceiveCond.Wait() + } + }() txTask := state.TxTask{ Header: header, BlockNum: blockNum, + Rules: rules, Block: b, TxNum: inputTxNum, TxIndex: txIndex, @@ -384,13 +487,18 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { if sender, ok := txs[txIndex].GetSender(); ok { txTask.Sender = &sender } + if ok := rs.RegisterSender(txTask); ok { + rs.AddWork(txTask) + } + } else { + rs.AddWork(txTask) } - workCh <- txTask inputTxNum++ } } close(workCh) wg.Wait() + applyTx.Rollback() for i := 0; i < workerCount; i++ { roTxs[i].Rollback() } @@ -424,11 +532,15 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error { if rwTx, err = db.BeginRw(ctx); err != nil { return err } - if _, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil { + var rootHash common.Hash + if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil { return err } if err = rwTx.Commit(); err != nil { return err } + if rootHash != header.Root { + log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root)) + } return nil } diff --git a/core/state/history_reader_22.go b/core/state/history_reader_22.go index be89d3bf4..d0844f868 100644 --- a/core/state/history_reader_22.go +++ b/core/state/history_reader_22.go @@ -21,14 +21,14 @@ func bytesToUint64(buf []byte) (x uint64) { // Implements StateReader and StateWriter type HistoryReader22 struct { - a *libstate.Aggregator + ac *libstate.AggregatorContext ri *libstate.ReadIndices txNum uint64 trace bool } -func NewHistoryReader22(a *libstate.Aggregator, ri *libstate.ReadIndices) *HistoryReader22 { - return &HistoryReader22{a: a, ri: ri} +func NewHistoryReader22(ac *libstate.AggregatorContext, ri *libstate.ReadIndices) *HistoryReader22 { + return &HistoryReader22{ac: ac, ri: ri} } func (hr *HistoryReader22) SetTx(tx kv.RwTx) { @@ -37,7 +37,6 @@ func (hr *HistoryReader22) SetTx(tx kv.RwTx) { func (hr *HistoryReader22) SetTxNum(txNum uint64) { hr.txNum = txNum - hr.a.SetTxNum(txNum) if hr.ri != nil { hr.ri.SetTxNum(txNum) } @@ -57,7 +56,7 @@ func (hr *HistoryReader22) ReadAccountData(address common.Address) (*accounts.Ac return nil, err } } - enc, err := hr.a.ReadAccountDataBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) + enc, err := hr.ac.ReadAccountDataBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) if err != nil { return nil, err } @@ -108,7 +107,7 @@ func (hr *HistoryReader22) ReadAccountStorage(address common.Address, incarnatio return nil, err } } - enc, err := hr.a.ReadAccountStorageBeforeTxNum(address.Bytes(), key.Bytes(), hr.txNum, nil /* roTx */) + enc, err := hr.ac.ReadAccountStorageBeforeTxNum(address.Bytes(), key.Bytes(), hr.txNum, nil /* roTx */) if err != nil { return nil, err } @@ -131,7 +130,7 @@ func (hr *HistoryReader22) ReadAccountCode(address common.Address, incarnation u return nil, err } } - enc, err := hr.a.ReadAccountCodeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) + enc, err := hr.ac.ReadAccountCodeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) if err != nil { return nil, err } @@ -147,7 +146,7 @@ func (hr *HistoryReader22) ReadAccountCodeSize(address common.Address, incarnati return 0, err } } - size, err := hr.a.ReadAccountCodeSizeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) + size, err := hr.ac.ReadAccountCodeSizeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) if err != nil { return 0, err } diff --git a/core/state/intra_block_state.go b/core/state/intra_block_state.go index 4c8dc91db..ba4859c91 100644 --- a/core/state/intra_block_state.go +++ b/core/state/intra_block_state.go @@ -172,7 +172,7 @@ func (sdb *IntraBlockState) AddRefund(gas uint64) { func (sdb *IntraBlockState) SubRefund(gas uint64) { sdb.journal.append(refundChange{prev: sdb.refund}) if gas > sdb.refund { - panic("Refund counter below zero") + sdb.setErrorUnsafe(fmt.Errorf("Refund counter below zero")) } sdb.refund -= gas } diff --git a/core/state/recon_state_1.go b/core/state/recon_state_1.go index 3487a7e4a..7572a3a61 100644 --- a/core/state/recon_state_1.go +++ b/core/state/recon_state_1.go @@ -5,14 +5,19 @@ import ( "container/heap" "encoding/binary" "fmt" + "sort" "sync" + "unsafe" + "github.com/google/btree" "github.com/holiman/uint256" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/types/accounts" + "github.com/ledgerwatch/erigon/params" ) // ReadWriteSet contains ReadSet, WriteSet and BalanceIncrease of a transaction, @@ -21,6 +26,7 @@ import ( type TxTask struct { TxNum uint64 BlockNum uint64 + Rules *params.Rules Header *types.Header Block *types.Block BlockHash common.Hash @@ -29,10 +35,9 @@ type TxTask struct { Final bool Tx types.Transaction BalanceIncreaseSet map[common.Address]uint256.Int - ReadKeys map[string][][]byte - ReadVals map[string][][]byte - WriteKeys map[string][][]byte - WriteVals map[string][][]byte + ReadLists map[string]*KvList + WriteLists map[string]*KvList + ResultsSize int64 Error error } @@ -60,48 +65,50 @@ func (h *TxTaskQueue) Pop() interface{} { return c[len(c)-1] } +const CodeSizeTable = "CodeSize" + type ReconState1 struct { - lock sync.RWMutex - triggers map[uint64]TxTask - senderTxNums map[common.Address]uint64 - workCh chan TxTask - queue TxTaskQueue - changes map[string]map[string][]byte - sizeEstimate uint64 - rollbackCount uint64 - txsDone uint64 + lock sync.RWMutex + receiveWork *sync.Cond + triggers map[uint64]TxTask + senderTxNums map[common.Address]uint64 + triggerLock sync.RWMutex + queue TxTaskQueue + queueLock sync.Mutex + changes map[string]*btree.BTreeG[ReconStateItem1] + sizeEstimate uint64 + txsDone uint64 + finished bool } -func NewReconState1(workCh chan TxTask) *ReconState1 { +type ReconStateItem1 struct { + key []byte + val []byte +} + +func reconStateItem1Less(i, j ReconStateItem1) bool { + return bytes.Compare(i.key, j.key) < 0 +} + +func NewReconState1() *ReconState1 { rs := &ReconState1{ - workCh: workCh, triggers: map[uint64]TxTask{}, senderTxNums: map[common.Address]uint64{}, - changes: map[string]map[string][]byte{}, + changes: map[string]*btree.BTreeG[ReconStateItem1]{}, } + rs.receiveWork = sync.NewCond(&rs.queueLock) return rs } func (rs *ReconState1) put(table string, key, val []byte) { t, ok := rs.changes[table] if !ok { - t = map[string][]byte{} + t = btree.NewG[ReconStateItem1](32, reconStateItem1Less) rs.changes[table] = t } - t[string(key)] = val - rs.sizeEstimate += uint64(len(key)) + uint64(len(val)) -} - -func (rs *ReconState1) Delete(table string, key []byte) { - rs.lock.Lock() - defer rs.lock.Unlock() - t, ok := rs.changes[table] - if !ok { - t = map[string][]byte{} - rs.changes[table] = t - } - t[string(key)] = nil - rs.sizeEstimate += uint64(len(key)) + item := ReconStateItem1{key: libcommon.Copy(key), val: libcommon.Copy(val)} + t.ReplaceOrInsert(item) + rs.sizeEstimate += uint64(unsafe.Sizeof(item)) + uint64(len(key)) + uint64(len(val)) } func (rs *ReconState1) Get(table string, key []byte) []byte { @@ -115,40 +122,45 @@ func (rs *ReconState1) get(table string, key []byte) []byte { if !ok { return nil } - return t[string(key)] + if i, ok := t.Get(ReconStateItem1{key: key}); ok { + return i.val + } + return nil } func (rs *ReconState1) Flush(rwTx kv.RwTx) error { rs.lock.Lock() defer rs.lock.Unlock() for table, t := range rs.changes { - for ks, val := range t { - if len(val) > 0 { - if err := rwTx.Put(table, []byte(ks), val); err != nil { - return err + var err error + t.Ascend(func(item ReconStateItem1) bool { + if len(item.val) == 0 { + if err = rwTx.Delete(table, item.key, nil); err != nil { + return false } + //fmt.Printf("Flush [%x]=>\n", ks) + } else { + if err = rwTx.Put(table, item.key, item.val); err != nil { + return false + } + //fmt.Printf("Flush [%x]=>[%x]\n", ks, val) } + return true + }) + if err != nil { + return err } + t.Clear(true) } - rs.changes = map[string]map[string][]byte{} rs.sizeEstimate = 0 return nil } func (rs *ReconState1) Schedule() (TxTask, bool) { - rs.lock.Lock() - defer rs.lock.Unlock() - for rs.queue.Len() < 16 { - txTask, ok := <-rs.workCh - if !ok { - // No more work, channel is closed - break - } - if txTask.Sender == nil { - heap.Push(&rs.queue, txTask) - } else if rs.registerSender(txTask) { - heap.Push(&rs.queue, txTask) - } + rs.queueLock.Lock() + defer rs.queueLock.Unlock() + for !rs.finished && rs.queue.Len() == 0 { + rs.receiveWork.Wait() } if rs.queue.Len() > 0 { return heap.Pop(&rs.queue).(TxTask), true @@ -156,7 +168,9 @@ func (rs *ReconState1) Schedule() (TxTask, bool) { return TxTask{}, false } -func (rs *ReconState1) registerSender(txTask TxTask) bool { +func (rs *ReconState1) RegisterSender(txTask TxTask) bool { + rs.triggerLock.Lock() + defer rs.triggerLock.Unlock() lastTxNum, deferral := rs.senderTxNums[*txTask.Sender] if deferral { // Transactions with the same sender have obvious data dependency, no point running it before lastTxNum @@ -169,11 +183,16 @@ func (rs *ReconState1) registerSender(txTask TxTask) bool { return !deferral } -func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) { - rs.lock.Lock() - defer rs.lock.Unlock() +func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) uint64 { + rs.queueLock.Lock() + defer rs.queueLock.Unlock() + rs.triggerLock.Lock() + defer rs.triggerLock.Unlock() + count := uint64(0) if triggered, ok := rs.triggers[txNum]; ok { heap.Push(&rs.queue, triggered) + rs.receiveWork.Signal() + count++ delete(rs.triggers, txNum) } if sender != nil { @@ -183,37 +202,65 @@ func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) { } } rs.txsDone++ + return count } -func (rs *ReconState1) RollbackTx(txTask TxTask) { - rs.lock.Lock() - defer rs.lock.Unlock() +func (rs *ReconState1) AddWork(txTask TxTask) { + txTask.BalanceIncreaseSet = nil + txTask.ReadLists = nil + txTask.WriteLists = nil + txTask.ResultsSize = 0 + rs.queueLock.Lock() + defer rs.queueLock.Unlock() heap.Push(&rs.queue, txTask) - rs.rollbackCount++ + rs.receiveWork.Signal() } -func (rs *ReconState1) Apply(writeKeys, writeVals map[string][][]byte, balanceIncreaseSet map[common.Address]uint256.Int) { +func (rs *ReconState1) Finish() { + rs.queueLock.Lock() + defer rs.queueLock.Unlock() + rs.finished = true + rs.receiveWork.Broadcast() +} + +func (rs *ReconState1) Apply(emptyRemoval bool, roTx kv.Tx, txTask TxTask) error { rs.lock.Lock() defer rs.lock.Unlock() - for table, keyList := range writeKeys { - valList := writeVals[table] - for i, key := range keyList { - val := valList[i] - rs.put(table, key, val) + if txTask.WriteLists != nil { + for table, list := range txTask.WriteLists { + for i, key := range list.Keys { + val := list.Vals[i] + rs.put(table, key, val) + } } } - for addr, increase := range balanceIncreaseSet { + for addr, increase := range txTask.BalanceIncreaseSet { + //if increase.IsZero() { + // continue + //} enc := rs.get(kv.PlainState, addr.Bytes()) + if enc == nil { + var err error + enc, err = roTx.GetOne(kv.PlainState, addr.Bytes()) + if err != nil { + return err + } + } var a accounts.Account if err := a.DecodeForStorage(enc); err != nil { - panic(err) + return err } a.Balance.Add(&a.Balance, &increase) - l := a.EncodingLengthForStorage() - enc = make([]byte, l) - a.EncodeForStorage(enc) + if emptyRemoval && a.Nonce == 0 && a.Balance.IsZero() && a.IsEmptyCodeHash() { + enc = []byte{} + } else { + l := a.EncodingLengthForStorage() + enc = make([]byte, l) + a.EncodeForStorage(enc) + } rs.put(kv.PlainState, addr.Bytes(), enc) } + return nil } func (rs *ReconState1) DoneCount() uint64 { @@ -222,49 +269,80 @@ func (rs *ReconState1) DoneCount() uint64 { return rs.txsDone } -func (rs *ReconState1) RollbackCount() uint64 { - rs.lock.RLock() - defer rs.lock.RUnlock() - return rs.rollbackCount -} - func (rs *ReconState1) SizeEstimate() uint64 { rs.lock.RLock() defer rs.lock.RUnlock() return rs.sizeEstimate } -func (rs *ReconState1) ReadsValid(readKeys, readVals map[string][][]byte) bool { +func (rs *ReconState1) ReadsValid(readLists map[string]*KvList) bool { rs.lock.RLock() defer rs.lock.RUnlock() - for table, keyList := range readKeys { - t, ok := rs.changes[table] + //fmt.Printf("ValidReads\n") + for table, list := range readLists { + //fmt.Printf("Table %s\n", table) + var t *btree.BTreeG[ReconStateItem1] + var ok bool + if table == CodeSizeTable { + t, ok = rs.changes[kv.Code] + } else { + t, ok = rs.changes[table] + } if !ok { continue } - valList := readVals[table] - for i, key := range keyList { - val := valList[i] - if rereadVal, ok := t[string(key)]; ok { - if !bytes.Equal(val, rereadVal) { + for i, key := range list.Keys { + val := list.Vals[i] + if item, ok := t.Get(ReconStateItem1{key: key}); ok { + //fmt.Printf("key [%x] => [%x] vs [%x]\n", key, val, rereadVal) + if table == CodeSizeTable { + if binary.BigEndian.Uint64(val) != uint64(len(item.val)) { + return false + } + } else if !bytes.Equal(val, item.val) { return false } + } else { + //fmt.Printf("key [%x] => [%x] not present in changes\n", key, val) } } } return true } +// KvList sort.Interface to sort write list by keys +type KvList struct { + Keys, Vals [][]byte +} + +func (l KvList) Len() int { + return len(l.Keys) +} + +func (l KvList) Less(i, j int) bool { + return bytes.Compare(l.Keys[i], l.Keys[j]) < 0 +} + +func (l *KvList) Swap(i, j int) { + l.Keys[i], l.Keys[j] = l.Keys[j], l.Keys[i] + l.Vals[i], l.Vals[j] = l.Vals[j], l.Vals[i] +} + type StateReconWriter1 struct { - rs *ReconState1 - txNum uint64 - writeKeys map[string][][]byte - writeVals map[string][][]byte + rs *ReconState1 + txNum uint64 + writeLists map[string]*KvList } func NewStateReconWriter1(rs *ReconState1) *StateReconWriter1 { return &StateReconWriter1{ rs: rs, + writeLists: map[string]*KvList{ + kv.PlainState: {}, + kv.Code: {}, + kv.PlainContractCode: {}, + kv.IncarnationMap: {}, + }, } } @@ -273,42 +351,49 @@ func (w *StateReconWriter1) SetTxNum(txNum uint64) { } func (w *StateReconWriter1) ResetWriteSet() { - w.writeKeys = map[string][][]byte{} - w.writeVals = map[string][][]byte{} + w.writeLists = map[string]*KvList{ + kv.PlainState: {}, + kv.Code: {}, + kv.PlainContractCode: {}, + kv.IncarnationMap: {}, + } } -func (w *StateReconWriter1) WriteSet() (map[string][][]byte, map[string][][]byte) { - return w.writeKeys, w.writeVals +func (w *StateReconWriter1) WriteSet() map[string]*KvList { + for _, list := range w.writeLists { + sort.Sort(list) + } + return w.writeLists } func (w *StateReconWriter1) UpdateAccountData(address common.Address, original, account *accounts.Account) error { value := make([]byte, account.EncodingLengthForStorage()) account.EncodeForStorage(value) //fmt.Printf("account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x} txNum: %d\n", address, &account.Balance, account.Nonce, account.Root, account.CodeHash, w.txNum) - w.writeKeys[kv.PlainState] = append(w.writeKeys[kv.PlainState], address.Bytes()) - w.writeVals[kv.PlainState] = append(w.writeVals[kv.PlainState], value) + w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes()) + w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value) return nil } func (w *StateReconWriter1) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error { - w.writeKeys[kv.Code] = append(w.writeKeys[kv.Code], codeHash.Bytes()) - w.writeVals[kv.Code] = append(w.writeVals[kv.Code], code) + w.writeLists[kv.Code].Keys = append(w.writeLists[kv.Code].Keys, codeHash.Bytes()) + w.writeLists[kv.Code].Vals = append(w.writeLists[kv.Code].Vals, code) if len(code) > 0 { //fmt.Printf("code [%x] => [%x] CodeHash: %x, txNum: %d\n", address, code, codeHash, w.txNum) - w.writeKeys[kv.PlainContractCode] = append(w.writeKeys[kv.PlainContractCode], dbutils.PlainGenerateStoragePrefix(address[:], incarnation)) - w.writeVals[kv.PlainContractCode] = append(w.writeVals[kv.PlainContractCode], codeHash.Bytes()) + w.writeLists[kv.PlainContractCode].Keys = append(w.writeLists[kv.PlainContractCode].Keys, dbutils.PlainGenerateStoragePrefix(address[:], incarnation)) + w.writeLists[kv.PlainContractCode].Vals = append(w.writeLists[kv.PlainContractCode].Vals, codeHash.Bytes()) } return nil } func (w *StateReconWriter1) DeleteAccount(address common.Address, original *accounts.Account) error { - w.writeKeys[kv.PlainState] = append(w.writeKeys[kv.PlainState], address.Bytes()) - w.writeVals[kv.PlainState] = append(w.writeVals[kv.PlainState], nil) + w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes()) + w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, []byte{}) if original.Incarnation > 0 { var b [8]byte binary.BigEndian.PutUint64(b[:], original.Incarnation) - w.writeKeys[kv.IncarnationMap] = append(w.writeKeys[kv.IncarnationMap], address.Bytes()) - w.writeVals[kv.IncarnationMap] = append(w.writeVals[kv.IncarnationMap], b[:]) + w.writeLists[kv.IncarnationMap].Keys = append(w.writeLists[kv.IncarnationMap].Keys, address.Bytes()) + w.writeLists[kv.IncarnationMap].Vals = append(w.writeLists[kv.IncarnationMap].Vals, b[:]) } return nil } @@ -317,8 +402,8 @@ func (w *StateReconWriter1) WriteAccountStorage(address common.Address, incarnat if *original == *value { return nil } - w.writeKeys[kv.PlainState] = append(w.writeKeys[kv.PlainState], dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes())) - w.writeVals[kv.PlainState] = append(w.writeVals[kv.PlainState], value.Bytes()) + w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes())) + w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value.Bytes()) //fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, v, w.txNum) return nil } @@ -335,12 +420,19 @@ type StateReconReader1 struct { readError bool stateTxNum uint64 composite []byte - readKeys map[string][][]byte - readVals map[string][][]byte + readLists map[string]*KvList } func NewStateReconReader1(rs *ReconState1) *StateReconReader1 { - return &StateReconReader1{rs: rs} + return &StateReconReader1{ + rs: rs, + readLists: map[string]*KvList{ + kv.PlainState: {}, + kv.Code: {}, + CodeSizeTable: {}, + kv.IncarnationMap: {}, + }, + } } func (r *StateReconReader1) SetTxNum(txNum uint64) { @@ -352,12 +444,19 @@ func (r *StateReconReader1) SetTx(tx kv.Tx) { } func (r *StateReconReader1) ResetReadSet() { - r.readKeys = map[string][][]byte{} - r.readVals = map[string][][]byte{} + r.readLists = map[string]*KvList{ + kv.PlainState: {}, + kv.Code: {}, + CodeSizeTable: {}, + kv.IncarnationMap: {}, + } } -func (r *StateReconReader1) ReadSet() (map[string][][]byte, map[string][][]byte) { - return r.readKeys, r.readVals +func (r *StateReconReader1) ReadSet() map[string]*KvList { + for _, list := range r.readLists { + sort.Sort(list) + } + return r.readLists } func (r *StateReconReader1) SetTrace(trace bool) { @@ -373,8 +472,8 @@ func (r *StateReconReader1) ReadAccountData(address common.Address) (*accounts.A return nil, err } } - r.readKeys[kv.PlainState] = append(r.readKeys[kv.PlainState], address.Bytes()) - r.readVals[kv.PlainState] = append(r.readVals[kv.PlainState], enc) + r.readLists[kv.PlainState].Keys = append(r.readLists[kv.PlainState].Keys, address.Bytes()) + r.readLists[kv.PlainState].Vals = append(r.readLists[kv.PlainState].Vals, common.CopyBytes(enc)) if len(enc) == 0 { return nil, nil } @@ -406,8 +505,8 @@ func (r *StateReconReader1) ReadAccountStorage(address common.Address, incarnati return nil, err } } - r.readKeys[kv.PlainState] = append(r.readKeys[kv.PlainState], r.composite) - r.readVals[kv.PlainState] = append(r.readVals[kv.PlainState], enc) + r.readLists[kv.PlainState].Keys = append(r.readLists[kv.PlainState].Keys, common.CopyBytes(r.composite)) + r.readLists[kv.PlainState].Vals = append(r.readLists[kv.PlainState].Vals, common.CopyBytes(enc)) if r.trace { if enc == nil { fmt.Printf("ReadAccountStorage [%x] [%x] => [], txNum: %d\n", address, key.Bytes(), r.txNum) @@ -430,8 +529,8 @@ func (r *StateReconReader1) ReadAccountCode(address common.Address, incarnation return nil, err } } - r.readKeys[kv.Code] = append(r.readKeys[kv.Code], address.Bytes()) - r.readVals[kv.Code] = append(r.readVals[kv.Code], enc) + r.readLists[kv.Code].Keys = append(r.readLists[kv.Code].Keys, address.Bytes()) + r.readLists[kv.Code].Vals = append(r.readLists[kv.Code].Vals, common.CopyBytes(enc)) if r.trace { fmt.Printf("ReadAccountCode [%x] => [%x], txNum: %d\n", address, enc, r.txNum) } @@ -447,8 +546,10 @@ func (r *StateReconReader1) ReadAccountCodeSize(address common.Address, incarnat return 0, err } } - r.readKeys[kv.Code] = append(r.readKeys[kv.Code], address.Bytes()) - r.readVals[kv.Code] = append(r.readVals[kv.Code], enc) + var sizebuf [8]byte + binary.BigEndian.PutUint64(sizebuf[:], uint64(len(enc))) + r.readLists[CodeSizeTable].Keys = append(r.readLists[CodeSizeTable].Keys, address.Bytes()) + r.readLists[CodeSizeTable].Vals = append(r.readLists[CodeSizeTable].Vals, sizebuf[:]) size := len(enc) if r.trace { fmt.Printf("ReadAccountCodeSize [%x] => [%d], txNum: %d\n", address, size, r.txNum) @@ -465,8 +566,8 @@ func (r *StateReconReader1) ReadAccountIncarnation(address common.Address) (uint return 0, err } } - r.readKeys[kv.IncarnationMap] = append(r.readKeys[kv.IncarnationMap], address.Bytes()) - r.readVals[kv.IncarnationMap] = append(r.readVals[kv.IncarnationMap], enc) + r.readLists[kv.IncarnationMap].Keys = append(r.readLists[kv.IncarnationMap].Keys, address.Bytes()) + r.readLists[kv.IncarnationMap].Vals = append(r.readLists[kv.IncarnationMap].Vals, common.CopyBytes(enc)) if len(enc) == 0 { return 0, nil } diff --git a/go.mod b/go.mod index c733dbefb..f24623b10 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.18 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20220723031125-6f7794e88b5e + github.com/ledgerwatch/erigon-lib v0.0.0-20220723080652-596d10ea2e13 github.com/ledgerwatch/erigon-snapshot v1.0.0 github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index 38d9a79d6..1ea373417 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220723031125-6f7794e88b5e h1:4tZnz9FCTIalm6VtGXBZX713Y+lcHqpMK6L3wP7OSHY= -github.com/ledgerwatch/erigon-lib v0.0.0-20220723031125-6f7794e88b5e/go.mod h1:mq8M03qcnaqXZ/yjNuWoyZQ5V8r5JbXw5JYmy4WNUZQ= +github.com/ledgerwatch/erigon-lib v0.0.0-20220723080652-596d10ea2e13 h1:GsmPUJO6xeifKSxxnG+BUwGEFggljkchaYm/HomvIQs= +github.com/ledgerwatch/erigon-lib v0.0.0-20220723080652-596d10ea2e13/go.mod h1:mq8M03qcnaqXZ/yjNuWoyZQ5V8r5JbXw5JYmy4WNUZQ= github.com/ledgerwatch/erigon-snapshot v1.0.0 h1:bp/7xoPdM5lK7LFdqEMH008RZmqxMZV0RUVEQiWs7v4= github.com/ledgerwatch/erigon-snapshot v1.0.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=