Experiment in parallel execution (#4652)

* Restructure tx execution

* fixes

* Fixes and traces

* Tracing

* More tracing

* Drain the result channel

* Intermediate

* more efficient parallel exec

* Sorted buffer

* Fix results size

* fix for the recon

* Fix compilation

* Sort keys in Write and Read sets, fix compilation in rpcdaemon22

* Update to latest erigon-lib

* Update to erigon-lib

* Remove go.mod replace

* Update erigon-lib

* Update to erigon-lib main

* Fix lint

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2022-07-23 18:39:08 +01:00 committed by GitHub
parent 1cb6be02a5
commit 81d106bc9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 654 additions and 241 deletions

View File

@ -128,7 +128,9 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
txNumbers := roaring64.New()
txNumbers.AddRange(fromTxNum, toTxNum) // [min,max)
topicsBitmap, err := getTopicsBitmap(api._agg, tx, crit.Topics, fromTxNum, toTxNum)
ac := api._agg.MakeContext()
topicsBitmap, err := getTopicsBitmap(ac, tx, crit.Topics, fromTxNum, toTxNum)
if err != nil {
return nil, err
}
@ -139,7 +141,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
var addrBitmap *roaring64.Bitmap
for _, addr := range crit.Addresses {
var bitmapForORing roaring64.Bitmap
it := api._agg.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
it := ac.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
for it.HasNext() {
bitmapForORing.Add(it.Next())
}
@ -162,7 +164,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
var lastHeader *types.Header
var lastSigner *types.Signer
var lastRules *params.Rules
stateReader := state.NewHistoryReader22(api._agg, nil /* ReadIndices */)
stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */)
iter := txNumbers.Iterator()
for iter.HasNext() {
txNum := iter.Next()
@ -233,12 +235,12 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
// {{}, {B}} matches any topic in first position AND B in second position
// {{A}, {B}} matches topic A in first position AND B in second position
// {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position
func getTopicsBitmap(a *libstate.Aggregator, c kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
func getTopicsBitmap(ac *libstate.AggregatorContext, c kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
var result *roaring64.Bitmap
for _, sub := range topics {
var bitmapForORing roaring64.Bitmap
for _, topic := range sub {
it := a.LogTopicIterator(topic.Bytes(), from, to, nil)
it := ac.LogTopicIterator(topic.Bytes(), from, to, nil)
for it.HasNext() {
bitmapForORing.Add(it.Next())
}

View File

@ -253,10 +253,11 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
allTxs roaring64.Bitmap
txsTo roaring64.Bitmap
)
ac := api._agg.MakeContext()
for _, addr := range req.FromAddress {
if addr != nil {
it := api._agg.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
it := ac.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
for it.HasNext() {
allTxs.Add(it.Next())
}
@ -266,7 +267,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
for _, addr := range req.ToAddress {
if addr != nil {
it := api._agg.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
it := ac.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
for it.HasNext() {
txsTo.Add(it.Next())
}
@ -319,7 +320,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
var lastHeader *types.Header
var lastSigner *types.Signer
var lastRules *params.Rules
stateReader := state.NewHistoryReader22(api._agg, nil /* ReadIndices */)
stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */)
noop := state.NewNoopWriter()
for it.HasNext() {
txNum := uint64(it.Next())

View File

@ -171,7 +171,7 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
}
return h
}
readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx}
readWrapper := &ReaderWrapper22{ac: agg.MakeContext(), roTx: rwTx}
writeWrapper := &WriterWrapper22{w: agg}
for !interrupt {
@ -396,7 +396,7 @@ func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *Reader
// Implements StateReader and StateWriter
type ReaderWrapper22 struct {
roTx kv.Tx
r *libstate.Aggregator
ac *libstate.AggregatorContext
blockNum uint64
}
@ -406,7 +406,7 @@ type WriterWrapper22 struct {
}
func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Account, error) {
enc, err := rw.r.ReadAccountData(address.Bytes(), rw.roTx)
enc, err := rw.ac.ReadAccountData(address.Bytes(), rw.roTx)
if err != nil {
return nil, err
}
@ -444,7 +444,7 @@ func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Ac
}
func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
enc, err := rw.r.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx)
enc, err := rw.ac.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx)
if err != nil {
return nil, err
}
@ -458,11 +458,11 @@ func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnatio
}
func (rw *ReaderWrapper22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
return rw.r.ReadAccountCode(address.Bytes(), rw.roTx)
return rw.ac.ReadAccountCode(address.Bytes(), rw.roTx)
}
func (rw *ReaderWrapper22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
return rw.r.ReadAccountCodeSize(address.Bytes(), rw.roTx)
return rw.ac.ReadAccountCodeSize(address.Bytes(), rw.roTx)
}
func (rw *ReaderWrapper22) ReadAccountIncarnation(address common.Address) (uint64, error) {

View File

@ -136,6 +136,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
readWrapper := state.NewHistoryReader22(h.MakeContext(), ri)
for !interrupt {
select {
@ -169,7 +170,6 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
txNum += uint64(len(b.Transactions())) + 2 // Pre and Post block transaction
continue
}
readWrapper := state.NewHistoryReader22(h, ri)
if traceBlock != 0 {
readWrapper.SetTrace(blockNum == uint64(traceBlock))
}

View File

@ -0,0 +1,195 @@
package commands
import (
"context"
"fmt"
"path"
"path/filepath"
"sort"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
)
var txhash string
var txnum uint64
func init() {
withDataDir(replayTxCmd)
rootCmd.AddCommand(replayTxCmd)
replayTxCmd.Flags().StringVar(&txhash, "txhash", "", "hash of the transaction to replay")
replayTxCmd.Flags().Uint64Var(&txnum, "txnum", 0, "tx num for replay")
}
var replayTxCmd = &cobra.Command{
Use: "replaytx",
Short: "Experimental command to replay a given transaction using only history",
RunE: func(cmd *cobra.Command, args []string) error {
return ReplayTx(genesis)
},
}
func ReplayTx(genesis *core.Genesis) error {
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.Reopen(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
// Compute mapping blockNum -> last TxNum in that block
txNums := make([]uint64, allSnapshots.BlocksAvailable()+1)
if err := allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error {
for _, b := range bs {
if err := b.Iterate(func(blockNum, baseTxNum, txAmount uint64) {
txNums[blockNum] = baseTxNum + txAmount
}); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
}
ctx := context.Background()
var txNum uint64
if txhash != "" {
txnHash := common.HexToHash(txhash)
fmt.Printf("Tx hash = [%x]\n", txnHash)
db := memdb.New()
roTx, err := db.BeginRo(ctx)
if err != nil {
return err
}
defer roTx.Rollback()
bn, ok, err := blockReader.TxnLookup(ctx, roTx, txnHash)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("transaction not found")
}
fmt.Printf("Found in block %d\n", bn)
var header *types.Header
if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil {
return err
}
blockHash := header.Hash()
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, bn)
if err != nil {
return err
}
txs := b.Transactions()
var txIndex int
for txIndex = 0; txIndex < len(txs); txIndex++ {
if txs[txIndex].Hash() == txnHash {
fmt.Printf("txIndex = %d\n", txIndex)
break
}
}
txNum = txNums[bn-1] + 1 + uint64(txIndex)
} else {
txNum = txnum
}
fmt.Printf("txNum = %d\n", txNum)
aggPath := filepath.Join(datadir, "erigon23")
agg, err := libstate.NewAggregator(aggPath, AggregationStep)
if err != nil {
return fmt.Errorf("create history: %w", err)
}
defer agg.Close()
ac := agg.MakeContext()
workCh := make(chan state.TxTask)
rs := state.NewReconState(workCh)
if err = replayTxNum(ctx, allSnapshots, blockReader, txNum, txNums, rs, ac); err != nil {
return err
}
return nil
}
func replayTxNum(ctx context.Context, allSnapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader,
txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.AggregatorContext,
) error {
bn := uint64(sort.Search(len(txNums), func(i int) bool {
return txNums[i] > txNum
}))
txIndex := int(txNum - txNums[bn-1] - 1)
fmt.Printf("bn=%d, txIndex=%d\n", bn, txIndex)
var header *types.Header
var err error
if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil {
return err
}
blockHash := header.Hash()
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, bn)
if err != nil {
return err
}
txn := b.Transactions()[txIndex]
stateWriter := state.NewStateReconWriter(ac, rs)
stateReader := state.NewHistoryReaderNoState(ac, rs)
stateReader.SetTxNum(txNum)
stateWriter.SetTxNum(txNum)
noop := state.NewNoopWriter()
rules := chainConfig.Rules(bn)
for {
stateReader.ResetError()
ibs := state.New(stateReader)
gp := new(core.GasPool).AddGas(txn.GetGas())
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData())
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(chainConfig, bn)}
contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil }
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, nil, hash, number)
if err != nil {
panic(err)
}
return h
}
getHashFn := core.GetHashFn(header, getHeader)
logger := log.New()
engine := initConsensusEngine(chainConfig, logger, allSnapshots)
txnHash := txn.Hash()
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */, contractHasTEVM)
ibs.Prepare(txnHash, blockHash, txIndex)
msg, err := txn.AsMessage(*types.MakeSigner(chainConfig, bn), header.BaseFee, rules)
if err != nil {
return err
}
txContext := core.NewEVMTxContext(msg)
vmenv := vm.NewEVM(blockContext, txContext, ibs, chainConfig, vmConfig)
_, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
return fmt.Errorf("could not apply tx %d [%x] failed: %w", txIndex, txnHash, err)
}
if err = ibs.FinalizeTx(rules, noop); err != nil {
return err
}
if dependency, ok := stateReader.ReadError(); ok {
fmt.Printf("dependency %d on %d\n", txNum, dependency)
if err = replayTxNum(ctx, allSnapshots, blockReader, dependency, txNums, rs, ac); err != nil {
return err
}
} else {
if err = ibs.CommitBlock(rules, stateWriter); err != nil {
return err
}
break
}
}
rs.CommitTxNum(txNum)
fmt.Printf("commited %d\n", txNum)
return nil
}

