From b05ffc909d979e41e1da6f2b949bc4dd562b1bd2 Mon Sep 17 00:00:00 2001 From: Mark Holt <135143369+mh0lt@users.noreply.github.com> Date: Wed, 10 Jan 2024 17:12:15 +0000 Subject: [PATCH] Fixes for Bor Block Production Synchronization (#9162) This PR contains 3 fixes for interaction between the Bor mining loop and the TX pool which where causing the regular creation of blocks with zero transactions. * Mining/Tx pool block synchronization The synchronization of the tx pool between the sync loop and the mining loop has been changed so that both are triggered by the same event and synchronized via a sync.Cond rather than a polling loop with a hard coded loop limit. This means that mining now waits for the pool to be updated from the previous block before it starts the mining process. * Txpool Startup consolidated into its MainLoop Previously the tx pool start process was dynamically triggered at various points in the code. This has all now been moved to the start of the main loop. This is necessary to avoid a timing hole which can leave the mining loop hanging waiting for a previously block broadcast which it missed due to its delay start. * Mining listens for block broadcast to avoid duplicate mining operations The mining loop for bor has a recommit timer in case blocks re not produced on time. However in the case of sprint transitions where the seal publication is delayed this can lead to duplicate block production. This is suppressed by introducing a `waiting` state which is exited upon the block being broadcast from the sealing operation. --- cmd/devnet/args/node_args.go | 1 + cmd/devnet/devnet/network.go | 12 +- cmd/devnet/devnet/node.go | 10 +- cmd/devnet/main.go | 69 +++++- cmd/devnet/networks/devnet_bor.go | 67 +++-- cmd/devnet/networks/devnet_dev.go | 16 +- cmd/devnet/requests/request_generator.go | 32 ++- cmd/devnet/tests/context.go | 14 +- cmd/devnet/transactions/tx.go | 44 +++- cmd/rpcdaemon/cli/config.go | 2 +- cmd/txpool/main.go | 6 - consensus/chain_reader.go | 2 +- erigon-lib/downloader/path.go | 2 +- erigon-lib/downloader/path_windows.go | 89 ------- erigon-lib/mmap/mmap_windows.go | 2 +- erigon-lib/txpool/pool.go | 232 ++++++++++++------ erigon-lib/txpool/pool_fuzz_test.go | 5 + erigon-lib/txpool/pool_test.go | 5 + eth/backend.go | 108 ++++++-- eth/stagedsync/chain_reader.go | 5 +- eth/stagedsync/stage_bor_heimdall.go | 4 + eth/stagedsync/stage_headers.go | 2 +- eth/stagedsync/stage_mining_bor_heimdall.go | 2 +- eth/stagedsync/stage_mining_exec.go | 43 ++-- eth/stagedsync/stage_mining_finish.go | 6 +- polygon/bor/bor.go | 2 + turbo/cli/flags.go | 2 +- .../snapshotsync/freezeblocks/block_reader.go | 2 +- turbo/stages/stageloop.go | 1 + 29 files changed, 495 insertions(+), 292 deletions(-) diff --git a/cmd/devnet/args/node_args.go b/cmd/devnet/args/node_args.go index 44930fee7..1fba7caaf 100644 --- a/cmd/devnet/args/node_args.go +++ b/cmd/devnet/args/node_args.go @@ -135,6 +135,7 @@ type BlockProducer struct { NodeArgs Mine bool `arg:"--mine" flag:"true"` Etherbase string `arg:"--miner.etherbase"` + GasLimit int `arg:"--miner.gaslimit"` DevPeriod int `arg:"--dev.period"` BorPeriod int `arg:"--bor.period"` BorMinBlockSize int `arg:"--bor.minblocksize"` diff --git a/cmd/devnet/devnet/network.go b/cmd/devnet/devnet/network.go index 29eee727c..372aca838 100644 --- a/cmd/devnet/devnet/network.go +++ b/cmd/devnet/devnet/network.go @@ -34,7 +34,7 @@ type Network struct { Snapshots bool Nodes []Node Services []Service - Alloc types.GenesisAlloc + Genesis *types.Genesis BorStateSyncDelay time.Duration BorPeriod time.Duration BorMinBlockSize int @@ -140,12 +140,16 @@ func (nw *Network) createNode(nodeArgs Node) (Node, error) { } if n.IsBlockProducer() { - if nw.Alloc == nil { - nw.Alloc = types.GenesisAlloc{ + if nw.Genesis == nil { + nw.Genesis = &types.Genesis{} + } + + if nw.Genesis.Alloc == nil { + nw.Genesis.Alloc = types.GenesisAlloc{ n.Account().Address: types.GenesisAccount{Balance: blockProducerFunds}, } } else { - nw.Alloc[n.Account().Address] = types.GenesisAccount{Balance: blockProducerFunds} + nw.Genesis.Alloc[n.Account().Address] = types.GenesisAccount{Balance: blockProducerFunds} } } diff --git a/cmd/devnet/devnet/node.go b/cmd/devnet/devnet/node.go index 30f466336..33f716aa3 100644 --- a/cmd/devnet/devnet/node.go +++ b/cmd/devnet/devnet/node.go @@ -167,8 +167,14 @@ func (n *devnetNode) run(ctx *cli.Context) error { n.nodeCfg.MdbxGrowthStep = 32 * datasize.MB n.nodeCfg.MdbxDBSizeLimit = 512 * datasize.MB - for addr, account := range n.network.Alloc { - n.ethCfg.Genesis.Alloc[addr] = account + if n.network.Genesis != nil { + for addr, account := range n.network.Genesis.Alloc { + n.ethCfg.Genesis.Alloc[addr] = account + } + + if n.network.Genesis.GasLimit != 0 { + n.ethCfg.Genesis.GasLimit = n.network.Genesis.GasLimit + } } if n.network.BorStateSyncDelay > 0 { diff --git a/cmd/devnet/main.go b/cmd/devnet/main.go index 655d0d88a..92a41aca1 100644 --- a/cmd/devnet/main.go +++ b/cmd/devnet/main.go @@ -6,6 +6,7 @@ import ( "os/signal" "path/filepath" dbg "runtime/debug" + "strconv" "strings" "syscall" "time" @@ -131,6 +132,12 @@ var ( Value: 1, } + GasLimitFlag = cli.Uint64Flag{ + Name: "gaslimit", + Usage: "Target gas limit for mined blocks", + Value: 0, + } + WaitFlag = cli.BoolFlag{ Name: "wait", Usage: "Wait until interrupted after all scenarios have run", @@ -173,6 +180,7 @@ func main() { &logging.LogVerbosityFlag, &logging.LogConsoleVerbosityFlag, &logging.LogDirVerbosityFlag, + &GasLimitFlag, } if err := app.Run(os.Args); err != nil { @@ -342,21 +350,74 @@ func initDevnet(ctx *cli.Context, logger log.Logger) (devnet.Devnet, error) { baseRpcHost := ctx.String(BaseRpcHostFlag.Name) baseRpcPort := ctx.Int(BaseRpcPortFlag.Name) producerCount := int(ctx.Uint(BlockProducersFlag.Name)) + gasLimit := ctx.Uint64(GasLimitFlag.Name) + + var dirLogLevel log.Lvl = log.LvlTrace + var consoleLogLevel log.Lvl = log.LvlCrit + + if ctx.IsSet(logging.LogVerbosityFlag.Name) { + lvlVal := ctx.String(logging.LogVerbosityFlag.Name) + + i, err := strconv.Atoi(lvlVal) + + lvl := log.Lvl(i) + + if err != nil { + lvl, err = log.LvlFromString(lvlVal) + } + + if err == nil { + consoleLogLevel = lvl + dirLogLevel = lvl + } + } else { + if ctx.IsSet(logging.LogConsoleVerbosityFlag.Name) { + lvlVal := ctx.String(logging.LogConsoleVerbosityFlag.Name) + + i, err := strconv.Atoi(lvlVal) + + lvl := log.Lvl(i) + + if err != nil { + lvl, err = log.LvlFromString(lvlVal) + } + + if err == nil { + consoleLogLevel = lvl + } + } + + if ctx.IsSet(logging.LogDirVerbosityFlag.Name) { + lvlVal := ctx.String(logging.LogDirVerbosityFlag.Name) + + i, err := strconv.Atoi(lvlVal) + + lvl := log.Lvl(i) + + if err != nil { + lvl, err = log.LvlFromString(lvlVal) + } + + if err == nil { + dirLogLevel = lvl + } + } + } switch chainName { case networkname.BorDevnetChainName: if ctx.Bool(WithoutHeimdallFlag.Name) { - return networks.NewBorDevnetWithoutHeimdall(dataDir, baseRpcHost, baseRpcPort, logger), nil + return networks.NewBorDevnetWithoutHeimdall(dataDir, baseRpcHost, baseRpcPort, gasLimit, logger, consoleLogLevel, dirLogLevel), nil } else if ctx.Bool(LocalHeimdallFlag.Name) { heimdallGrpcAddr := ctx.String(HeimdallGrpcAddressFlag.Name) sprintSize := uint64(ctx.Int(BorSprintSizeFlag.Name)) - return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, logger), nil + return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil } else { - return networks.NewBorDevnetWithRemoteHeimdall(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil + return networks.NewBorDevnetWithRemoteHeimdall(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil } case networkname.DevChainName: - return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil + return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil default: return nil, fmt.Errorf("unknown network: '%s'", chainName) diff --git a/cmd/devnet/networks/devnet_bor.go b/cmd/devnet/networks/devnet_bor.go index 9f8cbd2fa..41d4993ea 100644 --- a/cmd/devnet/networks/devnet_bor.go +++ b/cmd/devnet/networks/devnet_bor.go @@ -1,6 +1,7 @@ package networks import ( + "strconv" "time" "github.com/ledgerwatch/log/v3" @@ -21,7 +22,10 @@ func NewBorDevnetWithoutHeimdall( dataDir string, baseRpcHost string, baseRpcPort int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { faucetSource := accounts.NewAccount("faucet-source") @@ -34,8 +38,11 @@ func NewBorDevnetWithoutHeimdall( BaseRPCHost: baseRpcHost, BaseRPCPort: baseRpcPort, //Snapshots: true, - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + }, + GasLimit: gasLimit, }, Services: []devnet.Service{ account_services.NewFaucet(networkname.BorDevnetChainName, faucetSource), @@ -43,16 +50,16 @@ func NewBorDevnetWithoutHeimdall( Nodes: []devnet.Node{ &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), WithoutHeimdall: true, }, AccountSlots: 200, }, &args.BlockConsumer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), WithoutHeimdall: true, }, }, @@ -70,8 +77,11 @@ func NewBorDevnetWithHeimdall( heimdallGrpcAddr string, checkpointOwner *accounts.Account, producerCount int, + gasLimit uint64, withMilestones bool, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { faucetSource := accounts.NewAccount("faucet-source") @@ -89,8 +99,8 @@ func NewBorDevnetWithHeimdall( for i := 0; i < producerCount; i++ { nodes = append(nodes, &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), HeimdallGrpcAddr: heimdallGrpcAddr, }, AccountSlots: 20000, @@ -108,14 +118,17 @@ func NewBorDevnetWithHeimdall( BorStateSyncDelay: 5 * time.Second, BorWithMilestones: &withMilestones, Services: append(services, account_services.NewFaucet(networkname.BorDevnetChainName, faucetSource)), - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + }, + GasLimit: gasLimit, }, Nodes: append(nodes, &args.BlockConsumer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), HeimdallGrpcAddr: heimdallGrpcAddr, }, }), @@ -130,15 +143,17 @@ func NewBorDevnetWithHeimdall( BaseRPCHost: baseRpcHost, BaseRPCPort: baseRpcPort + 1000, Services: append(services, account_services.NewFaucet(networkname.DevChainName, faucetSource)), - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, - checkpointOwner.Address: {Balance: accounts.EtherAmount(10_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + checkpointOwner.Address: {Balance: accounts.EtherAmount(10_000)}, + }, }, Nodes: []devnet.Node{ &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), VMDebug: true, HttpCorsDomain: "*", }, @@ -147,8 +162,8 @@ func NewBorDevnetWithHeimdall( }, &args.BlockConsumer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "3", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), }, }, }, @@ -165,7 +180,10 @@ func NewBorDevnetWithRemoteHeimdall( baseRpcHost string, baseRpcPort int, producerCount int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { heimdallGrpcAddr := "" checkpointOwner := accounts.NewAccount("checkpoint-owner") @@ -178,8 +196,11 @@ func NewBorDevnetWithRemoteHeimdall( heimdallGrpcAddr, checkpointOwner, producerCount, + gasLimit, withMilestones, - logger) + logger, + consoleLogLevel, + dirLogLevel) } func NewBorDevnetWithLocalHeimdall( @@ -189,7 +210,10 @@ func NewBorDevnetWithLocalHeimdall( heimdallGrpcAddr string, sprintSize uint64, producerCount int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { config := *params.BorDevnetChainConfig borConfig := config.Bor.(*borcfg.BorConfig) @@ -216,7 +240,8 @@ func NewBorDevnetWithLocalHeimdall( heimdallGrpcAddr, checkpointOwner, producerCount, + gasLimit, // milestones are not supported yet on the local heimdall false, - logger) + logger, consoleLogLevel, dirLogLevel) } diff --git a/cmd/devnet/networks/devnet_dev.go b/cmd/devnet/networks/devnet_dev.go index 142980540..fa8e399f8 100644 --- a/cmd/devnet/networks/devnet_dev.go +++ b/cmd/devnet/networks/devnet_dev.go @@ -1,6 +1,8 @@ package networks import ( + "strconv" + "github.com/ledgerwatch/erigon-lib/chain/networkname" "github.com/ledgerwatch/erigon/cmd/devnet/accounts" "github.com/ledgerwatch/erigon/cmd/devnet/args" @@ -15,7 +17,10 @@ func NewDevDevnet( baseRpcHost string, baseRpcPort int, producerCount int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { faucetSource := accounts.NewAccount("faucet-source") @@ -28,8 +33,8 @@ func NewDevDevnet( for i := 0; i < producerCount; i++ { nodes = append(nodes, &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), }, AccountSlots: 200, }) @@ -42,8 +47,11 @@ func NewDevDevnet( BasePrivateApiAddr: "localhost:10090", BaseRPCHost: baseRpcHost, BaseRPCPort: baseRpcPort, - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + }, + GasLimit: gasLimit, }, Services: []devnet.Service{ account_services.NewFaucet(networkname.DevChainName, faucetSource), diff --git a/cmd/devnet/requests/request_generator.go b/cmd/devnet/requests/request_generator.go index 1c1e04628..1598cba5c 100644 --- a/cmd/devnet/requests/request_generator.go +++ b/cmd/devnet/requests/request_generator.go @@ -184,34 +184,50 @@ func (req *requestGenerator) rpcCall(ctx context.Context, result interface{}, me }) } -const connectionTimeout = time.Second * 5 +const requestTimeout = time.Second * 20 +const connectionTimeout = time.Millisecond * 500 func isConnectionError(err error) bool { var opErr *net.OpError - if errors.As(err, &opErr) { + switch { + case errors.As(err, &opErr): return opErr.Op == "dial" + + case errors.Is(err, context.DeadlineExceeded): + return true } + return false } func retryConnects(ctx context.Context, op func(context.Context) error) error { - ctx, cancel := context.WithTimeout(ctx, connectionTimeout) + ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() - return retry(ctx, op, isConnectionError, time.Millisecond*200, nil) + return retry(ctx, op, isConnectionError, time.Second*1, nil) } func retry(ctx context.Context, op func(context.Context) error, isRecoverableError func(error) bool, delay time.Duration, lastErr error) error { - err := op(ctx) + opctx, cancel := context.WithTimeout(ctx, connectionTimeout) + defer cancel() + + err := op(opctx) + if err == nil { return nil } - if errors.Is(err, context.DeadlineExceeded) && lastErr != nil { - return lastErr - } + if !isRecoverableError(err) { return err } + if errors.Is(err, context.DeadlineExceeded) { + if lastErr != nil { + return lastErr + } + + err = nil + } + delayTimer := time.NewTimer(delay) select { case <-delayTimer.C: diff --git a/cmd/devnet/tests/context.go b/cmd/devnet/tests/context.go index 7f9ead045..7eb115b0e 100644 --- a/cmd/devnet/tests/context.go +++ b/cmd/devnet/tests/context.go @@ -16,7 +16,7 @@ import ( "github.com/ledgerwatch/log/v3" ) -func initDevnet(chainName string, dataDir string, producerCount int, logger log.Logger) (devnet.Devnet, error) { +func initDevnet(chainName string, dataDir string, producerCount int, gasLimit uint64, logger log.Logger, consoleLogLevel log.Lvl, dirLogLevel log.Lvl) (devnet.Devnet, error) { const baseRpcHost = "localhost" const baseRpcPort = 8545 @@ -24,17 +24,17 @@ func initDevnet(chainName string, dataDir string, producerCount int, logger log. case networkname.BorDevnetChainName: heimdallGrpcAddr := polygon.HeimdallGrpcAddressDefault const sprintSize uint64 = 0 - return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, logger), nil + return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil case networkname.DevChainName: - return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil + return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil case "": envChainName, _ := os.LookupEnv("DEVNET_CHAIN") if envChainName == "" { envChainName = networkname.DevChainName } - return initDevnet(envChainName, dataDir, producerCount, logger) + return initDevnet(envChainName, dataDir, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel) default: return nil, fmt.Errorf("unknown network: '%s'", chainName) @@ -57,8 +57,12 @@ func ContextStart(t *testing.T, chainName string) (devnet.Context, error) { producerCount, _ := strconv.ParseUint(envProducerCount, 10, 64) + // TODO get log levels from env + var dirLogLevel log.Lvl = log.LvlTrace + var consoleLogLevel log.Lvl = log.LvlCrit + var network devnet.Devnet - network, err := initDevnet(chainName, dataDir, int(producerCount), logger) + network, err := initDevnet(chainName, dataDir, int(producerCount), 0, logger, consoleLogLevel, dirLogLevel) if err != nil { return nil, fmt.Errorf("ContextStart initDevnet failed: %w", err) } diff --git a/cmd/devnet/transactions/tx.go b/cmd/devnet/transactions/tx.go index dcaafd438..3a241171f 100644 --- a/cmd/devnet/transactions/tx.go +++ b/cmd/devnet/transactions/tx.go @@ -140,27 +140,20 @@ func SendTxLoad(ctx context.Context, to, from string, amount uint64, txPerSec ui for { start := time.Now() - lowtx, hightx, err := CreateManyEIP1559TransactionsRefWithBaseFee2(ctx, to, from, int(batchCount)) + tx, err := CreateManyEIP1559TransactionsHigherThanBaseFee(ctx, to, from, int(batchCount)) if err != nil { logger.Error("failed Create Txs", "error", err) return err } - _, err = SendManyTransactions(ctx, lowtx) + _, err = SendManyTransactions(ctx, tx) if err != nil { logger.Error("failed SendManyTransactions(higherThanBaseFeeTxs)", "error", err) return err } - _, err = SendManyTransactions(ctx, hightx) - - if err != nil { - logger.Error("failed SendManyTransactions(lowerThanBaseFeeTxs)", "error", err) - return err - } - select { case <-ctx.Done(): return nil @@ -249,6 +242,33 @@ func CreateManyEIP1559TransactionsRefWithBaseFee2(ctx context.Context, to, from return lowerBaseFeeTransactions, higherBaseFeeTransactions, nil } +func CreateManyEIP1559TransactionsHigherThanBaseFee(ctx context.Context, to, from string, count int) ([]types.Transaction, error) { + toAddress := libcommon.HexToAddress(to) + fromAddress := libcommon.HexToAddress(from) + + baseFeePerGas, err := blocks.BaseFeeFromBlock(ctx) + + if err != nil { + return nil, fmt.Errorf("failed BaseFeeFromBlock: %v", err) + } + + baseFeePerGas = baseFeePerGas * 2 + + devnet.Logger(ctx).Info("BaseFeePerGas2", "val", baseFeePerGas) + + node := devnet.SelectNode(ctx) + + res, err := node.GetTransactionCount(fromAddress, rpc.PendingBlock) + + if err != nil { + return nil, fmt.Errorf("failed to get transaction count for address 0x%x: %v", fromAddress, err) + } + + nonce := res.Uint64() + + return signEIP1559TxsHigherThanBaseFee(ctx, count, baseFeePerGas, &nonce, toAddress, fromAddress) +} + // createNonContractTx returns a signed transaction and the recipient address func CreateTransaction(node devnet.Node, to, from string, value uint64) (types.Transaction, libcommon.Address, error) { toAccount := accounts.GetAccount(to) @@ -344,7 +364,7 @@ func signEIP1559TxsLowerThanBaseFee(ctx context.Context, n int, baseFeePerGas ui transaction := types.NewEIP1559Transaction(chainId, *nonce, toAddress, uint256.NewInt(value), uint64(210_000), uint256.NewInt(gasPrice), new(uint256.Int), uint256.NewInt(gasFeeCap), nil) - devnet.Logger(ctx).Info("LOWER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) + devnet.Logger(ctx).Trace("LOWER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) signedTransaction, err := types.SignTx(transaction, signer, accounts.SigKey(fromAddress)) @@ -385,7 +405,7 @@ func signEIP1559TxsHigherThanBaseFee(ctx context.Context, n int, baseFeePerGas u transaction := types.NewEIP1559Transaction(chainId, *nonce, toAddress, uint256.NewInt(value), uint64(210_000), uint256.NewInt(gasPrice), new(uint256.Int), uint256.NewInt(gasFeeCap), nil) - devnet.Logger(ctx).Info("HIGHER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) + devnet.Logger(ctx).Trace("HIGHER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) signerKey := accounts.SigKey(fromAddress) if signerKey == nil { @@ -407,7 +427,7 @@ func signEIP1559TxsHigherThanBaseFee(ctx context.Context, n int, baseFeePerGas u func SendManyTransactions(ctx context.Context, signedTransactions []types.Transaction) ([]libcommon.Hash, error) { logger := devnet.Logger(ctx) - logger.Info("Sending multiple transactions to the txpool...") + logger.Info(fmt.Sprintf("Sending %d transactions to the txpool...", len(signedTransactions))) hashes := make([]libcommon.Hash, len(signedTransactions)) for idx, tx := range signedTransactions { diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 34f8f438c..5a2f37db8 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -208,7 +208,7 @@ func subscribeToStateChangesLoop(ctx context.Context, client StateChangesClient, time.Sleep(3 * time.Second) continue } - log.Warn("[txpool.handleStateChanges]", "err", err) + log.Warn("[rpcdaemon subscribeToStateChanges]", "err", err) } } }() diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index d915a18b3..463be539b 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -170,12 +170,6 @@ func doTxpool(ctx context.Context, logger log.Logger) error { fetch.ConnectCore() fetch.ConnectSentries() - /* - var ethashApi *ethash.API - sif casted, ok := backend.engine.(*ethash.Ethash); ok { - ethashApi = casted.APIs(nil)[1].Service.(*ethash.API) - } - */ miningGrpcServer := privateapi.NewMiningServer(ctx, &rpcdaemontest.IsMiningMock{}, nil, logger) grpcServer, err := txpool.StartGrpc(txpoolGrpcServer, miningGrpcServer, txpoolApiAddr, nil, logger) diff --git a/consensus/chain_reader.go b/consensus/chain_reader.go index f79de40c4..c7d81953b 100644 --- a/consensus/chain_reader.go +++ b/consensus/chain_reader.go @@ -82,7 +82,7 @@ func (cr ChainReaderImpl) FrozenBlocks() uint64 { func (cr ChainReaderImpl) BorSpan(spanId uint64) []byte { spanBytes, err := cr.BlockReader.Span(context.Background(), cr.Db, spanId) if err != nil { - log.Error("BorSpan failed", "err", err) + log.Error("[consensus] BorSpan failed", "err", err) } return spanBytes } diff --git a/erigon-lib/downloader/path.go b/erigon-lib/downloader/path.go index 06ba51865..195c6d05c 100644 --- a/erigon-lib/downloader/path.go +++ b/erigon-lib/downloader/path.go @@ -171,7 +171,7 @@ func Clean(path string) string { return FromSlash(out.string()) } -func unixIsLocal(path string) bool { +func unixIsLocal(path string) bool { //nolint if IsAbs(path) || path == "" { return false } diff --git a/erigon-lib/downloader/path_windows.go b/erigon-lib/downloader/path_windows.go index f5f4a01d9..687e81429 100644 --- a/erigon-lib/downloader/path_windows.go +++ b/erigon-lib/downloader/path_windows.go @@ -175,51 +175,6 @@ func HasPrefix(p, prefix string) bool { return strings.HasPrefix(strings.ToLower(p), strings.ToLower(prefix)) } -func splitList(path string) []string { - // The same implementation is used in LookPath in os/exec; - // consider changing os/exec when changing this. - - if path == "" { - return []string{} - } - - // Split path, respecting but preserving quotes. - list := []string{} - start := 0 - quo := false - for i := 0; i < len(path); i++ { - switch c := path[i]; { - case c == '"': - quo = !quo - case c == ListSeparator && !quo: - list = append(list, path[start:i]) - start = i + 1 - } - } - list = append(list, path[start:]) - - // Remove quotes. - for i, s := range list { - list[i] = strings.ReplaceAll(s, `"`, ``) - } - - return list -} - -func abs(path string) (string, error) { - if path == "" { - // syscall.FullPath returns an error on empty path, because it's not a valid path. - // To implement Abs behavior of returning working directory on empty string input, - // special-case empty path by changing it to "." path. See golang.org/issue/24441. - path = "." - } - fullPath, err := syscall.FullPath(path) - if err != nil { - return "", err - } - return Clean(fullPath), nil -} - func join(elem []string) string { var b strings.Builder var lastChar byte @@ -260,47 +215,3 @@ func join(elem []string) string { } return Clean(b.String()) } - -// joinNonEmpty is like join, but it assumes that the first element is non-empty. -func joinNonEmpty(elem []string) string { - if len(elem[0]) == 2 && elem[0][1] == ':' { - // First element is drive letter without terminating slash. - // Keep path relative to current directory on that drive. - // Skip empty elements. - i := 1 - for ; i < len(elem); i++ { - if elem[i] != "" { - break - } - } - return Clean(elem[0] + strings.Join(elem[i:], string(Separator))) - } - // The following logic prevents Join from inadvertently creating a - // UNC path on Windows. Unless the first element is a UNC path, Join - // shouldn't create a UNC path. See golang.org/issue/9167. - p := Clean(strings.Join(elem, string(Separator))) - if !isUNC(p) { - return p - } - // p == UNC only allowed when the first element is a UNC path. - head := Clean(elem[0]) - if isUNC(head) { - return p - } - // head + tail == UNC, but joining two non-UNC paths should not result - // in a UNC path. Undo creation of UNC path. - tail := Clean(strings.Join(elem[1:], string(Separator))) - if head[len(head)-1] == Separator { - return head + tail - } - return head + string(Separator) + tail -} - -// isUNC reports whether path is a UNC path. -func isUNC(path string) bool { - return len(path) > 1 && isSlash(path[0]) && isSlash(path[1]) -} - -func sameWord(a, b string) bool { - return strings.EqualFold(a, b) -} diff --git a/erigon-lib/mmap/mmap_windows.go b/erigon-lib/mmap/mmap_windows.go index b343ebb40..0ce85db9e 100644 --- a/erigon-lib/mmap/mmap_windows.go +++ b/erigon-lib/mmap/mmap_windows.go @@ -41,7 +41,7 @@ func Mmap(f *os.File, size int) ([]byte, *[MaxMapSize]byte, error) { } // Close mapping handle. - if err := windows.CloseHandle(windows.Handle(h)); err != nil { + if err := windows.CloseHandle(h); err != nil { return nil, nil, os.NewSyscallError("CloseHandle", err) } diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index 69dfb3932..45dc6f115 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -126,7 +126,6 @@ type metaTx struct { timestamp uint64 // when it was added to pool subPool SubPoolMarker currentSubPool SubPoolType - alreadyYielded bool minedBlockNum uint64 } @@ -211,6 +210,7 @@ type TxPool struct { cfg txpoolcfg.Config chainID uint256.Int lastSeenBlock atomic.Uint64 + lastSeenCond *sync.Cond lastFinalizedBlock atomic.Uint64 started atomic.Bool pendingBaseFee atomic.Uint64 @@ -249,8 +249,11 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, tracedSenders[common.BytesToAddress([]byte(sender))] = struct{}{} } + lock := &sync.Mutex{} + res := &TxPool{ - lock: &sync.Mutex{}, + lock: lock, + lastSeenCond: sync.NewCond(lock), byHash: map[string]*metaTx{}, isLocalLRU: localsHistory, discardReasonsLRU: discardHistory, @@ -298,42 +301,81 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, return res, nil } -func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { - if err := minedTxs.Valid(); err != nil { - return err +func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error { + if p.started.Load() { + return nil } + return db.View(ctx, func(tx kv.Tx) error { + coreDb, _ := p.coreDBWithCache() + coreTx, err := coreDb.BeginRo(ctx) + + if err != nil { + return err + } + + defer coreTx.Rollback() + + if err := p.fromDB(ctx, tx, coreTx); err != nil { + return fmt.Errorf("loading pool from DB: %w", err) + } + + if p.started.CompareAndSwap(false, true) { + p.logger.Info("[txpool] Started") + } + + return nil + }) +} + +func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { + defer newBlockTimer.ObserveDuration(time.Now()) //t := time.Now() coreDB, cache := p.coreDBWithCache() cache.OnNewBlock(stateChanges) coreTx, err := coreDB.BeginRo(ctx) + if err != nil { return err } + defer coreTx.Rollback() - p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight) - if !p.started.Load() { - if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil { - return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err) - } + block := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight + baseFee := stateChanges.PendingBlockBaseFee + available := len(p.pending.best.ms) + + defer func() { + p.logger.Debug("[txpool] New block", "block", block, "unwound", len(unwindTxs.Txs), "mined", len(minedTxs.Txs), "baseFee", baseFee, "pending-pre", available, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len(), "err", err) + }() + + if err = minedTxs.Valid(); err != nil { + return err } + cacheView, err := cache.View(ctx, coreTx) + if err != nil { return err } p.lock.Lock() - defer p.lock.Unlock() + defer func() { + if err == nil { + p.lastSeenBlock.Store(block) + p.lastSeenCond.Broadcast() + } + + p.lock.Unlock() + }() if assert.Enable { if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil { p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String()) } } - baseFee := stateChanges.PendingBlockBaseFee pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee) // Update pendingBase for all pool queues and slices @@ -350,10 +392,13 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.setBlobFee(pendingBlobFee) p.blockGasLimit.Store(stateChanges.BlockGasLimit) - if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil { + + if err = p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil { return err } + _, unwindTxs, err = p.validateTxs(&unwindTxs, cacheView) + if err != nil { return err } @@ -371,21 +416,23 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang } } - if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil { - return err - } - if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil { + if err = p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil { return err } - //p.logger.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight) + if err = removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil { + return err + } + + var announcements types.Announcements + + announcements, err = addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ + pendingBaseFee, stateChanges.BlockGasLimit, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger) - announcements, err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ - pendingBaseFee, stateChanges.BlockGasLimit, - p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger) if err != nil { return err } + p.pending.EnforceWorstInvariants() p.baseFee.EnforceInvariants() p.queued.EnforceInvariants() @@ -394,10 +441,6 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.promoted.Reset() p.promoted.AppendOther(announcements) - if p.started.CompareAndSwap(false, true) { - p.logger.Info("[txpool] Started") - } - if p.promoted.Len() > 0 { select { case p.newPendingTxs <- p.promoted.Copy(): @@ -405,12 +448,11 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang } } - //p.logger.Info("[txpool] new block", "number", p.lastSeenBlock.Load(), "pendngBaseFee", pendingBaseFee, "in", time.Since(t)) return nil } func (p *TxPool) processRemoteTxs(ctx context.Context) error { - if !p.started.Load() { + if !p.Started() { return fmt.Errorf("txpool not started yet") } @@ -610,20 +652,29 @@ func (p *TxPool) IsLocal(idHash []byte) bool { func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } func (p *TxPool) Started() bool { return p.started.Load() } -func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - // First wait for the corresponding block to arrive - if p.lastSeenBlock.Load() < onTopOf { - return false, 0, nil // Too early +func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, yielded mapset.Set[[32]byte]) (bool, int, error) { + p.lock.Lock() + defer p.lock.Unlock() + + for last := p.lastSeenBlock.Load(); last < onTopOf; last = p.lastSeenBlock.Load() { + p.logger.Debug("[txpool] Waiting for block", "expecting", onTopOf, "lastSeen", last, "txRequested", n, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len()) + p.lastSeenCond.Wait() } - isShanghai := p.isShanghai() || p.isAgra() best := p.pending.best + isShanghai := p.isShanghai() || p.isAgra() + txs.Resize(uint(cmp.Min(int(n), len(best.ms)))) var toRemove []*metaTx count := 0 + i := 0 - for i := 0; count < int(n) && i < len(best.ms); i++ { + defer func() { + p.logger.Debug("[txpool] Processing best request", "last", onTopOf, "txRequested", n, "txAvailable", len(best.ms), "txProcessed", i, "txReturned", count) + }() + + for ; count < int(n) && i < len(best.ms); i++ { // if we wouldn't have enough gas for a standard transaction then quit out early if availableGas < fixedgas.TxGas { break @@ -631,7 +682,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG mt := best.ms[i] - if toSkip.Contains(mt.Tx.IDHash) { + if yielded.Contains(mt.Tx.IDHash) { continue } @@ -669,7 +720,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG txs.Txs[count] = rlpTx copy(txs.Senders.At(count), sender.Bytes()) txs.IsLocal[count] = isLocal - toSkip.Add(mt.Tx.IDHash) // TODO: Is this unnecessary + yielded.Add(mt.Tx.IDHash) count++ } @@ -682,26 +733,13 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG return true, count, nil } -func (p *TxPool) ResetYieldedStatus() { - p.lock.Lock() - defer p.lock.Unlock() - best := p.pending.best - for i := 0; i < len(best.ms); i++ { - best.ms[i].alreadyYielded = false - } -} - func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - p.lock.Lock() - defer p.lock.Unlock() return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip) } func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) { set := mapset.NewThreadUnsafeSet[[32]byte]() - p.lock.Lock() - defer p.lock.Unlock() - onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) + onTime, _, err := p.YieldBest(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) return onTime, err } @@ -1075,15 +1113,6 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, p.lock.Lock() defer p.lock.Unlock() - if !p.Started() { - if err := p.fromDB(ctx, tx, coreTx); err != nil { - return nil, fmt.Errorf("AddLocalTxs: loading txs from DB: %w", err) - } - if p.started.CompareAndSwap(false, true) { - p.logger.Info("[txpool] Started") - } - } - if err = p.senders.registerNewSenders(&newTransactions, p.logger); err != nil { return nil, err } @@ -1432,27 +1461,38 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P } var toDel []*metaTx // can't delete items while iterate them + + discarded := 0 + pendingRemoved := 0 + baseFeeRemoved := 0 + queuedRemoved := 0 + for senderID, nonce := range noncesToRemove { - //if sender.all.Len() > 0 { - //logger.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.all.len()", sender.all.Len()) - //} - // delete mined transactions from everywhere + byNonce.ascend(senderID, func(mt *metaTx) bool { - //logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce) if mt.Tx.Nonce > nonce { + if mt.Tx.Traced { + logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce) + } + return false } + if mt.Tx.Traced { logger.Info(fmt.Sprintf("TX TRACING: removeMined idHash=%x senderId=%d, currentSubPool=%s", mt.Tx.IDHash, mt.Tx.SenderID, mt.currentSubPool)) } + toDel = append(toDel, mt) // del from sub-pool switch mt.currentSubPool { case PendingSubPool: + pendingRemoved++ pending.Remove(mt) case BaseFeeSubPool: + baseFeeRemoved++ baseFee.Remove(mt) case QueuedSubPool: + queuedRemoved++ queued.Remove(mt) default: //already removed @@ -1460,11 +1500,18 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P return true }) + discarded += len(toDel) + for _, mt := range toDel { discard(mt, txpoolcfg.Mined) } toDel = toDel[:0] } + + if discarded > 0 { + logger.Debug("Discarding Transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved) + } + return nil } @@ -1657,6 +1704,13 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs logEvery := time.NewTicker(p.cfg.LogEvery) defer logEvery.Stop() + err := p.Start(ctx, db) + + if err != nil { + p.logger.Error("[txpool] Failed to start", "err", err) + return + } + for { select { case <-ctx.Done(): @@ -1724,7 +1778,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs var remoteTxSizes []uint32 var remoteTxHashes types.Hashes var remoteTxRlps [][]byte - var broadCastedHashes types.Hashes + var broadcastHashes types.Hashes slotsRlp := make([][]byte, 0, announcements.Len()) if err := db.View(ctx, func(tx kv.Tx) error { @@ -1748,7 +1802,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs // "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844 if t != types.BlobTxType { localTxRlps = append(localTxRlps, slotRlp) - broadCastedHashes = append(broadCastedHashes, hash...) + broadcastHashes = append(broadcastHashes, hash...) } } else { remoteTxTypes = append(remoteTxTypes, t) @@ -1775,12 +1829,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs const localTxsBroadcastMaxPeers uint64 = 10 txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxsBroadcastMaxPeers) for i, peer := range txSentTo { - p.logger.Info("Local tx broadcasted", "txHash", hex.EncodeToString(broadCastedHashes.At(i)), "to peer", peer) + p.logger.Trace("Local tx broadcast", "txHash", hex.EncodeToString(broadcastHashes.At(i)), "to peer", peer) } hashSentTo := send.AnnouncePooledTxs(localTxTypes, localTxSizes, localTxHashes, localTxsBroadcastMaxPeers*2) for i := 0; i < localTxHashes.Len(); i++ { hash := localTxHashes.At(i) - p.logger.Info("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load()) + p.logger.Trace("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load()) } // broadcast remote transactions @@ -1844,6 +1898,7 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err } return written, nil } + func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { for i, mt := range p.deletedTxs { id := mt.Tx.SenderID @@ -1927,20 +1982,33 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { return nil } -func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { - p.lock.Lock() - defer p.lock.Unlock() - return p.fromDB(ctx, tx, coreTx) -} func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if p.lastSeenBlock.Load() == 0 { lastSeenBlock, err := LastSeenBlock(tx) if err != nil { return err } + p.lastSeenBlock.Store(lastSeenBlock) } + // this is neccessary as otherwise best - which waits for sync events + // may wait for ever if blocks have been process before the txpool + // starts with an empty db + lastSeenProgress, err := getExecutionProgress(coreTx) + + if err != nil { + return err + } + + if p.lastSeenBlock.Load() < lastSeenProgress { + // TODO we need to process the blocks since the + // last seen to make sure that the tx pool is in + // sync with the processed blocks + + p.lastSeenBlock.Store(lastSeenProgress) + } + cacheView, err := p._stateCache.View(ctx, coreTx) if err != nil { return err @@ -2031,6 +2099,24 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { p.pendingBlobFee.Store(pendingBlobFee) return nil } + +func getExecutionProgress(db kv.Getter) (uint64, error) { + data, err := db.GetOne(kv.SyncStageProgress, []byte("Execution")) + if err != nil { + return 0, err + } + + if len(data) == 0 { + return 0, nil + } + + if len(data) < 8 { + return 0, fmt.Errorf("value must be at least 8 bytes, got %d", len(data)) + } + + return binary.BigEndian.Uint64(data[:8]), nil +} + func LastSeenBlock(tx kv.Getter) (uint64, error) { v, err := tx.GetOne(kv.PoolInfo, PoolLastSeenBlockKey) if err != nil { @@ -2093,7 +2179,7 @@ func (p *TxPool) printDebug(prefix string) { } } func (p *TxPool) logStats() { - if !p.started.Load() { + if !p.Started() { //p.logger.Info("[txpool] Not started yet, waiting for new blocks...") return } diff --git a/erigon-lib/txpool/pool_fuzz_test.go b/erigon-lib/txpool/pool_fuzz_test.go index e81d31691..1e8923d88 100644 --- a/erigon-lib/txpool/pool_fuzz_test.go +++ b/erigon-lib/txpool/pool_fuzz_test.go @@ -316,6 +316,10 @@ func FuzzOnNewBlocks(f *testing.F) { sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New()) assert.NoError(err) + + err = pool.Start(ctx, db) + assert.NoError(err) + pool.senders.senderIDs = senderIDs for addr, id := range senderIDs { pool.senders.senderID2Addr[id] = addr @@ -538,6 +542,7 @@ func FuzzOnNewBlocks(f *testing.F) { p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New()) assert.NoError(err) + p2.senders = pool.senders // senders are not persisted err = coreDB.View(ctx, func(coreTx kv.Tx) error { return p2.fromDB(ctx, tx, coreTx) }) require.NoError(err) diff --git a/erigon-lib/txpool/pool_test.go b/erigon-lib/txpool/pool_test.go index a438be399..ef4347a58 100644 --- a/erigon-lib/txpool/pool_test.go +++ b/erigon-lib/txpool/pool_test.go @@ -19,6 +19,7 @@ package txpool import ( "bytes" "context" + // "crypto/rand" "fmt" "math" @@ -957,6 +958,10 @@ func TestDropRemoteAtNoGossip(t *testing.T) { require.True(txPool != nil) ctx := context.Background() + + err = txPool.Start(ctx, db) + assert.NoError(err) + var stateVersionID uint64 = 0 pendingBaseFee := uint64(1_000_000) // start blocks from 0, set empty hash - then kvcache will also work on this diff --git a/eth/backend.go b/eth/backend.go index bbec2e31c..105092b37 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -29,6 +29,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" lru "github.com/hashicorp/golang-lru/arc/v2" @@ -616,7 +617,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger miner := stagedsync.NewMiningState(&config.Miner) backend.pendingBlocks = miner.PendingResultCh - backend.minedBlocks = miner.MiningResultCh var ( snapDb kv.RwDB @@ -780,7 +780,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } }() - if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.sentriesClient.Hd.QuitPoWMining, tmpdir, logger); err != nil { + if err := backend.StartMining(context.Background(), backend.chainDB, stateDiffClient, mining, miner, backend.gasPrice, backend.sentriesClient.Hd.QuitPoWMining, tmpdir, logger); err != nil { return nil, err } @@ -1010,7 +1010,8 @@ func (s *Ethereum) shouldPreserve(block *types.Block) bool { //nolint // StartMining starts the miner with the given number of CPU threads. If mining // is already running, this method adjust the number of threads allowed to use // and updates the minimum price required by the transaction pool. -func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, cfg params.MiningConfig, quitCh chan struct{}, tmpDir string, logger log.Logger) error { +func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient *direct.StateDiffClientDirect, mining *stagedsync.Sync, miner stagedsync.MiningState, gasPrice *uint256.Int, quitCh chan struct{}, tmpDir string, logger log.Logger) error { + var borcfg *bor.Bor if b, ok := s.engine.(*bor.Bor); ok { borcfg = b @@ -1023,7 +1024,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } //if borcfg == nil { - if !cfg.Enabled { + if !miner.MiningConfig.Enabled { return nil } //} @@ -1036,14 +1037,14 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } if borcfg != nil { - if cfg.Enabled { - if cfg.SigKey == nil { + if miner.MiningConfig.Enabled { + if miner.MiningConfig.SigKey == nil { s.logger.Error("Etherbase account unavailable locally", "err", err) return fmt.Errorf("signer missing: %w", err) } borcfg.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) { - return crypto.Sign(crypto.Keccak256(message), cfg.SigKey) + return crypto.Sign(crypto.Keccak256(message), miner.MiningConfig.SigKey) }) if !s.config.WithoutHeimdall { @@ -1082,47 +1083,73 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } } if clq != nil { - if cfg.SigKey == nil { + if miner.MiningConfig.SigKey == nil { s.logger.Error("Etherbase account unavailable locally", "err", err) return fmt.Errorf("signer missing: %w", err) } clq.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) { - return crypto.Sign(crypto.Keccak256(message), cfg.SigKey) + return crypto.Sign(crypto.Keccak256(message), miner.MiningConfig.SigKey) }) } + streamCtx, streamCancel := context.WithCancel(ctx) + stream, err := stateDiffClient.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true)) + + if err != nil { + streamCancel() + return err + } + + stateChangeCh := make(chan *remote.StateChange) + + go func() { + for req, err := stream.Recv(); ; req, err = stream.Recv() { + if err == nil { + for _, change := range req.ChangeBatch { + stateChangeCh <- change + } + } + } + }() + go func() { defer debug.LogPanic() defer close(s.waitForMiningStop) + defer streamCancel() - mineEvery := time.NewTicker(cfg.Recommit) + mineEvery := time.NewTicker(miner.MiningConfig.Recommit) defer mineEvery.Stop() - // Listen on a new head subscription. This allows us to maintain the block time by - // triggering mining after the block is passed through all stages. - newHeadCh, closeNewHeadCh := s.notifications.Events.AddHeaderSubscription() - defer closeNewHeadCh() - s.logger.Info("Starting to mine", "etherbase", eb) - var works bool + var working bool + var waiting atomic.Bool + hasWork := true // Start mining immediately errc := make(chan error, 1) + workCtx, workCancel := context.WithCancel(ctx) + defer workCancel() + for { // Only reset if some work was done previously as we'd like to rely // on the `miner.recommit` as backup. if hasWork { - mineEvery.Reset(cfg.Recommit) + mineEvery.Reset(miner.MiningConfig.Recommit) } - // Only check for case if you're already mining (i.e. works = true) and + // Only check for case if you're already mining (i.e. working = true) and // waiting for error or you don't have any work yet (i.e. hasWork = false). - if works || !hasWork { + if working || !hasWork { select { - case <-newHeadCh: + case stateChanges := <-stateChangeCh: + block := stateChanges.BlockHeight + s.logger.Debug("Start mining new block based on previous block", "block", block) + // TODO - can do mining clean up here as we have previous + // block info in the state channel hasWork = true + case <-s.notifyMiningAboutNewTxs: // Skip mining based on new tx notif for bor consensus hasWork = s.chainConfig.Bor == nil @@ -1130,10 +1157,12 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy s.logger.Debug("Start mining new block based on txpool notif") } case <-mineEvery.C: - s.logger.Debug("Start mining new block based on miner.recommit") - hasWork = true + if !(working || waiting.Load()) { + s.logger.Debug("Start mining new block based on miner.recommit", "duration", miner.MiningConfig.Recommit) + } + hasWork = !(working || waiting.Load()) case err := <-errc: - works = false + working = false hasWork = false if errors.Is(err, libcommon.ErrStopped) { return @@ -1146,11 +1175,36 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } } - if !works && hasWork { - works = true + if !working && hasWork { + working = true hasWork = false - mineEvery.Reset(cfg.Recommit) - go func() { errc <- stages2.MiningStep(ctx, db, mining, tmpDir, logger) }() + mineEvery.Reset(miner.MiningConfig.Recommit) + go func() { + err := stages2.MiningStep(ctx, db, mining, tmpDir, logger) + + waiting.Store(true) + defer waiting.Store(false) + + errc <- err + + if err != nil { + return + } + + for { + select { + case block := <-miner.MiningResultCh: + if block != nil { + s.logger.Debug("Mined block", "block", block.Number()) + s.minedBlocks <- block + } + return + case <-workCtx.Done(): + errc <- workCtx.Err() + return + } + } + }() } } }() diff --git a/eth/stagedsync/chain_reader.go b/eth/stagedsync/chain_reader.go index f1d0e5205..5c2d75c42 100644 --- a/eth/stagedsync/chain_reader.go +++ b/eth/stagedsync/chain_reader.go @@ -17,8 +17,7 @@ import ( // ChainReader implements consensus.ChainReader type ChainReader struct { - Cfg chain.Config - + Cfg chain.Config Db kv.Getter BlockReader services.FullBlockReader Logger log.Logger @@ -72,7 +71,7 @@ func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool { func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int { td, err := rawdb.ReadTd(cr.Db, hash, number) if err != nil { - log.Error("ReadTd failed", "err", err) + cr.Logger.Error("ReadTd failed", "err", err) return nil } return td diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index a6953b370..65cff4086 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -157,10 +157,13 @@ func BorHeimdallForward( } lastBlockNum := s.BlockNumber + if cfg.blockReader.FrozenBorBlocks() > lastBlockNum { lastBlockNum = cfg.blockReader.FrozenBorBlocks() } + recents, err := lru.NewARC[libcommon.Hash, *bor.Snapshot](inmemorySnapshots) + if err != nil { return err } @@ -168,6 +171,7 @@ func BorHeimdallForward( if err != nil { return err } + chain := NewChainReaderImpl(&cfg.chainConfig, tx, cfg.blockReader, logger) var blockNum uint64 diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 839d78424..6ac18f587 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -605,7 +605,7 @@ func (cr ChainReaderImpl) BorEventsByBlock(hash libcommon.Hash, number uint64) [ func (cr ChainReaderImpl) BorSpan(spanId uint64) []byte { span, err := cr.blockReader.Span(context.Background(), cr.tx, spanId) if err != nil { - cr.logger.Error("BorSpan failed", "err", err) + cr.logger.Error("[staged sync] BorSpan failed", "err", err) return nil } return span diff --git a/eth/stagedsync/stage_mining_bor_heimdall.go b/eth/stagedsync/stage_mining_bor_heimdall.go index 2dd96a98b..4a5d21665 100644 --- a/eth/stagedsync/stage_mining_bor_heimdall.go +++ b/eth/stagedsync/stage_mining_bor_heimdall.go @@ -77,7 +77,7 @@ func MiningBorHeimdallForward( } logger.Info( - "[%s] Finished processing", logPrefix, + fmt.Sprintf("[%s] Finished processing", logPrefix), "progress", headerNum, "lastSpanID", lastSpanID, "lastStateSyncEventID", lastStateSyncEventID, diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index abc94326d..7fae41332 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -41,8 +41,8 @@ type MiningExecCfg struct { tmpdir string interrupt *int32 payloadId uint64 - txPool2 TxPoolForMining - txPool2DB kv.RoDB + txPool TxPoolForMining + txPoolDB kv.RoDB } type TxPoolForMining interface { @@ -54,7 +54,7 @@ func StageMiningExecCfg( notifier ChainEventNotifier, chainConfig chain.Config, engine consensus.Engine, vmConfig *vm.Config, tmpdir string, interrupt *int32, payloadId uint64, - txPool2 TxPoolForMining, txPool2DB kv.RoDB, + txPool TxPoolForMining, txPoolDB kv.RoDB, blockReader services.FullBlockReader, ) MiningExecCfg { return MiningExecCfg{ @@ -68,8 +68,8 @@ func StageMiningExecCfg( tmpdir: tmpdir, interrupt: interrupt, payloadId: payloadId, - txPool2: txPool2, - txPool2DB: txPool2DB, + txPool: txPool, + txPoolDB: txPoolDB, } } @@ -150,7 +150,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c } } - logger.Debug("SpawnMiningExecStage", "block txn", current.Txs.Len(), "payload", cfg.payloadId) + logger.Debug("SpawnMiningExecStage", "block", current.Header.Number, "txn", current.Txs.Len(), "payload", cfg.payloadId) if current.Uncles == nil { current.Uncles = []*types.Header{} } @@ -166,7 +166,8 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c if err != nil { return err } - logger.Debug("FinalizeBlockExecution", "current txn", current.Txs.Len(), "current receipt", current.Receipts.Len(), "payload", cfg.payloadId) + + logger.Debug("FinalizeBlockExecution", "block", current.Header.Number, "txn", current.Txs.Len(), "gas", current.Header.GasUsed, "receipt", current.Receipts.Len(), "payload", cfg.payloadId) // hack: pretend that we are real execution stage - next stages will rely on this progress if err := stages.SaveStageProgress(tx, stages.Execution, current.Header.Number.Uint64()); err != nil { @@ -186,23 +187,20 @@ func getNextTransactions( logger log.Logger, ) (types.TransactionsStream, int, error) { txSlots := types2.TxsRlp{} - var onTime bool count := 0 - if err := cfg.txPool2DB.View(context.Background(), func(poolTx kv.Tx) error { + if err := cfg.txPoolDB.View(context.Background(), func(poolTx kv.Tx) error { var err error - counter := 0 - for !onTime && counter < 500 { - remainingGas := header.GasLimit - header.GasUsed - remainingBlobGas := uint64(0) - if header.BlobGasUsed != nil { - remainingBlobGas = cfg.chainConfig.GetMaxBlobGasPerBlock() - *header.BlobGasUsed - } - if onTime, count, err = cfg.txPool2.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, remainingBlobGas, alreadyYielded); err != nil { - return err - } - time.Sleep(1 * time.Millisecond) - counter++ + + remainingGas := header.GasLimit - header.GasUsed + remainingBlobGas := uint64(0) + if header.BlobGasUsed != nil { + remainingBlobGas = cfg.chainConfig.GetMaxBlobGasPerBlock() - *header.BlobGasUsed } + + if _, count, err = cfg.txPool.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, remainingBlobGas, alreadyYielded); err != nil { + return err + } + return nil }); err != nil { return nil, 0, err @@ -375,7 +373,6 @@ func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainC gasSnap := gasPool.Gas() blobGasSnap := gasPool.BlobGas() snap := ibs.Snapshot() - logger.Debug("addTransactionsToMiningBlock", "txn hash", txn.Hash()) receipt, _, err := core.ApplyTransaction(&chainConfig, core.GetHashFn(header, getHeader), engine, &coinbase, gasPool, ibs, noop, header, txn, &header.GasUsed, header.BlobGasUsed, *vmConfig) if err != nil { ibs.RevertToSnapshot(snap) @@ -464,7 +461,7 @@ LOOP: txs.Pop() } else if err == nil { // Everything ok, collect the logs and shift in the next transaction from the same account - logger.Debug(fmt.Sprintf("[%s] addTransactionsToMiningBlock Successful", logPrefix), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId) + logger.Trace(fmt.Sprintf("[%s] Added transaction", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId) coalescedLogs = append(coalescedLogs, logs...) tcount++ txs.Shift() diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index 81cc486e5..d3d36dfba 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -82,10 +82,10 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit if block.Transactions().Len() > 0 { logger.Info(fmt.Sprintf("[%s] block ready for seal", logPrefix), - "block_num", block.NumberU64(), + "block", block.NumberU64(), "transactions", block.Transactions().Len(), - "gas_used", block.GasUsed(), - "gas_limit", block.GasLimit(), + "gasUsed", block.GasUsed(), + "gasLimit", block.GasLimit(), "difficulty", block.Difficulty(), ) } diff --git a/polygon/bor/bor.go b/polygon/bor/bor.go index e3c31bdee..2bdf64a04 100644 --- a/polygon/bor/bor.go +++ b/polygon/bor/bor.go @@ -1112,11 +1112,13 @@ func (c *Bor) Seal(chain consensus.ChainHeaderReader, block *types.Block, result select { case <-stop: c.logger.Info("[bor] Stopped sealing operation for block", "number", number) + results <- nil return case <-time.After(delay): if c.headerProgress != nil && c.headerProgress.Progress() >= number { c.logger.Info("Discarding sealing operation for block", "number", number) + results <- nil return } diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index 75261f14d..88ab5ca94 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -478,7 +478,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg } if c.Enabled { - logger.Info("starting HTTP APIs", "APIs", apis) + logger.Info("starting HTTP APIs", "port", c.HttpPort, "APIs", apis) } if ctx.IsSet(utils.HttpCompressionFlag.Name) { diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index 848370531..a24367e83 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -1190,7 +1190,7 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] return nil, err } if v == nil { - return nil, fmt.Errorf("span %d not found (db), frosenBlocks=%d", spanId, maxBlockNumInFiles) + return nil, fmt.Errorf("span %d not found (db), frozenBlocks=%d", spanId, maxBlockNumInFiles) } return common.Copy(v), nil } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 951dc4b7b..95db1e1e8 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -329,6 +329,7 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error { pendingBlobFee = f.Uint64() } + h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock) notifications.Accumulator.SendAndReset(h.ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock) } // -- send notifications END