View File

@ -131,9 +131,7 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) {
} else if daoForkTx {
//fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txNum, blockNum)
misc.ApplyDAOHardFork(ibs)
if err := ibs.FinalizeTx(rules, noop); err != nil {
panic(err)
}
ibs.SoftFinalise()
} else if txTask.Final {
if txTask.BlockNum > 0 {
//fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txNum, blockNum)
@ -147,16 +145,25 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) {
} else {
txHash := txTask.Tx.Hash()
gp := new(core.GasPool).AddGas(txTask.Tx.GetGas())
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData())
usedGas := new(uint64)
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)}
contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil }
ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex)
getHashFn := core.GetHashFn(txTask.Header, rw.getHeader)
_, _, err = core.ApplyTransaction(rw.chainConfig, getHashFn, rw.engine, nil, gp, ibs, noop, txTask.Header, txTask.Tx, usedGas, vmConfig, contractHasTEVM)
blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM)
ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex)
msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, rules)
if err != nil {
panic(err)
}
txContext := core.NewEVMTxContext(msg)
vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig)
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, evm=%p\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex, vmenv)
_, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
panic(fmt.Errorf("could not apply tx %d [%x] failed: %w", txTask.TxIndex, txHash, err))
}
if err = ibs.FinalizeTx(rules, noop); err != nil {
panic(err)
}
}
if dependency, ok := rw.stateReader.ReadError(); ok {
//fmt.Printf("rollback %d\n", txNum)
@ -259,12 +266,8 @@ func (fw *FillWorker) fillStorage(plainStateCollector *etl.Collector) {
fw.currentKey = key
compositeKey := dbutils.PlainGenerateCompositeStorageKey(key[:20], state.FirstContractIncarnation, key[20:])
if len(val) > 0 {
if len(val) > 1 || val[0] != 0 {
if err := plainStateCollector.Collect(compositeKey, val); err != nil {
panic(err)
}
} else {
fmt.Printf("Storage [%x] => [%x]\n", compositeKey, val)
if err := plainStateCollector.Collect(compositeKey, val); err != nil {
panic(err)
}
//fmt.Printf("Storage [%x] => [%x]\n", compositeKey, val)
}
@ -283,19 +286,15 @@ func (fw *FillWorker) fillCode(codeCollector, plainContractCollector *etl.Collec
fw.currentKey = key
compositeKey := dbutils.PlainGenerateStoragePrefix(key, state.FirstContractIncarnation)
if len(val) > 0 {
if len(val) > 1 || val[0] != 0 {
codeHash, err := common.HashData(val)
if err != nil {
panic(err)
}
if err = codeCollector.Collect(codeHash[:], val); err != nil {
panic(err)
}
if err = plainContractCollector.Collect(compositeKey, codeHash[:]); err != nil {
panic(err)
}
} else {
fmt.Printf("Code [%x] => [%x]\n", compositeKey, val)
codeHash, err := common.HashData(val)
if err != nil {
panic(err)
}
if err = codeCollector.Collect(codeHash[:], val); err != nil {
panic(err)
}
if err = plainContractCollector.Collect(compositeKey, codeHash[:]); err != nil {
panic(err)
}
//fmt.Printf("Code [%x] => [%x]\n", compositeKey, val)
}
@ -600,9 +599,9 @@ func Recon(genesis *core.Genesis, logger log.Logger) error {
}
}()
var inputTxNum uint64
var header *types.Header
for bn := uint64(0); bn < blockNum; bn++ {
header, err := blockReader.HeaderByNumber(ctx, nil, bn)
if err != nil {
if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil {
panic(err)
}
blockHash := header.Hash()
@ -851,11 +850,15 @@ func Recon(genesis *core.Genesis, logger log.Logger) error {
if rwTx, err = db.BeginRw(ctx); err != nil {
return err
}
if _, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil {
var rootHash common.Hash
if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil {
return err
}
if err = rwTx.Commit(); err != nil {
return err
}
if rootHash != header.Root {
log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root))
}
return nil
}

View File

@ -11,6 +11,7 @@ import (
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
@ -31,6 +32,7 @@ import (
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"golang.org/x/sync/semaphore"
)
func init() {
@ -103,11 +105,12 @@ func (rw *ReconWorker1) run() {
}
rw.engine = initConsensusEngine(rw.chainConfig, rw.logger, rw.allSnapshots)
for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() {
rw.runTxTask(txTask)
rw.runTxTask(&txTask)
rw.resultCh <- txTask // Needs to have outside of the lock
}
}
func (rw *ReconWorker1) runTxTask(txTask state.TxTask) {
func (rw *ReconWorker1) runTxTask(txTask *state.TxTask) {
rw.lock.Lock()
defer rw.lock.Unlock()
txTask.Error = nil
@ -115,74 +118,103 @@ func (rw *ReconWorker1) runTxTask(txTask state.TxTask) {
rw.stateWriter.SetTxNum(txTask.TxNum)
rw.stateReader.ResetReadSet()
rw.stateWriter.ResetWriteSet()
rules := rw.chainConfig.Rules(txTask.BlockNum)
ibs := state.New(rw.stateReader)
daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1
var err error
if txTask.BlockNum == 0 && txTask.TxIndex == -1 {
fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum)
//fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum)
// Genesis block
_, ibs, err = rw.genesis.ToBlock()
if err != nil {
panic(err)
}
} else if daoForkTx {
fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum)
//fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum)
misc.ApplyDAOHardFork(ibs)
ibs.SoftFinalise()
} else if txTask.TxIndex == -1 {
// Block initialisation
} else if txTask.Final {
if txTask.BlockNum > 0 {
fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum)
//fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum)
// End of block transaction in a block
if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, nil, nil, nil); err != nil {
panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err))
}
}
} else {
fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
txHash := txTask.Tx.Hash()
gp := new(core.GasPool).AddGas(txTask.Tx.GetGas())
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData())
usedGas := new(uint64)
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)}
contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil }
ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex)
vmConfig.SkipAnalysis = core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)
getHashFn := core.GetHashFn(txTask.Header, rw.getHeader)
blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, ibs, rw.chainConfig, vmConfig)
msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, rules)
msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, txTask.Rules)
if err != nil {
panic(err)
}
txContext := core.NewEVMTxContext(msg)
// Update the evm with the new transaction context.
vmenv.Reset(txContext, ibs)
result, err := core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig)
if _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */); err != nil {
txTask.Error = err
//fmt.Printf("error=%v\n", err)
}
// Update the state with pending changes
ibs.SoftFinalise()
*usedGas += result.UsedGas
}
// Prepare read set, write set and balanceIncrease set and send for serialisation
if txTask.Error == nil {
txTask.BalanceIncreaseSet = ibs.BalanceIncreaseSet()
for addr, bal := range txTask.BalanceIncreaseSet {
fmt.Printf("[%x]=>[%d]\n", addr, &bal)
}
if err = ibs.MakeWriteSet(rules, rw.stateWriter); err != nil {
//for addr, bal := range txTask.BalanceIncreaseSet {
// fmt.Printf("[%x]=>[%d]\n", addr, &bal)
//}
if err = ibs.MakeWriteSet(txTask.Rules, rw.stateWriter); err != nil {
panic(err)
}
txTask.ReadKeys, txTask.ReadVals = rw.stateReader.ReadSet()
txTask.WriteKeys, txTask.WriteVals = rw.stateWriter.WriteSet()
txTask.ReadLists = rw.stateReader.ReadSet()
txTask.WriteLists = rw.stateWriter.WriteSet()
size := (20 + 32) * len(txTask.BalanceIncreaseSet)
for _, list := range txTask.ReadLists {
for _, b := range list.Keys {
size += len(b)
}
for _, b := range list.Vals {
size += len(b)
}
}
for _, list := range txTask.WriteLists {
for _, b := range list.Keys {
size += len(b)
}
for _, b := range list.Vals {
size += len(b)
}
}
txTask.ResultsSize = int64(size)
}
}
func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.ReconState1, applyTx kv.Tx,
triggerCount *uint64, outputBlockNum *uint64, repeatCount *uint64, resultsSize *int64) {
for rws.Len() > 0 && (*rws)[0].TxNum == *outputTxNum {
txTask := heap.Pop(rws).(state.TxTask)
atomic.AddInt64(resultsSize, -txTask.ResultsSize)
if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) {
if err := rs.Apply(txTask.Rules.IsSpuriousDragon, applyTx, txTask); err != nil {
panic(err)
}
*triggerCount += rs.CommitTxNum(txTask.Sender, txTask.TxNum)
*outputTxNum++
*outputBlockNum = txTask.BlockNum
//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
} else {
rs.AddWork(txTask)
*repeatCount++
//fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
}
}
rw.resultCh <- txTask
}
func Recon1(genesis *core.Genesis, logger log.Logger) error {
@ -204,7 +236,8 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
} else if err = os.RemoveAll(reconDbPath); err != nil {
return err
}
db, err := kv2.NewMDBX(logger).Path(reconDbPath).WriteMap().Open()
limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1))
db, err := kv2.NewMDBX(logger).Path(reconDbPath).RoTxsLimiter(limiter).Open()
if err != nil {
return err
}
@ -236,9 +269,18 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum)
workerCount := runtime.NumCPU()
workCh := make(chan state.TxTask, 128)
rs := state.NewReconState1(workCh)
rs := state.NewReconState1()
var lock sync.RWMutex
reconWorkers := make([]*ReconWorker1, workerCount)
var applyTx kv.Tx
defer func() {
if applyTx != nil {
applyTx.Rollback()
}
}()
if applyTx, err = db.BeginRo(ctx); err != nil {
return err
}
roTxs := make([]kv.Tx, workerCount)
defer func() {
for i := 0; i < workerCount; i++ {
@ -263,71 +305,118 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
for i := 0; i < workerCount; i++ {
go reconWorkers[i].run()
}
commitThreshold := uint64(256 * 1024 * 1024)
commitThreshold := uint64(1024 * 1024 * 1024)
resultsThreshold := int64(1024 * 1024 * 1024)
count := uint64(0)
rollbackCount := uint64(0)
repeatCount := uint64(0)
triggerCount := uint64(0)
total := txNum
prevCount := uint64(0)
prevRollbackCount := uint64(0)
prevRepeatCount := uint64(0)
//prevTriggerCount := uint64(0)
resultsSize := int64(0)
prevTime := time.Now()
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
var rws state.TxTaskQueue
var rwsLock sync.Mutex
rwsReceiveCond := sync.NewCond(&rwsLock)
heap.Init(&rws)
var outputTxNum uint64
var inputBlockNum, outputBlockNum uint64
var prevOutputBlockNum uint64
// Go-routine gathering results from the workers
go func() {
defer rs.Finish()
for outputTxNum < txNum {
select {
case txTask := <-resultCh:
if txTask.TxNum == outputTxNum {
// Try to apply without placing on the queue first
if txTask.Error == nil && rs.ReadsValid(txTask.ReadKeys, txTask.ReadVals) {
rs.Apply(txTask.WriteKeys, txTask.WriteVals, txTask.BalanceIncreaseSet)
rs.CommitTxNum(txTask.Sender, txTask.TxNum)
outputTxNum++
} else {
rs.RollbackTx(txTask)
}
} else {
//fmt.Printf("Saved %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
func() {
rwsLock.Lock()
defer rwsLock.Unlock()
atomic.AddInt64(&resultsSize, txTask.ResultsSize)
heap.Push(&rws, txTask)
}
for rws.Len() > 0 && rws[0].TxNum == outputTxNum {
txTask = heap.Pop(&rws).(state.TxTask)
if txTask.Error == nil && rs.ReadsValid(txTask.ReadKeys, txTask.ReadVals) {
rs.Apply(txTask.WriteKeys, txTask.WriteVals, txTask.BalanceIncreaseSet)
rs.CommitTxNum(txTask.Sender, txTask.TxNum)
outputTxNum++
} else {
rs.RollbackTx(txTask)
}
}
processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
rwsReceiveCond.Signal()
}()
case <-logEvery.C:
var m runtime.MemStats
libcommon.ReadMemStats(&m)
sizeEstimate := rs.SizeEstimate()
count = rs.DoneCount()
rollbackCount = rs.RollbackCount()
currentTime := time.Now()
interval := currentTime.Sub(prevTime)
speedTx := float64(count-prevCount) / (float64(interval) / float64(time.Second))
speedBlock := float64(outputBlockNum-prevOutputBlockNum) / (float64(interval) / float64(time.Second))
progress := 100.0 * float64(count) / float64(total)
var repeatRatio float64
if count > prevCount {
repeatRatio = 100.0 * float64(rollbackCount-prevRollbackCount) / float64(count-prevCount)
repeatRatio = 100.0 * float64(repeatCount-prevRepeatCount) / float64(count-prevCount)
}
prevTime = currentTime
prevCount = count
prevRollbackCount = rollbackCount
log.Info("Transaction replay", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", progress), "tx/s", fmt.Sprintf("%.1f", speedTx), "repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio), "buffer", libcommon.ByteCount(sizeEstimate),
log.Info("Transaction replay",
//"workers", workerCount,
"at block", outputBlockNum,
"input block", atomic.LoadUint64(&inputBlockNum),
"progress", fmt.Sprintf("%.2f%%", progress),
"blk/s", fmt.Sprintf("%.1f", speedBlock),
"tx/s", fmt.Sprintf("%.1f", speedTx),
//"repeats", repeatCount-prevRepeatCount,
//"triggered", triggerCount-prevTriggerCount,
"result queue", rws.Len(),
"results size", libcommon.ByteCount(uint64(atomic.LoadInt64(&resultsSize))),
"repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio),
"buffer", libcommon.ByteCount(sizeEstimate),
"alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys),
)
prevTime = currentTime
prevCount = count
prevOutputBlockNum = outputBlockNum
prevRepeatCount = repeatCount
//prevTriggerCount = triggerCount
if sizeEstimate >= commitThreshold {
commitStart := time.Now()
log.Info("Committing...")
err := func() error {
lock.Lock()
rwsLock.Lock()
defer rwsLock.Unlock()
// Drain results (and process) channel because read sets do not carry over
for {
var drained bool
for !drained {
select {
case txTask := <-resultCh:
atomic.AddInt64(&resultsSize, txTask.ResultsSize)
heap.Push(&rws, txTask)
default:
drained = true
}
}
processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
if rws.Len() == 0 {
break
}
}
rwsReceiveCond.Signal()
lock.Lock() // This is to prevent workers from starting work on any new txTask
defer lock.Unlock()
// Drain results channel because read sets do not carry over
var drained bool
for !drained {
select {
case txTask := <-resultCh:
rs.AddWork(txTask)
default:
drained = true
}
}
// Drain results queue as well
for rws.Len() > 0 {
txTask := heap.Pop(&rws).(state.TxTask)
atomic.AddInt64(&resultsSize, -txTask.ResultsSize)
rs.AddWork(txTask)
}
applyTx.Rollback()
for i := 0; i < workerCount; i++ {
roTxs[i].Rollback()
}
@ -341,6 +430,9 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
if err = rwTx.Commit(); err != nil {
return err
}
if applyTx, err = db.BeginRo(ctx); err != nil {
return err
}
for i := 0; i < workerCount; i++ {
if roTxs[i], err = db.BeginRo(ctx); err != nil {
return err
@ -358,21 +450,32 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
}
}()
var inputTxNum uint64
var header *types.Header
for blockNum := uint64(0); blockNum <= block; blockNum++ {
header, err := blockReader.HeaderByNumber(ctx, nil, blockNum)
if err != nil {
panic(err)
atomic.StoreUint64(&inputBlockNum, blockNum)
rules := chainConfig.Rules(blockNum)
if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil {
return err
}
blockHash := header.Hash()
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum)
if err != nil {
panic(err)
return err
}
txs := b.Transactions()
for txIndex := -1; txIndex <= len(txs); txIndex++ {
// Do not oversend, wait for the result heap to go under certain size
func() {
rwsLock.Lock()
defer rwsLock.Unlock()
for rws.Len() > 128 || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold {
rwsReceiveCond.Wait()
}
}()
txTask := state.TxTask{
Header: header,
BlockNum: blockNum,
Rules: rules,
Block: b,
TxNum: inputTxNum,
TxIndex: txIndex,
@ -384,13 +487,18 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender
}
if ok := rs.RegisterSender(txTask); ok {
rs.AddWork(txTask)
}
} else {
rs.AddWork(txTask)
}
workCh <- txTask
inputTxNum++
}
}
close(workCh)
wg.Wait()
applyTx.Rollback()
for i := 0; i < workerCount; i++ {
roTxs[i].Rollback()
}
@ -424,11 +532,15 @@ func Recon1(genesis *core.Genesis, logger log.Logger) error {
if rwTx, err = db.BeginRw(ctx); err != nil {
return err
}
if _, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil {
var rootHash common.Hash
if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil {
return err
}
if err = rwTx.Commit(); err != nil {
return err
}
if rootHash != header.Root {
log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root))
}
return nil
}

View File

@ -21,14 +21,14 @@ func bytesToUint64(buf []byte) (x uint64) {
// Implements StateReader and StateWriter
type HistoryReader22 struct {
a *libstate.Aggregator
ac *libstate.AggregatorContext
ri *libstate.ReadIndices
txNum uint64
trace bool
}
func NewHistoryReader22(a *libstate.Aggregator, ri *libstate.ReadIndices) *HistoryReader22 {
return &HistoryReader22{a: a, ri: ri}
func NewHistoryReader22(ac *libstate.AggregatorContext, ri *libstate.ReadIndices) *HistoryReader22 {
return &HistoryReader22{ac: ac, ri: ri}
}
func (hr *HistoryReader22) SetTx(tx kv.RwTx) {
@ -37,7 +37,6 @@ func (hr *HistoryReader22) SetTx(tx kv.RwTx) {
func (hr *HistoryReader22) SetTxNum(txNum uint64) {
hr.txNum = txNum
hr.a.SetTxNum(txNum)
if hr.ri != nil {
hr.ri.SetTxNum(txNum)
}
@ -57,7 +56,7 @@ func (hr *HistoryReader22) ReadAccountData(address common.Address) (*accounts.Ac
return nil, err
}
}
enc, err := hr.a.ReadAccountDataBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */)
enc, err := hr.ac.ReadAccountDataBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */)
if err != nil {
return nil, err
}
@ -108,7 +107,7 @@ func (hr *HistoryReader22) ReadAccountStorage(address common.Address, incarnatio
return nil, err
}
}
enc, err := hr.a.ReadAccountStorageBeforeTxNum(address.Bytes(), key.Bytes(), hr.txNum, nil /* roTx */)
enc, err := hr.ac.ReadAccountStorageBeforeTxNum(address.Bytes(), key.Bytes(), hr.txNum, nil /* roTx */)
if err != nil {
return nil, err
}
@ -131,7 +130,7 @@ func (hr *HistoryReader22) ReadAccountCode(address common.Address, incarnation u
return nil, err
}
}
enc, err := hr.a.ReadAccountCodeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */)
enc, err := hr.ac.ReadAccountCodeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */)
if err != nil {
return nil, err
}
@ -147,7 +146,7 @@ func (hr *HistoryReader22) ReadAccountCodeSize(address common.Address, incarnati
return 0, err
}
}
size, err := hr.a.ReadAccountCodeSizeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */)
size, err := hr.ac.ReadAccountCodeSizeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */)
if err != nil {
return 0, err
}

View File

@ -172,7 +172,7 @@ func (sdb *IntraBlockState) AddRefund(gas uint64) {
func (sdb *IntraBlockState) SubRefund(gas uint64) {
sdb.journal.append(refundChange{prev: sdb.refund})
if gas > sdb.refund {
panic("Refund counter below zero")
sdb.setErrorUnsafe(fmt.Errorf("Refund counter below zero"))
}
sdb.refund -= gas
}

View File

@ -5,14 +5,19 @@ import (
"container/heap"
"encoding/binary"
"fmt"
"sort"
"sync"
"unsafe"
"github.com/google/btree"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/params"
)
// ReadWriteSet contains ReadSet, WriteSet and BalanceIncrease of a transaction,
@ -21,6 +26,7 @@ import (
type TxTask struct {
TxNum uint64
BlockNum uint64
Rules *params.Rules
Header *types.Header
Block *types.Block
BlockHash common.Hash
@ -29,10 +35,9 @@ type TxTask struct {
Final bool
Tx types.Transaction
BalanceIncreaseSet map[common.Address]uint256.Int
ReadKeys map[string][][]byte
ReadVals map[string][][]byte
WriteKeys map[string][][]byte
WriteVals map[string][][]byte
ReadLists map[string]*KvList
WriteLists map[string]*KvList
ResultsSize int64
Error error
}
@ -60,48 +65,50 @@ func (h *TxTaskQueue) Pop() interface{} {
return c[len(c)-1]
}
const CodeSizeTable = "CodeSize"
type ReconState1 struct {
lock sync.RWMutex
triggers map[uint64]TxTask
senderTxNums map[common.Address]uint64
workCh chan TxTask
queue TxTaskQueue
changes map[string]map[string][]byte
sizeEstimate uint64
rollbackCount uint64
txsDone uint64
lock sync.RWMutex
receiveWork *sync.Cond
triggers map[uint64]TxTask
senderTxNums map[common.Address]uint64
triggerLock sync.RWMutex
queue TxTaskQueue
queueLock sync.Mutex
changes map[string]*btree.BTreeG[ReconStateItem1]
sizeEstimate uint64
txsDone uint64
finished bool
}
func NewReconState1(workCh chan TxTask) *ReconState1 {
type ReconStateItem1 struct {
key []byte
val []byte
}
func reconStateItem1Less(i, j ReconStateItem1) bool {
return bytes.Compare(i.key, j.key) < 0
}
func NewReconState1() *ReconState1 {
rs := &ReconState1{
workCh: workCh,
triggers: map[uint64]TxTask{},
senderTxNums: map[common.Address]uint64{},
changes: map[string]map[string][]byte{},
changes: map[string]*btree.BTreeG[ReconStateItem1]{},
}
rs.receiveWork = sync.NewCond(&rs.queueLock)
return rs
}
func (rs *ReconState1) put(table string, key, val []byte) {
t, ok := rs.changes[table]
if !ok {
t = map[string][]byte{}
t = btree.NewG[ReconStateItem1](32, reconStateItem1Less)
rs.changes[table] = t
}
t[string(key)] = val
rs.sizeEstimate += uint64(len(key)) + uint64(len(val))
}
func (rs *ReconState1) Delete(table string, key []byte) {
rs.lock.Lock()
defer rs.lock.Unlock()
t, ok := rs.changes[table]
if !ok {
t = map[string][]byte{}
rs.changes[table] = t
}
t[string(key)] = nil
rs.sizeEstimate += uint64(len(key))
item := ReconStateItem1{key: libcommon.Copy(key), val: libcommon.Copy(val)}
t.ReplaceOrInsert(item)
rs.sizeEstimate += uint64(unsafe.Sizeof(item)) + uint64(len(key)) + uint64(len(val))
}
func (rs *ReconState1) Get(table string, key []byte) []byte {
@ -115,40 +122,45 @@ func (rs *ReconState1) get(table string, key []byte) []byte {
if !ok {
return nil
}
return t[string(key)]
if i, ok := t.Get(ReconStateItem1{key: key}); ok {
return i.val
}
return nil
}
func (rs *ReconState1) Flush(rwTx kv.RwTx) error {
rs.lock.Lock()
defer rs.lock.Unlock()
for table, t := range rs.changes {
for ks, val := range t {
if len(val) > 0 {
if err := rwTx.Put(table, []byte(ks), val); err != nil {
return err
var err error
t.Ascend(func(item ReconStateItem1) bool {
if len(item.val) == 0 {
if err = rwTx.Delete(table, item.key, nil); err != nil {
return false
}
//fmt.Printf("Flush [%x]=>\n", ks)
} else {
if err = rwTx.Put(table, item.key, item.val); err != nil {
return false
}
//fmt.Printf("Flush [%x]=>[%x]\n", ks, val)
}
return true
})
if err != nil {
return err
}
t.Clear(true)
}
rs.changes = map[string]map[string][]byte{}
rs.sizeEstimate = 0
return nil
}
func (rs *ReconState1) Schedule() (TxTask, bool) {
rs.lock.Lock()
defer rs.lock.Unlock()
for rs.queue.Len() < 16 {
txTask, ok := <-rs.workCh
if !ok {
// No more work, channel is closed
break
}
if txTask.Sender == nil {
heap.Push(&rs.queue, txTask)
} else if rs.registerSender(txTask) {
heap.Push(&rs.queue, txTask)
}
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
for !rs.finished && rs.queue.Len() == 0 {
rs.receiveWork.Wait()
}
if rs.queue.Len() > 0 {
return heap.Pop(&rs.queue).(TxTask), true
@ -156,7 +168,9 @@ func (rs *ReconState1) Schedule() (TxTask, bool) {
return TxTask{}, false
}
func (rs *ReconState1) registerSender(txTask TxTask) bool {
func (rs *ReconState1) RegisterSender(txTask TxTask) bool {
rs.triggerLock.Lock()
defer rs.triggerLock.Unlock()
lastTxNum, deferral := rs.senderTxNums[*txTask.Sender]
if deferral {
// Transactions with the same sender have obvious data dependency, no point running it before lastTxNum
@ -169,11 +183,16 @@ func (rs *ReconState1) registerSender(txTask TxTask) bool {
return !deferral
}
func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) {
rs.lock.Lock()
defer rs.lock.Unlock()
func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
rs.triggerLock.Lock()
defer rs.triggerLock.Unlock()
count := uint64(0)
if triggered, ok := rs.triggers[txNum]; ok {
heap.Push(&rs.queue, triggered)
rs.receiveWork.Signal()
count++
delete(rs.triggers, txNum)
}
if sender != nil {
@ -183,37 +202,65 @@ func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) {
}
}
rs.txsDone++
return count
}
func (rs *ReconState1) RollbackTx(txTask TxTask) {
rs.lock.Lock()
defer rs.lock.Unlock()
func (rs *ReconState1) AddWork(txTask TxTask) {
txTask.BalanceIncreaseSet = nil
txTask.ReadLists = nil
txTask.WriteLists = nil
txTask.ResultsSize = 0
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
heap.Push(&rs.queue, txTask)
rs.rollbackCount++
rs.receiveWork.Signal()
}
func (rs *ReconState1) Apply(writeKeys, writeVals map[string][][]byte, balanceIncreaseSet map[common.Address]uint256.Int) {
func (rs *ReconState1) Finish() {
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
rs.finished = true
rs.receiveWork.Broadcast()
}
func (rs *ReconState1) Apply(emptyRemoval bool, roTx kv.Tx, txTask TxTask) error {
rs.lock.Lock()
defer rs.lock.Unlock()
for table, keyList := range writeKeys {
valList := writeVals[table]
for i, key := range keyList {
val := valList[i]
rs.put(table, key, val)
if txTask.WriteLists != nil {
for table, list := range txTask.WriteLists {
for i, key := range list.Keys {
val := list.Vals[i]
rs.put(table, key, val)
}
}
}
for addr, increase := range balanceIncreaseSet {
for addr, increase := range txTask.BalanceIncreaseSet {
//if increase.IsZero() {
// continue
//}
enc := rs.get(kv.PlainState, addr.Bytes())
if enc == nil {
var err error
enc, err = roTx.GetOne(kv.PlainState, addr.Bytes())
if err != nil {
return err
}
}
var a accounts.Account
if err := a.DecodeForStorage(enc); err != nil {
panic(err)
return err
}
a.Balance.Add(&a.Balance, &increase)
l := a.EncodingLengthForStorage()
enc = make([]byte, l)
a.EncodeForStorage(enc)
if emptyRemoval && a.Nonce == 0 && a.Balance.IsZero() && a.IsEmptyCodeHash() {
enc = []byte{}
} else {
l := a.EncodingLengthForStorage()
enc = make([]byte, l)
a.EncodeForStorage(enc)
}
rs.put(kv.PlainState, addr.Bytes(), enc)
}
return nil
}
func (rs *ReconState1) DoneCount() uint64 {
@ -222,49 +269,80 @@ func (rs *ReconState1) DoneCount() uint64 {
return rs.txsDone
}
func (rs *ReconState1) RollbackCount() uint64 {
rs.lock.RLock()
defer rs.lock.RUnlock()
return rs.rollbackCount
}
func (rs *ReconState1) SizeEstimate() uint64 {
rs.lock.RLock()
defer rs.lock.RUnlock()
return rs.sizeEstimate
}
func (rs *ReconState1) ReadsValid(readKeys, readVals map[string][][]byte) bool {
func (rs *ReconState1) ReadsValid(readLists map[string]*KvList) bool {
rs.lock.RLock()
defer rs.lock.RUnlock()
for table, keyList := range readKeys {
t, ok := rs.changes[table]
//fmt.Printf("ValidReads\n")
for table, list := range readLists {
//fmt.Printf("Table %s\n", table)
var t *btree.BTreeG[ReconStateItem1]
var ok bool
if table == CodeSizeTable {
t, ok = rs.changes[kv.Code]
} else {
t, ok = rs.changes[table]
}
if !ok {
continue
}
valList := readVals[table]
for i, key := range keyList {
val := valList[i]
if rereadVal, ok := t[string(key)]; ok {
if !bytes.Equal(val, rereadVal) {
for i, key := range list.Keys {
val := list.Vals[i]
if item, ok := t.Get(ReconStateItem1{key: key}); ok {
//fmt.Printf("key [%x] => [%x] vs [%x]\n", key, val, rereadVal)
if table == CodeSizeTable {
if binary.BigEndian.Uint64(val) != uint64(len(item.val)) {
return false
}
} else if !bytes.Equal(val, item.val) {
return false
}
} else {
//fmt.Printf("key [%x] => [%x] not present in changes\n", key, val)
}
}
}
return true
}
// KvList sort.Interface to sort write list by keys
type KvList struct {
Keys, Vals [][]byte
}
func (l KvList) Len() int {
return len(l.Keys)
}
func (l KvList) Less(i, j int) bool {
return bytes.Compare(l.Keys[i], l.Keys[j]) < 0
}
func (l *KvList) Swap(i, j int) {
l.Keys[i], l.Keys[j] = l.Keys[j], l.Keys[i]
l.Vals[i], l.Vals[j] = l.Vals[j], l.Vals[i]
}
type StateReconWriter1 struct {
rs *ReconState1
txNum uint64
writeKeys map[string][][]byte
writeVals map[string][][]byte
rs *ReconState1
txNum uint64
writeLists map[string]*KvList
}
func NewStateReconWriter1(rs *ReconState1) *StateReconWriter1 {
return &StateReconWriter1{
rs: rs,
writeLists: map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
kv.PlainContractCode: {},
kv.IncarnationMap: {},
},
}
}
@ -273,42 +351,49 @@ func (w *StateReconWriter1) SetTxNum(txNum uint64) {
}
func (w *StateReconWriter1) ResetWriteSet() {
w.writeKeys = map[string][][]byte{}
w.writeVals = map[string][][]byte{}
w.writeLists = map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
kv.PlainContractCode: {},
kv.IncarnationMap: {},
}
}
func (w *StateReconWriter1) WriteSet() (map[string][][]byte, map[string][][]byte) {
return w.writeKeys, w.writeVals
func (w *StateReconWriter1) WriteSet() map[string]*KvList {
for _, list := range w.writeLists {
sort.Sort(list)
}
return w.writeLists
}
func (w *StateReconWriter1) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
value := make([]byte, account.EncodingLengthForStorage())
account.EncodeForStorage(value)
//fmt.Printf("account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x} txNum: %d\n", address, &account.Balance, account.Nonce, account.Root, account.CodeHash, w.txNum)
w.writeKeys[kv.PlainState] = append(w.writeKeys[kv.PlainState], address.Bytes())
w.writeVals[kv.PlainState] = append(w.writeVals[kv.PlainState], value)
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes())
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value)
return nil
}
func (w *StateReconWriter1) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
w.writeKeys[kv.Code] = append(w.writeKeys[kv.Code], codeHash.Bytes())
w.writeVals[kv.Code] = append(w.writeVals[kv.Code], code)
w.writeLists[kv.Code].Keys = append(w.writeLists[kv.Code].Keys, codeHash.Bytes())
w.writeLists[kv.Code].Vals = append(w.writeLists[kv.Code].Vals, code)
if len(code) > 0 {
//fmt.Printf("code [%x] => [%x] CodeHash: %x, txNum: %d\n", address, code, codeHash, w.txNum)
w.writeKeys[kv.PlainContractCode] = append(w.writeKeys[kv.PlainContractCode], dbutils.PlainGenerateStoragePrefix(address[:], incarnation))
w.writeVals[kv.PlainContractCode] = append(w.writeVals[kv.PlainContractCode], codeHash.Bytes())
w.writeLists[kv.PlainContractCode].Keys = append(w.writeLists[kv.PlainContractCode].Keys, dbutils.PlainGenerateStoragePrefix(address[:], incarnation))
w.writeLists[kv.PlainContractCode].Vals = append(w.writeLists[kv.PlainContractCode].Vals, codeHash.Bytes())
}
return nil
}
func (w *StateReconWriter1) DeleteAccount(address common.Address, original *accounts.Account) error {
w.writeKeys[kv.PlainState] = append(w.writeKeys[kv.PlainState], address.Bytes())
w.writeVals[kv.PlainState] = append(w.writeVals[kv.PlainState], nil)
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes())
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, []byte{})
if original.Incarnation > 0 {
var b [8]byte
binary.BigEndian.PutUint64(b[:], original.Incarnation)
w.writeKeys[kv.IncarnationMap] = append(w.writeKeys[kv.IncarnationMap], address.Bytes())
w.writeVals[kv.IncarnationMap] = append(w.writeVals[kv.IncarnationMap], b[:])
w.writeLists[kv.IncarnationMap].Keys = append(w.writeLists[kv.IncarnationMap].Keys, address.Bytes())
w.writeLists[kv.IncarnationMap].Vals = append(w.writeLists[kv.IncarnationMap].Vals, b[:])
}
return nil
}
@ -317,8 +402,8 @@ func (w *StateReconWriter1) WriteAccountStorage(address common.Address, incarnat
if *original == *value {
return nil
}
w.writeKeys[kv.PlainState] = append(w.writeKeys[kv.PlainState], dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()))
w.writeVals[kv.PlainState] = append(w.writeVals[kv.PlainState], value.Bytes())
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()))
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value.Bytes())
//fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, v, w.txNum)
return nil
}
@ -335,12 +420,19 @@ type StateReconReader1 struct {
readError bool
stateTxNum uint64
composite []byte
readKeys map[string][][]byte
readVals map[string][][]byte
readLists map[string]*KvList
}
func NewStateReconReader1(rs *ReconState1) *StateReconReader1 {
return &StateReconReader1{rs: rs}
return &StateReconReader1{
rs: rs,
readLists: map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
CodeSizeTable: {},
kv.IncarnationMap: {},
},
}
}
func (r *StateReconReader1) SetTxNum(txNum uint64) {
@ -352,12 +444,19 @@ func (r *StateReconReader1) SetTx(tx kv.Tx) {
}
func (r *StateReconReader1) ResetReadSet() {
r.readKeys = map[string][][]byte{}
r.readVals = map[string][][]byte{}
r.readLists = map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
CodeSizeTable: {},
kv.IncarnationMap: {},
}
}
func (r *StateReconReader1) ReadSet() (map[string][][]byte, map[string][][]byte) {
return r.readKeys, r.readVals
func (r *StateReconReader1) ReadSet() map[string]*KvList {
for _, list := range r.readLists {
sort.Sort(list)
}
return r.readLists
}
func (r *StateReconReader1) SetTrace(trace bool) {
@ -373,8 +472,8 @@ func (r *StateReconReader1) ReadAccountData(address common.Address) (*accounts.A
return nil, err
}
}
r.readKeys[kv.PlainState] = append(r.readKeys[kv.PlainState], address.Bytes())
r.readVals[kv.PlainState] = append(r.readVals[kv.PlainState], enc)
r.readLists[kv.PlainState].Keys = append(r.readLists[kv.PlainState].Keys, address.Bytes())
r.readLists[kv.PlainState].Vals = append(r.readLists[kv.PlainState].Vals, common.CopyBytes(enc))
if len(enc) == 0 {
return nil, nil
}
@ -406,8 +505,8 @@ func (r *StateReconReader1) ReadAccountStorage(address common.Address, incarnati
return nil, err
}
}
r.readKeys[kv.PlainState] = append(r.readKeys[kv.PlainState], r.composite)
r.readVals[kv.PlainState] = append(r.readVals[kv.PlainState], enc)
r.readLists[kv.PlainState].Keys = append(r.readLists[kv.PlainState].Keys, common.CopyBytes(r.composite))
r.readLists[kv.PlainState].Vals = append(r.readLists[kv.PlainState].Vals, common.CopyBytes(enc))
if r.trace {
if enc == nil {
fmt.Printf("ReadAccountStorage [%x] [%x] => [], txNum: %d\n", address, key.Bytes(), r.txNum)
@ -430,8 +529,8 @@ func (r *StateReconReader1) ReadAccountCode(address common.Address, incarnation
return nil, err
}
}
r.readKeys[kv.Code] = append(r.readKeys[kv.Code], address.Bytes())
r.readVals[kv.Code] = append(r.readVals[kv.Code], enc)
r.readLists[kv.Code].Keys = append(r.readLists[kv.Code].Keys, address.Bytes())
r.readLists[kv.Code].Vals = append(r.readLists[kv.Code].Vals, common.CopyBytes(enc))
if r.trace {
fmt.Printf("ReadAccountCode [%x] => [%x], txNum: %d\n", address, enc, r.txNum)
}
@ -447,8 +546,10 @@ func (r *StateReconReader1) ReadAccountCodeSize(address common.Address, incarnat
return 0, err
}
}
r.readKeys[kv.Code] = append(r.readKeys[kv.Code], address.Bytes())
r.readVals[kv.Code] = append(r.readVals[kv.Code], enc)
var sizebuf [8]byte
binary.BigEndian.PutUint64(sizebuf[:], uint64(len(enc)))
r.readLists[CodeSizeTable].Keys = append(r.readLists[CodeSizeTable].Keys, address.Bytes())
r.readLists[CodeSizeTable].Vals = append(r.readLists[CodeSizeTable].Vals, sizebuf[:])
size := len(enc)
if r.trace {
fmt.Printf("ReadAccountCodeSize [%x] => [%d], txNum: %d\n", address, size, r.txNum)
@ -465,8 +566,8 @@ func (r *StateReconReader1) ReadAccountIncarnation(address common.Address) (uint
return 0, err
}
}
r.readKeys[kv.IncarnationMap] = append(r.readKeys[kv.IncarnationMap], address.Bytes())
r.readVals[kv.IncarnationMap] = append(r.readVals[kv.IncarnationMap], enc)
r.readLists[kv.IncarnationMap].Keys = append(r.readLists[kv.IncarnationMap].Keys, address.Bytes())
r.readLists[kv.IncarnationMap].Vals = append(r.readLists[kv.IncarnationMap].Vals, common.CopyBytes(enc))
if len(enc) == 0 {
return 0, nil
}

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20220723031125-6f7794e88b5e
github.com/ledgerwatch/erigon-lib v0.0.0-20220723080652-596d10ea2e13
github.com/ledgerwatch/erigon-snapshot v1.0.0
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0

4
go.sum
View File

@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220723031125-6f7794e88b5e h1:4tZnz9FCTIalm6VtGXBZX713Y+lcHqpMK6L3wP7OSHY=
github.com/ledgerwatch/erigon-lib v0.0.0-20220723031125-6f7794e88b5e/go.mod h1:mq8M03qcnaqXZ/yjNuWoyZQ5V8r5JbXw5JYmy4WNUZQ=
github.com/ledgerwatch/erigon-lib v0.0.0-20220723080652-596d10ea2e13 h1:GsmPUJO6xeifKSxxnG+BUwGEFggljkchaYm/HomvIQs=
github.com/ledgerwatch/erigon-lib v0.0.0-20220723080652-596d10ea2e13/go.mod h1:mq8M03qcnaqXZ/yjNuWoyZQ5V8r5JbXw5JYmy4WNUZQ=
github.com/ledgerwatch/erigon-snapshot v1.0.0 h1:bp/7xoPdM5lK7LFdqEMH008RZmqxMZV0RUVEQiWs7v4=
github.com/ledgerwatch/erigon-snapshot v1.0.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=