diff --git a/cmd/devnet/args/node_args.go b/cmd/devnet/args/node_args.go index 44930fee7..1fba7caaf 100644 --- a/cmd/devnet/args/node_args.go +++ b/cmd/devnet/args/node_args.go @@ -135,6 +135,7 @@ type BlockProducer struct { NodeArgs Mine bool `arg:"--mine" flag:"true"` Etherbase string `arg:"--miner.etherbase"` + GasLimit int `arg:"--miner.gaslimit"` DevPeriod int `arg:"--dev.period"` BorPeriod int `arg:"--bor.period"` BorMinBlockSize int `arg:"--bor.minblocksize"` diff --git a/cmd/devnet/devnet/network.go b/cmd/devnet/devnet/network.go index 29eee727c..372aca838 100644 --- a/cmd/devnet/devnet/network.go +++ b/cmd/devnet/devnet/network.go @@ -34,7 +34,7 @@ type Network struct { Snapshots bool Nodes []Node Services []Service - Alloc types.GenesisAlloc + Genesis *types.Genesis BorStateSyncDelay time.Duration BorPeriod time.Duration BorMinBlockSize int @@ -140,12 +140,16 @@ func (nw *Network) createNode(nodeArgs Node) (Node, error) { } if n.IsBlockProducer() { - if nw.Alloc == nil { - nw.Alloc = types.GenesisAlloc{ + if nw.Genesis == nil { + nw.Genesis = &types.Genesis{} + } + + if nw.Genesis.Alloc == nil { + nw.Genesis.Alloc = types.GenesisAlloc{ n.Account().Address: types.GenesisAccount{Balance: blockProducerFunds}, } } else { - nw.Alloc[n.Account().Address] = types.GenesisAccount{Balance: blockProducerFunds} + nw.Genesis.Alloc[n.Account().Address] = types.GenesisAccount{Balance: blockProducerFunds} } } diff --git a/cmd/devnet/devnet/node.go b/cmd/devnet/devnet/node.go index 30f466336..33f716aa3 100644 --- a/cmd/devnet/devnet/node.go +++ b/cmd/devnet/devnet/node.go @@ -167,8 +167,14 @@ func (n *devnetNode) run(ctx *cli.Context) error { n.nodeCfg.MdbxGrowthStep = 32 * datasize.MB n.nodeCfg.MdbxDBSizeLimit = 512 * datasize.MB - for addr, account := range n.network.Alloc { - n.ethCfg.Genesis.Alloc[addr] = account + if n.network.Genesis != nil { + for addr, account := range n.network.Genesis.Alloc { + n.ethCfg.Genesis.Alloc[addr] = account + } + + if n.network.Genesis.GasLimit != 0 { + n.ethCfg.Genesis.GasLimit = n.network.Genesis.GasLimit + } } if n.network.BorStateSyncDelay > 0 { diff --git a/cmd/devnet/main.go b/cmd/devnet/main.go index 655d0d88a..92a41aca1 100644 --- a/cmd/devnet/main.go +++ b/cmd/devnet/main.go @@ -6,6 +6,7 @@ import ( "os/signal" "path/filepath" dbg "runtime/debug" + "strconv" "strings" "syscall" "time" @@ -131,6 +132,12 @@ var ( Value: 1, } + GasLimitFlag = cli.Uint64Flag{ + Name: "gaslimit", + Usage: "Target gas limit for mined blocks", + Value: 0, + } + WaitFlag = cli.BoolFlag{ Name: "wait", Usage: "Wait until interrupted after all scenarios have run", @@ -173,6 +180,7 @@ func main() { &logging.LogVerbosityFlag, &logging.LogConsoleVerbosityFlag, &logging.LogDirVerbosityFlag, + &GasLimitFlag, } if err := app.Run(os.Args); err != nil { @@ -342,21 +350,74 @@ func initDevnet(ctx *cli.Context, logger log.Logger) (devnet.Devnet, error) { baseRpcHost := ctx.String(BaseRpcHostFlag.Name) baseRpcPort := ctx.Int(BaseRpcPortFlag.Name) producerCount := int(ctx.Uint(BlockProducersFlag.Name)) + gasLimit := ctx.Uint64(GasLimitFlag.Name) + + var dirLogLevel log.Lvl = log.LvlTrace + var consoleLogLevel log.Lvl = log.LvlCrit + + if ctx.IsSet(logging.LogVerbosityFlag.Name) { + lvlVal := ctx.String(logging.LogVerbosityFlag.Name) + + i, err := strconv.Atoi(lvlVal) + + lvl := log.Lvl(i) + + if err != nil { + lvl, err = log.LvlFromString(lvlVal) + } + + if err == nil { + consoleLogLevel = lvl + dirLogLevel = lvl + } + } else { + if ctx.IsSet(logging.LogConsoleVerbosityFlag.Name) { + lvlVal := ctx.String(logging.LogConsoleVerbosityFlag.Name) + + i, err := strconv.Atoi(lvlVal) + + lvl := log.Lvl(i) + + if err != nil { + lvl, err = log.LvlFromString(lvlVal) + } + + if err == nil { + consoleLogLevel = lvl + } + } + + if ctx.IsSet(logging.LogDirVerbosityFlag.Name) { + lvlVal := ctx.String(logging.LogDirVerbosityFlag.Name) + + i, err := strconv.Atoi(lvlVal) + + lvl := log.Lvl(i) + + if err != nil { + lvl, err = log.LvlFromString(lvlVal) + } + + if err == nil { + dirLogLevel = lvl + } + } + } switch chainName { case networkname.BorDevnetChainName: if ctx.Bool(WithoutHeimdallFlag.Name) { - return networks.NewBorDevnetWithoutHeimdall(dataDir, baseRpcHost, baseRpcPort, logger), nil + return networks.NewBorDevnetWithoutHeimdall(dataDir, baseRpcHost, baseRpcPort, gasLimit, logger, consoleLogLevel, dirLogLevel), nil } else if ctx.Bool(LocalHeimdallFlag.Name) { heimdallGrpcAddr := ctx.String(HeimdallGrpcAddressFlag.Name) sprintSize := uint64(ctx.Int(BorSprintSizeFlag.Name)) - return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, logger), nil + return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil } else { - return networks.NewBorDevnetWithRemoteHeimdall(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil + return networks.NewBorDevnetWithRemoteHeimdall(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil } case networkname.DevChainName: - return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil + return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil default: return nil, fmt.Errorf("unknown network: '%s'", chainName) diff --git a/cmd/devnet/networks/devnet_bor.go b/cmd/devnet/networks/devnet_bor.go index 9f8cbd2fa..41d4993ea 100644 --- a/cmd/devnet/networks/devnet_bor.go +++ b/cmd/devnet/networks/devnet_bor.go @@ -1,6 +1,7 @@ package networks import ( + "strconv" "time" "github.com/ledgerwatch/log/v3" @@ -21,7 +22,10 @@ func NewBorDevnetWithoutHeimdall( dataDir string, baseRpcHost string, baseRpcPort int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { faucetSource := accounts.NewAccount("faucet-source") @@ -34,8 +38,11 @@ func NewBorDevnetWithoutHeimdall( BaseRPCHost: baseRpcHost, BaseRPCPort: baseRpcPort, //Snapshots: true, - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + }, + GasLimit: gasLimit, }, Services: []devnet.Service{ account_services.NewFaucet(networkname.BorDevnetChainName, faucetSource), @@ -43,16 +50,16 @@ func NewBorDevnetWithoutHeimdall( Nodes: []devnet.Node{ &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), WithoutHeimdall: true, }, AccountSlots: 200, }, &args.BlockConsumer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), WithoutHeimdall: true, }, }, @@ -70,8 +77,11 @@ func NewBorDevnetWithHeimdall( heimdallGrpcAddr string, checkpointOwner *accounts.Account, producerCount int, + gasLimit uint64, withMilestones bool, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { faucetSource := accounts.NewAccount("faucet-source") @@ -89,8 +99,8 @@ func NewBorDevnetWithHeimdall( for i := 0; i < producerCount; i++ { nodes = append(nodes, &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), HeimdallGrpcAddr: heimdallGrpcAddr, }, AccountSlots: 20000, @@ -108,14 +118,17 @@ func NewBorDevnetWithHeimdall( BorStateSyncDelay: 5 * time.Second, BorWithMilestones: &withMilestones, Services: append(services, account_services.NewFaucet(networkname.BorDevnetChainName, faucetSource)), - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + }, + GasLimit: gasLimit, }, Nodes: append(nodes, &args.BlockConsumer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), HeimdallGrpcAddr: heimdallGrpcAddr, }, }), @@ -130,15 +143,17 @@ func NewBorDevnetWithHeimdall( BaseRPCHost: baseRpcHost, BaseRPCPort: baseRpcPort + 1000, Services: append(services, account_services.NewFaucet(networkname.DevChainName, faucetSource)), - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, - checkpointOwner.Address: {Balance: accounts.EtherAmount(10_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + checkpointOwner.Address: {Balance: accounts.EtherAmount(10_000)}, + }, }, Nodes: []devnet.Node{ &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), VMDebug: true, HttpCorsDomain: "*", }, @@ -147,8 +162,8 @@ func NewBorDevnetWithHeimdall( }, &args.BlockConsumer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "3", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), }, }, }, @@ -165,7 +180,10 @@ func NewBorDevnetWithRemoteHeimdall( baseRpcHost string, baseRpcPort int, producerCount int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { heimdallGrpcAddr := "" checkpointOwner := accounts.NewAccount("checkpoint-owner") @@ -178,8 +196,11 @@ func NewBorDevnetWithRemoteHeimdall( heimdallGrpcAddr, checkpointOwner, producerCount, + gasLimit, withMilestones, - logger) + logger, + consoleLogLevel, + dirLogLevel) } func NewBorDevnetWithLocalHeimdall( @@ -189,7 +210,10 @@ func NewBorDevnetWithLocalHeimdall( heimdallGrpcAddr string, sprintSize uint64, producerCount int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { config := *params.BorDevnetChainConfig borConfig := config.Bor.(*borcfg.BorConfig) @@ -216,7 +240,8 @@ func NewBorDevnetWithLocalHeimdall( heimdallGrpcAddr, checkpointOwner, producerCount, + gasLimit, // milestones are not supported yet on the local heimdall false, - logger) + logger, consoleLogLevel, dirLogLevel) } diff --git a/cmd/devnet/networks/devnet_dev.go b/cmd/devnet/networks/devnet_dev.go index 142980540..fa8e399f8 100644 --- a/cmd/devnet/networks/devnet_dev.go +++ b/cmd/devnet/networks/devnet_dev.go @@ -1,6 +1,8 @@ package networks import ( + "strconv" + "github.com/ledgerwatch/erigon-lib/chain/networkname" "github.com/ledgerwatch/erigon/cmd/devnet/accounts" "github.com/ledgerwatch/erigon/cmd/devnet/args" @@ -15,7 +17,10 @@ func NewDevDevnet( baseRpcHost string, baseRpcPort int, producerCount int, + gasLimit uint64, logger log.Logger, + consoleLogLevel log.Lvl, + dirLogLevel log.Lvl, ) devnet.Devnet { faucetSource := accounts.NewAccount("faucet-source") @@ -28,8 +33,8 @@ func NewDevDevnet( for i := 0; i < producerCount; i++ { nodes = append(nodes, &args.BlockProducer{ NodeArgs: args.NodeArgs{ - ConsoleVerbosity: "0", - DirVerbosity: "5", + ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)), + DirVerbosity: strconv.Itoa(int(dirLogLevel)), }, AccountSlots: 200, }) @@ -42,8 +47,11 @@ func NewDevDevnet( BasePrivateApiAddr: "localhost:10090", BaseRPCHost: baseRpcHost, BaseRPCPort: baseRpcPort, - Alloc: types.GenesisAlloc{ - faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + Genesis: &types.Genesis{ + Alloc: types.GenesisAlloc{ + faucetSource.Address: {Balance: accounts.EtherAmount(200_000)}, + }, + GasLimit: gasLimit, }, Services: []devnet.Service{ account_services.NewFaucet(networkname.DevChainName, faucetSource), diff --git a/cmd/devnet/requests/request_generator.go b/cmd/devnet/requests/request_generator.go index 1c1e04628..1598cba5c 100644 --- a/cmd/devnet/requests/request_generator.go +++ b/cmd/devnet/requests/request_generator.go @@ -184,34 +184,50 @@ func (req *requestGenerator) rpcCall(ctx context.Context, result interface{}, me }) } -const connectionTimeout = time.Second * 5 +const requestTimeout = time.Second * 20 +const connectionTimeout = time.Millisecond * 500 func isConnectionError(err error) bool { var opErr *net.OpError - if errors.As(err, &opErr) { + switch { + case errors.As(err, &opErr): return opErr.Op == "dial" + + case errors.Is(err, context.DeadlineExceeded): + return true } + return false } func retryConnects(ctx context.Context, op func(context.Context) error) error { - ctx, cancel := context.WithTimeout(ctx, connectionTimeout) + ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() - return retry(ctx, op, isConnectionError, time.Millisecond*200, nil) + return retry(ctx, op, isConnectionError, time.Second*1, nil) } func retry(ctx context.Context, op func(context.Context) error, isRecoverableError func(error) bool, delay time.Duration, lastErr error) error { - err := op(ctx) + opctx, cancel := context.WithTimeout(ctx, connectionTimeout) + defer cancel() + + err := op(opctx) + if err == nil { return nil } - if errors.Is(err, context.DeadlineExceeded) && lastErr != nil { - return lastErr - } + if !isRecoverableError(err) { return err } + if errors.Is(err, context.DeadlineExceeded) { + if lastErr != nil { + return lastErr + } + + err = nil + } + delayTimer := time.NewTimer(delay) select { case <-delayTimer.C: diff --git a/cmd/devnet/tests/context.go b/cmd/devnet/tests/context.go index 7f9ead045..7eb115b0e 100644 --- a/cmd/devnet/tests/context.go +++ b/cmd/devnet/tests/context.go @@ -16,7 +16,7 @@ import ( "github.com/ledgerwatch/log/v3" ) -func initDevnet(chainName string, dataDir string, producerCount int, logger log.Logger) (devnet.Devnet, error) { +func initDevnet(chainName string, dataDir string, producerCount int, gasLimit uint64, logger log.Logger, consoleLogLevel log.Lvl, dirLogLevel log.Lvl) (devnet.Devnet, error) { const baseRpcHost = "localhost" const baseRpcPort = 8545 @@ -24,17 +24,17 @@ func initDevnet(chainName string, dataDir string, producerCount int, logger log. case networkname.BorDevnetChainName: heimdallGrpcAddr := polygon.HeimdallGrpcAddressDefault const sprintSize uint64 = 0 - return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, logger), nil + return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil case networkname.DevChainName: - return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil + return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil case "": envChainName, _ := os.LookupEnv("DEVNET_CHAIN") if envChainName == "" { envChainName = networkname.DevChainName } - return initDevnet(envChainName, dataDir, producerCount, logger) + return initDevnet(envChainName, dataDir, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel) default: return nil, fmt.Errorf("unknown network: '%s'", chainName) @@ -57,8 +57,12 @@ func ContextStart(t *testing.T, chainName string) (devnet.Context, error) { producerCount, _ := strconv.ParseUint(envProducerCount, 10, 64) + // TODO get log levels from env + var dirLogLevel log.Lvl = log.LvlTrace + var consoleLogLevel log.Lvl = log.LvlCrit + var network devnet.Devnet - network, err := initDevnet(chainName, dataDir, int(producerCount), logger) + network, err := initDevnet(chainName, dataDir, int(producerCount), 0, logger, consoleLogLevel, dirLogLevel) if err != nil { return nil, fmt.Errorf("ContextStart initDevnet failed: %w", err) } diff --git a/cmd/devnet/transactions/tx.go b/cmd/devnet/transactions/tx.go index dcaafd438..3a241171f 100644 --- a/cmd/devnet/transactions/tx.go +++ b/cmd/devnet/transactions/tx.go @@ -140,27 +140,20 @@ func SendTxLoad(ctx context.Context, to, from string, amount uint64, txPerSec ui for { start := time.Now() - lowtx, hightx, err := CreateManyEIP1559TransactionsRefWithBaseFee2(ctx, to, from, int(batchCount)) + tx, err := CreateManyEIP1559TransactionsHigherThanBaseFee(ctx, to, from, int(batchCount)) if err != nil { logger.Error("failed Create Txs", "error", err) return err } - _, err = SendManyTransactions(ctx, lowtx) + _, err = SendManyTransactions(ctx, tx) if err != nil { logger.Error("failed SendManyTransactions(higherThanBaseFeeTxs)", "error", err) return err } - _, err = SendManyTransactions(ctx, hightx) - - if err != nil { - logger.Error("failed SendManyTransactions(lowerThanBaseFeeTxs)", "error", err) - return err - } - select { case <-ctx.Done(): return nil @@ -249,6 +242,33 @@ func CreateManyEIP1559TransactionsRefWithBaseFee2(ctx context.Context, to, from return lowerBaseFeeTransactions, higherBaseFeeTransactions, nil } +func CreateManyEIP1559TransactionsHigherThanBaseFee(ctx context.Context, to, from string, count int) ([]types.Transaction, error) { + toAddress := libcommon.HexToAddress(to) + fromAddress := libcommon.HexToAddress(from) + + baseFeePerGas, err := blocks.BaseFeeFromBlock(ctx) + + if err != nil { + return nil, fmt.Errorf("failed BaseFeeFromBlock: %v", err) + } + + baseFeePerGas = baseFeePerGas * 2 + + devnet.Logger(ctx).Info("BaseFeePerGas2", "val", baseFeePerGas) + + node := devnet.SelectNode(ctx) + + res, err := node.GetTransactionCount(fromAddress, rpc.PendingBlock) + + if err != nil { + return nil, fmt.Errorf("failed to get transaction count for address 0x%x: %v", fromAddress, err) + } + + nonce := res.Uint64() + + return signEIP1559TxsHigherThanBaseFee(ctx, count, baseFeePerGas, &nonce, toAddress, fromAddress) +} + // createNonContractTx returns a signed transaction and the recipient address func CreateTransaction(node devnet.Node, to, from string, value uint64) (types.Transaction, libcommon.Address, error) { toAccount := accounts.GetAccount(to) @@ -344,7 +364,7 @@ func signEIP1559TxsLowerThanBaseFee(ctx context.Context, n int, baseFeePerGas ui transaction := types.NewEIP1559Transaction(chainId, *nonce, toAddress, uint256.NewInt(value), uint64(210_000), uint256.NewInt(gasPrice), new(uint256.Int), uint256.NewInt(gasFeeCap), nil) - devnet.Logger(ctx).Info("LOWER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) + devnet.Logger(ctx).Trace("LOWER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) signedTransaction, err := types.SignTx(transaction, signer, accounts.SigKey(fromAddress)) @@ -385,7 +405,7 @@ func signEIP1559TxsHigherThanBaseFee(ctx context.Context, n int, baseFeePerGas u transaction := types.NewEIP1559Transaction(chainId, *nonce, toAddress, uint256.NewInt(value), uint64(210_000), uint256.NewInt(gasPrice), new(uint256.Int), uint256.NewInt(gasFeeCap), nil) - devnet.Logger(ctx).Info("HIGHER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) + devnet.Logger(ctx).Trace("HIGHER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap) signerKey := accounts.SigKey(fromAddress) if signerKey == nil { @@ -407,7 +427,7 @@ func signEIP1559TxsHigherThanBaseFee(ctx context.Context, n int, baseFeePerGas u func SendManyTransactions(ctx context.Context, signedTransactions []types.Transaction) ([]libcommon.Hash, error) { logger := devnet.Logger(ctx) - logger.Info("Sending multiple transactions to the txpool...") + logger.Info(fmt.Sprintf("Sending %d transactions to the txpool...", len(signedTransactions))) hashes := make([]libcommon.Hash, len(signedTransactions)) for idx, tx := range signedTransactions { diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 34f8f438c..5a2f37db8 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -208,7 +208,7 @@ func subscribeToStateChangesLoop(ctx context.Context, client StateChangesClient, time.Sleep(3 * time.Second) continue } - log.Warn("[txpool.handleStateChanges]", "err", err) + log.Warn("[rpcdaemon subscribeToStateChanges]", "err", err) } } }() diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index d915a18b3..463be539b 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -170,12 +170,6 @@ func doTxpool(ctx context.Context, logger log.Logger) error { fetch.ConnectCore() fetch.ConnectSentries() - /* - var ethashApi *ethash.API - sif casted, ok := backend.engine.(*ethash.Ethash); ok { - ethashApi = casted.APIs(nil)[1].Service.(*ethash.API) - } - */ miningGrpcServer := privateapi.NewMiningServer(ctx, &rpcdaemontest.IsMiningMock{}, nil, logger) grpcServer, err := txpool.StartGrpc(txpoolGrpcServer, miningGrpcServer, txpoolApiAddr, nil, logger) diff --git a/consensus/chain_reader.go b/consensus/chain_reader.go index f79de40c4..c7d81953b 100644 --- a/consensus/chain_reader.go +++ b/consensus/chain_reader.go @@ -82,7 +82,7 @@ func (cr ChainReaderImpl) FrozenBlocks() uint64 { func (cr ChainReaderImpl) BorSpan(spanId uint64) []byte { spanBytes, err := cr.BlockReader.Span(context.Background(), cr.Db, spanId) if err != nil { - log.Error("BorSpan failed", "err", err) + log.Error("[consensus] BorSpan failed", "err", err) } return spanBytes } diff --git a/erigon-lib/downloader/path.go b/erigon-lib/downloader/path.go index 06ba51865..195c6d05c 100644 --- a/erigon-lib/downloader/path.go +++ b/erigon-lib/downloader/path.go @@ -171,7 +171,7 @@ func Clean(path string) string { return FromSlash(out.string()) } -func unixIsLocal(path string) bool { +func unixIsLocal(path string) bool { //nolint if IsAbs(path) || path == "" { return false } diff --git a/erigon-lib/downloader/path_windows.go b/erigon-lib/downloader/path_windows.go index f5f4a01d9..687e81429 100644 --- a/erigon-lib/downloader/path_windows.go +++ b/erigon-lib/downloader/path_windows.go @@ -175,51 +175,6 @@ func HasPrefix(p, prefix string) bool { return strings.HasPrefix(strings.ToLower(p), strings.ToLower(prefix)) } -func splitList(path string) []string { - // The same implementation is used in LookPath in os/exec; - // consider changing os/exec when changing this. - - if path == "" { - return []string{} - } - - // Split path, respecting but preserving quotes. - list := []string{} - start := 0 - quo := false - for i := 0; i < len(path); i++ { - switch c := path[i]; { - case c == '"': - quo = !quo - case c == ListSeparator && !quo: - list = append(list, path[start:i]) - start = i + 1 - } - } - list = append(list, path[start:]) - - // Remove quotes. - for i, s := range list { - list[i] = strings.ReplaceAll(s, `"`, ``) - } - - return list -} - -func abs(path string) (string, error) { - if path == "" { - // syscall.FullPath returns an error on empty path, because it's not a valid path. - // To implement Abs behavior of returning working directory on empty string input, - // special-case empty path by changing it to "." path. See golang.org/issue/24441. - path = "." - } - fullPath, err := syscall.FullPath(path) - if err != nil { - return "", err - } - return Clean(fullPath), nil -} - func join(elem []string) string { var b strings.Builder var lastChar byte @@ -260,47 +215,3 @@ func join(elem []string) string { } return Clean(b.String()) } - -// joinNonEmpty is like join, but it assumes that the first element is non-empty. -func joinNonEmpty(elem []string) string { - if len(elem[0]) == 2 && elem[0][1] == ':' { - // First element is drive letter without terminating slash. - // Keep path relative to current directory on that drive. - // Skip empty elements. - i := 1 - for ; i < len(elem); i++ { - if elem[i] != "" { - break - } - } - return Clean(elem[0] + strings.Join(elem[i:], string(Separator))) - } - // The following logic prevents Join from inadvertently creating a - // UNC path on Windows. Unless the first element is a UNC path, Join - // shouldn't create a UNC path. See golang.org/issue/9167. - p := Clean(strings.Join(elem, string(Separator))) - if !isUNC(p) { - return p - } - // p == UNC only allowed when the first element is a UNC path. - head := Clean(elem[0]) - if isUNC(head) { - return p - } - // head + tail == UNC, but joining two non-UNC paths should not result - // in a UNC path. Undo creation of UNC path. - tail := Clean(strings.Join(elem[1:], string(Separator))) - if head[len(head)-1] == Separator { - return head + tail - } - return head + string(Separator) + tail -} - -// isUNC reports whether path is a UNC path. -func isUNC(path string) bool { - return len(path) > 1 && isSlash(path[0]) && isSlash(path[1]) -} - -func sameWord(a, b string) bool { - return strings.EqualFold(a, b) -} diff --git a/erigon-lib/mmap/mmap_windows.go b/erigon-lib/mmap/mmap_windows.go index b343ebb40..0ce85db9e 100644 --- a/erigon-lib/mmap/mmap_windows.go +++ b/erigon-lib/mmap/mmap_windows.go @@ -41,7 +41,7 @@ func Mmap(f *os.File, size int) ([]byte, *[MaxMapSize]byte, error) { } // Close mapping handle. - if err := windows.CloseHandle(windows.Handle(h)); err != nil { + if err := windows.CloseHandle(h); err != nil { return nil, nil, os.NewSyscallError("CloseHandle", err) } diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index 69dfb3932..45dc6f115 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -126,7 +126,6 @@ type metaTx struct { timestamp uint64 // when it was added to pool subPool SubPoolMarker currentSubPool SubPoolType - alreadyYielded bool minedBlockNum uint64 } @@ -211,6 +210,7 @@ type TxPool struct { cfg txpoolcfg.Config chainID uint256.Int lastSeenBlock atomic.Uint64 + lastSeenCond *sync.Cond lastFinalizedBlock atomic.Uint64 started atomic.Bool pendingBaseFee atomic.Uint64 @@ -249,8 +249,11 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, tracedSenders[common.BytesToAddress([]byte(sender))] = struct{}{} } + lock := &sync.Mutex{} + res := &TxPool{ - lock: &sync.Mutex{}, + lock: lock, + lastSeenCond: sync.NewCond(lock), byHash: map[string]*metaTx{}, isLocalLRU: localsHistory, discardReasonsLRU: discardHistory, @@ -298,42 +301,81 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, return res, nil } -func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { - if err := minedTxs.Valid(); err != nil { - return err +func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error { + if p.started.Load() { + return nil } + return db.View(ctx, func(tx kv.Tx) error { + coreDb, _ := p.coreDBWithCache() + coreTx, err := coreDb.BeginRo(ctx) + + if err != nil { + return err + } + + defer coreTx.Rollback() + + if err := p.fromDB(ctx, tx, coreTx); err != nil { + return fmt.Errorf("loading pool from DB: %w", err) + } + + if p.started.CompareAndSwap(false, true) { + p.logger.Info("[txpool] Started") + } + + return nil + }) +} + +func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { + defer newBlockTimer.ObserveDuration(time.Now()) //t := time.Now() coreDB, cache := p.coreDBWithCache() cache.OnNewBlock(stateChanges) coreTx, err := coreDB.BeginRo(ctx) + if err != nil { return err } + defer coreTx.Rollback() - p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight) - if !p.started.Load() { - if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil { - return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err) - } + block := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight + baseFee := stateChanges.PendingBlockBaseFee + available := len(p.pending.best.ms) + + defer func() { + p.logger.Debug("[txpool] New block", "block", block, "unwound", len(unwindTxs.Txs), "mined", len(minedTxs.Txs), "baseFee", baseFee, "pending-pre", available, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len(), "err", err) + }() + + if err = minedTxs.Valid(); err != nil { + return err } + cacheView, err := cache.View(ctx, coreTx) + if err != nil { return err } p.lock.Lock() - defer p.lock.Unlock() + defer func() { + if err == nil { + p.lastSeenBlock.Store(block) + p.lastSeenCond.Broadcast() + } + + p.lock.Unlock() + }() if assert.Enable { if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil { p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String()) } } - baseFee := stateChanges.PendingBlockBaseFee pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee) // Update pendingBase for all pool queues and slices @@ -350,10 +392,13 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.setBlobFee(pendingBlobFee) p.blockGasLimit.Store(stateChanges.BlockGasLimit) - if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil { + + if err = p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil { return err } + _, unwindTxs, err = p.validateTxs(&unwindTxs, cacheView) + if err != nil { return err } @@ -371,21 +416,23 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang } } - if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil { - return err - } - if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil { + if err = p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil { return err } - //p.logger.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight) + if err = removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil { + return err + } + + var announcements types.Announcements + + announcements, err = addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ + pendingBaseFee, stateChanges.BlockGasLimit, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger) - announcements, err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ - pendingBaseFee, stateChanges.BlockGasLimit, - p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger) if err != nil { return err } + p.pending.EnforceWorstInvariants() p.baseFee.EnforceInvariants() p.queued.EnforceInvariants() @@ -394,10 +441,6 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.promoted.Reset() p.promoted.AppendOther(announcements) - if p.started.CompareAndSwap(false, true) { - p.logger.Info("[txpool] Started") - } - if p.promoted.Len() > 0 { select { case p.newPendingTxs <- p.promoted.Copy(): @@ -405,12 +448,11 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang } } - //p.logger.Info("[txpool] new block", "number", p.lastSeenBlock.Load(), "pendngBaseFee", pendingBaseFee, "in", time.Since(t)) return nil } func (p *TxPool) processRemoteTxs(ctx context.Context) error { - if !p.started.Load() { + if !p.Started() { return fmt.Errorf("txpool not started yet") } @@ -610,20 +652,29 @@ func (p *TxPool) IsLocal(idHash []byte) bool { func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } func (p *TxPool) Started() bool { return p.started.Load() } -func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - // First wait for the corresponding block to arrive - if p.lastSeenBlock.Load() < onTopOf { - return false, 0, nil // Too early +func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, yielded mapset.Set[[32]byte]) (bool, int, error) { + p.lock.Lock() + defer p.lock.Unlock() + + for last := p.lastSeenBlock.Load(); last < onTopOf; last = p.lastSeenBlock.Load() { + p.logger.Debug("[txpool] Waiting for block", "expecting", onTopOf, "lastSeen", last, "txRequested", n, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len()) + p.lastSeenCond.Wait() } - isShanghai := p.isShanghai() || p.isAgra() best := p.pending.best + isShanghai := p.isShanghai() || p.isAgra() + txs.Resize(uint(cmp.Min(int(n), len(best.ms)))) var toRemove []*metaTx count := 0 + i := 0 - for i := 0; count < int(n) && i < len(best.ms); i++ { + defer func() { + p.logger.Debug("[txpool] Processing best request", "last", onTopOf, "txRequested", n, "txAvailable", len(best.ms), "txProcessed", i, "txReturned", count) + }() + + for ; count < int(n) && i < len(best.ms); i++ { // if we wouldn't have enough gas for a standard transaction then quit out early if availableGas < fixedgas.TxGas { break @@ -631,7 +682,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG mt := best.ms[i] - if toSkip.Contains(mt.Tx.IDHash) { + if yielded.Contains(mt.Tx.IDHash) { continue } @@ -669,7 +720,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG txs.Txs[count] = rlpTx copy(txs.Senders.At(count), sender.Bytes()) txs.IsLocal[count] = isLocal - toSkip.Add(mt.Tx.IDHash) // TODO: Is this unnecessary + yielded.Add(mt.Tx.IDHash) count++ } @@ -682,26 +733,13 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG return true, count, nil } -func (p *TxPool) ResetYieldedStatus() { - p.lock.Lock() - defer p.lock.Unlock() - best := p.pending.best - for i := 0; i < len(best.ms); i++ { - best.ms[i].alreadyYielded = false - } -} - func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - p.lock.Lock() - defer p.lock.Unlock() return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip) } func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) { set := mapset.NewThreadUnsafeSet[[32]byte]() - p.lock.Lock() - defer p.lock.Unlock() - onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) + onTime, _, err := p.YieldBest(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) return onTime, err } @@ -1075,15 +1113,6 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, p.lock.Lock() defer p.lock.Unlock() - if !p.Started() { - if err := p.fromDB(ctx, tx, coreTx); err != nil { - return nil, fmt.Errorf("AddLocalTxs: loading txs from DB: %w", err) - } - if p.started.CompareAndSwap(false, true) { - p.logger.Info("[txpool] Started") - } - } - if err = p.senders.registerNewSenders(&newTransactions, p.logger); err != nil { return nil, err } @@ -1432,27 +1461,38 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P } var toDel []*metaTx // can't delete items while iterate them + + discarded := 0 + pendingRemoved := 0 + baseFeeRemoved := 0 + queuedRemoved := 0 + for senderID, nonce := range noncesToRemove { - //if sender.all.Len() > 0 { - //logger.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.all.len()", sender.all.Len()) - //} - // delete mined transactions from everywhere + byNonce.ascend(senderID, func(mt *metaTx) bool { - //logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce) if mt.Tx.Nonce > nonce { + if mt.Tx.Traced { + logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce) + } + return false } + if mt.Tx.Traced { logger.Info(fmt.Sprintf("TX TRACING: removeMined idHash=%x senderId=%d, currentSubPool=%s", mt.Tx.IDHash, mt.Tx.SenderID, mt.currentSubPool)) } + toDel = append(toDel, mt) // del from sub-pool switch mt.currentSubPool { case PendingSubPool: + pendingRemoved++ pending.Remove(mt) case BaseFeeSubPool: + baseFeeRemoved++ baseFee.Remove(mt) case QueuedSubPool: + queuedRemoved++ queued.Remove(mt) default: //already removed @@ -1460,11 +1500,18 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P return true }) + discarded += len(toDel) + for _, mt := range toDel { discard(mt, txpoolcfg.Mined) } toDel = toDel[:0] } + + if discarded > 0 { + logger.Debug("Discarding Transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved) + } + return nil } @@ -1657,6 +1704,13 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs logEvery := time.NewTicker(p.cfg.LogEvery) defer logEvery.Stop() + err := p.Start(ctx, db) + + if err != nil { + p.logger.Error("[txpool] Failed to start", "err", err) + return + } + for { select { case <-ctx.Done(): @@ -1724,7 +1778,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs var remoteTxSizes []uint32 var remoteTxHashes types.Hashes var remoteTxRlps [][]byte - var broadCastedHashes types.Hashes + var broadcastHashes types.Hashes slotsRlp := make([][]byte, 0, announcements.Len()) if err := db.View(ctx, func(tx kv.Tx) error { @@ -1748,7 +1802,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs // "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844 if t != types.BlobTxType { localTxRlps = append(localTxRlps, slotRlp) - broadCastedHashes = append(broadCastedHashes, hash...) + broadcastHashes = append(broadcastHashes, hash...) } } else { remoteTxTypes = append(remoteTxTypes, t) @@ -1775,12 +1829,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs const localTxsBroadcastMaxPeers uint64 = 10 txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxsBroadcastMaxPeers) for i, peer := range txSentTo { - p.logger.Info("Local tx broadcasted", "txHash", hex.EncodeToString(broadCastedHashes.At(i)), "to peer", peer) + p.logger.Trace("Local tx broadcast", "txHash", hex.EncodeToString(broadcastHashes.At(i)), "to peer", peer) } hashSentTo := send.AnnouncePooledTxs(localTxTypes, localTxSizes, localTxHashes, localTxsBroadcastMaxPeers*2) for i := 0; i < localTxHashes.Len(); i++ { hash := localTxHashes.At(i) - p.logger.Info("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load()) + p.logger.Trace("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load()) } // broadcast remote transactions @@ -1844,6 +1898,7 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err } return written, nil } + func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { for i, mt := range p.deletedTxs { id := mt.Tx.SenderID @@ -1927,20 +1982,33 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { return nil } -func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { - p.lock.Lock() - defer p.lock.Unlock() - return p.fromDB(ctx, tx, coreTx) -} func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if p.lastSeenBlock.Load() == 0 { lastSeenBlock, err := LastSeenBlock(tx) if err != nil { return err } + p.lastSeenBlock.Store(lastSeenBlock) } + // this is neccessary as otherwise best - which waits for sync events + // may wait for ever if blocks have been process before the txpool + // starts with an empty db + lastSeenProgress, err := getExecutionProgress(coreTx) + + if err != nil { + return err + } + + if p.lastSeenBlock.Load() < lastSeenProgress { + // TODO we need to process the blocks since the + // last seen to make sure that the tx pool is in + // sync with the processed blocks + + p.lastSeenBlock.Store(lastSeenProgress) + } + cacheView, err := p._stateCache.View(ctx, coreTx) if err != nil { return err @@ -2031,6 +2099,24 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { p.pendingBlobFee.Store(pendingBlobFee) return nil } + +func getExecutionProgress(db kv.Getter) (uint64, error) { + data, err := db.GetOne(kv.SyncStageProgress, []byte("Execution")) + if err != nil { + return 0, err + } + + if len(data) == 0 { + return 0, nil + } + + if len(data) < 8 { + return 0, fmt.Errorf("value must be at least 8 bytes, got %d", len(data)) + } + + return binary.BigEndian.Uint64(data[:8]), nil +} + func LastSeenBlock(tx kv.Getter) (uint64, error) { v, err := tx.GetOne(kv.PoolInfo, PoolLastSeenBlockKey) if err != nil { @@ -2093,7 +2179,7 @@ func (p *TxPool) printDebug(prefix string) { } } func (p *TxPool) logStats() { - if !p.started.Load() { + if !p.Started() { //p.logger.Info("[txpool] Not started yet, waiting for new blocks...") return } diff --git a/erigon-lib/txpool/pool_fuzz_test.go b/erigon-lib/txpool/pool_fuzz_test.go index e81d31691..1e8923d88 100644 --- a/erigon-lib/txpool/pool_fuzz_test.go +++ b/erigon-lib/txpool/pool_fuzz_test.go @@ -316,6 +316,10 @@ func FuzzOnNewBlocks(f *testing.F) { sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New()) assert.NoError(err) + + err = pool.Start(ctx, db) + assert.NoError(err) + pool.senders.senderIDs = senderIDs for addr, id := range senderIDs { pool.senders.senderID2Addr[id] = addr @@ -538,6 +542,7 @@ func FuzzOnNewBlocks(f *testing.F) { p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New()) assert.NoError(err) + p2.senders = pool.senders // senders are not persisted err = coreDB.View(ctx, func(coreTx kv.Tx) error { return p2.fromDB(ctx, tx, coreTx) }) require.NoError(err) diff --git a/erigon-lib/txpool/pool_test.go b/erigon-lib/txpool/pool_test.go index a438be399..ef4347a58 100644 --- a/erigon-lib/txpool/pool_test.go +++ b/erigon-lib/txpool/pool_test.go @@ -19,6 +19,7 @@ package txpool import ( "bytes" "context" + // "crypto/rand" "fmt" "math" @@ -957,6 +958,10 @@ func TestDropRemoteAtNoGossip(t *testing.T) { require.True(txPool != nil) ctx := context.Background() + + err = txPool.Start(ctx, db) + assert.NoError(err) + var stateVersionID uint64 = 0 pendingBaseFee := uint64(1_000_000) // start blocks from 0, set empty hash - then kvcache will also work on this diff --git a/eth/backend.go b/eth/backend.go index bbec2e31c..105092b37 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -29,6 +29,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" lru "github.com/hashicorp/golang-lru/arc/v2" @@ -616,7 +617,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger miner := stagedsync.NewMiningState(&config.Miner) backend.pendingBlocks = miner.PendingResultCh - backend.minedBlocks = miner.MiningResultCh var ( snapDb kv.RwDB @@ -780,7 +780,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } }() - if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.sentriesClient.Hd.QuitPoWMining, tmpdir, logger); err != nil { + if err := backend.StartMining(context.Background(), backend.chainDB, stateDiffClient, mining, miner, backend.gasPrice, backend.sentriesClient.Hd.QuitPoWMining, tmpdir, logger); err != nil { return nil, err } @@ -1010,7 +1010,8 @@ func (s *Ethereum) shouldPreserve(block *types.Block) bool { //nolint // StartMining starts the miner with the given number of CPU threads. If mining // is already running, this method adjust the number of threads allowed to use // and updates the minimum price required by the transaction pool. -func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, cfg params.MiningConfig, quitCh chan struct{}, tmpDir string, logger log.Logger) error { +func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient *direct.StateDiffClientDirect, mining *stagedsync.Sync, miner stagedsync.MiningState, gasPrice *uint256.Int, quitCh chan struct{}, tmpDir string, logger log.Logger) error { + var borcfg *bor.Bor if b, ok := s.engine.(*bor.Bor); ok { borcfg = b @@ -1023,7 +1024,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } //if borcfg == nil { - if !cfg.Enabled { + if !miner.MiningConfig.Enabled { return nil } //} @@ -1036,14 +1037,14 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } if borcfg != nil { - if cfg.Enabled { - if cfg.SigKey == nil { + if miner.MiningConfig.Enabled { + if miner.MiningConfig.SigKey == nil { s.logger.Error("Etherbase account unavailable locally", "err", err) return fmt.Errorf("signer missing: %w", err) } borcfg.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) { - return crypto.Sign(crypto.Keccak256(message), cfg.SigKey) + return crypto.Sign(crypto.Keccak256(message), miner.MiningConfig.SigKey) }) if !s.config.WithoutHeimdall { @@ -1082,47 +1083,73 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } } if clq != nil { - if cfg.SigKey == nil { + if miner.MiningConfig.SigKey == nil { s.logger.Error("Etherbase account unavailable locally", "err", err) return fmt.Errorf("signer missing: %w", err) } clq.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) { - return crypto.Sign(crypto.Keccak256(message), cfg.SigKey) + return crypto.Sign(crypto.Keccak256(message), miner.MiningConfig.SigKey) }) } + streamCtx, streamCancel := context.WithCancel(ctx) + stream, err := stateDiffClient.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true)) + + if err != nil { + streamCancel() + return err + } + + stateChangeCh := make(chan *remote.StateChange) + + go func() { + for req, err := stream.Recv(); ; req, err = stream.Recv() { + if err == nil { + for _, change := range req.ChangeBatch { + stateChangeCh <- change + } + } + } + }() + go func() { defer debug.LogPanic() defer close(s.waitForMiningStop) + defer streamCancel() - mineEvery := time.NewTicker(cfg.Recommit) + mineEvery := time.NewTicker(miner.MiningConfig.Recommit) defer mineEvery.Stop() - // Listen on a new head subscription. This allows us to maintain the block time by - // triggering mining after the block is passed through all stages. - newHeadCh, closeNewHeadCh := s.notifications.Events.AddHeaderSubscription() - defer closeNewHeadCh() - s.logger.Info("Starting to mine", "etherbase", eb) - var works bool + var working bool + var waiting atomic.Bool + hasWork := true // Start mining immediately errc := make(chan error, 1) + workCtx, workCancel := context.WithCancel(ctx) + defer workCancel() + for { // Only reset if some work was done previously as we'd like to rely // on the `miner.recommit` as backup. if hasWork { - mineEvery.Reset(cfg.Recommit) + mineEvery.Reset(miner.MiningConfig.Recommit) } - // Only check for case if you're already mining (i.e. works = true) and + // Only check for case if you're already mining (i.e. working = true) and // waiting for error or you don't have any work yet (i.e. hasWork = false). - if works || !hasWork { + if working || !hasWork { select { - case <-newHeadCh: + case stateChanges := <-stateChangeCh: + block := stateChanges.BlockHeight + s.logger.Debug("Start mining new block based on previous block", "block", block) + // TODO - can do mining clean up here as we have previous + // block info in the state channel hasWork = true + case <-s.notifyMiningAboutNewTxs: // Skip mining based on new tx notif for bor consensus hasWork = s.chainConfig.Bor == nil @@ -1130,10 +1157,12 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy s.logger.Debug("Start mining new block based on txpool notif") } case <-mineEvery.C: - s.logger.Debug("Start mining new block based on miner.recommit") - hasWork = true + if !(working || waiting.Load()) { + s.logger.Debug("Start mining new block based on miner.recommit", "duration", miner.MiningConfig.Recommit) + } + hasWork = !(working || waiting.Load()) case err := <-errc: - works = false + working = false hasWork = false if errors.Is(err, libcommon.ErrStopped) { return @@ -1146,11 +1175,36 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy } } - if !works && hasWork { - works = true + if !working && hasWork { + working = true hasWork = false - mineEvery.Reset(cfg.Recommit) - go func() { errc <- stages2.MiningStep(ctx, db, mining, tmpDir, logger) }() + mineEvery.Reset(miner.MiningConfig.Recommit) + go func() { + err := stages2.MiningStep(ctx, db, mining, tmpDir, logger) + + waiting.Store(true) + defer waiting.Store(false) + + errc <- err + + if err != nil { + return + } + + for { + select { + case block := <-miner.MiningResultCh: + if block != nil { + s.logger.Debug("Mined block", "block", block.Number()) + s.minedBlocks <- block + } + return + case <-workCtx.Done(): + errc <- workCtx.Err() + return + } + } + }() } } }() diff --git a/eth/stagedsync/chain_reader.go b/eth/stagedsync/chain_reader.go index f1d0e5205..5c2d75c42 100644 --- a/eth/stagedsync/chain_reader.go +++ b/eth/stagedsync/chain_reader.go @@ -17,8 +17,7 @@ import ( // ChainReader implements consensus.ChainReader type ChainReader struct { - Cfg chain.Config - + Cfg chain.Config Db kv.Getter BlockReader services.FullBlockReader Logger log.Logger @@ -72,7 +71,7 @@ func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool { func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int { td, err := rawdb.ReadTd(cr.Db, hash, number) if err != nil { - log.Error("ReadTd failed", "err", err) + cr.Logger.Error("ReadTd failed", "err", err) return nil } return td diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index a6953b370..65cff4086 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -157,10 +157,13 @@ func BorHeimdallForward( } lastBlockNum := s.BlockNumber + if cfg.blockReader.FrozenBorBlocks() > lastBlockNum { lastBlockNum = cfg.blockReader.FrozenBorBlocks() } + recents, err := lru.NewARC[libcommon.Hash, *bor.Snapshot](inmemorySnapshots) + if err != nil { return err } @@ -168,6 +171,7 @@ func BorHeimdallForward( if err != nil { return err } + chain := NewChainReaderImpl(&cfg.chainConfig, tx, cfg.blockReader, logger) var blockNum uint64 diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 839d78424..6ac18f587 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -605,7 +605,7 @@ func (cr ChainReaderImpl) BorEventsByBlock(hash libcommon.Hash, number uint64) [ func (cr ChainReaderImpl) BorSpan(spanId uint64) []byte { span, err := cr.blockReader.Span(context.Background(), cr.tx, spanId) if err != nil { - cr.logger.Error("BorSpan failed", "err", err) + cr.logger.Error("[staged sync] BorSpan failed", "err", err) return nil } return span diff --git a/eth/stagedsync/stage_mining_bor_heimdall.go b/eth/stagedsync/stage_mining_bor_heimdall.go index 2dd96a98b..4a5d21665 100644 --- a/eth/stagedsync/stage_mining_bor_heimdall.go +++ b/eth/stagedsync/stage_mining_bor_heimdall.go @@ -77,7 +77,7 @@ func MiningBorHeimdallForward( } logger.Info( - "[%s] Finished processing", logPrefix, + fmt.Sprintf("[%s] Finished processing", logPrefix), "progress", headerNum, "lastSpanID", lastSpanID, "lastStateSyncEventID", lastStateSyncEventID, diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index abc94326d..7fae41332 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -41,8 +41,8 @@ type MiningExecCfg struct { tmpdir string interrupt *int32 payloadId uint64 - txPool2 TxPoolForMining - txPool2DB kv.RoDB + txPool TxPoolForMining + txPoolDB kv.RoDB } type TxPoolForMining interface { @@ -54,7 +54,7 @@ func StageMiningExecCfg( notifier ChainEventNotifier, chainConfig chain.Config, engine consensus.Engine, vmConfig *vm.Config, tmpdir string, interrupt *int32, payloadId uint64, - txPool2 TxPoolForMining, txPool2DB kv.RoDB, + txPool TxPoolForMining, txPoolDB kv.RoDB, blockReader services.FullBlockReader, ) MiningExecCfg { return MiningExecCfg{ @@ -68,8 +68,8 @@ func StageMiningExecCfg( tmpdir: tmpdir, interrupt: interrupt, payloadId: payloadId, - txPool2: txPool2, - txPool2DB: txPool2DB, + txPool: txPool, + txPoolDB: txPoolDB, } } @@ -150,7 +150,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c } } - logger.Debug("SpawnMiningExecStage", "block txn", current.Txs.Len(), "payload", cfg.payloadId) + logger.Debug("SpawnMiningExecStage", "block", current.Header.Number, "txn", current.Txs.Len(), "payload", cfg.payloadId) if current.Uncles == nil { current.Uncles = []*types.Header{} } @@ -166,7 +166,8 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c if err != nil { return err } - logger.Debug("FinalizeBlockExecution", "current txn", current.Txs.Len(), "current receipt", current.Receipts.Len(), "payload", cfg.payloadId) + + logger.Debug("FinalizeBlockExecution", "block", current.Header.Number, "txn", current.Txs.Len(), "gas", current.Header.GasUsed, "receipt", current.Receipts.Len(), "payload", cfg.payloadId) // hack: pretend that we are real execution stage - next stages will rely on this progress if err := stages.SaveStageProgress(tx, stages.Execution, current.Header.Number.Uint64()); err != nil { @@ -186,23 +187,20 @@ func getNextTransactions( logger log.Logger, ) (types.TransactionsStream, int, error) { txSlots := types2.TxsRlp{} - var onTime bool count := 0 - if err := cfg.txPool2DB.View(context.Background(), func(poolTx kv.Tx) error { + if err := cfg.txPoolDB.View(context.Background(), func(poolTx kv.Tx) error { var err error - counter := 0 - for !onTime && counter < 500 { - remainingGas := header.GasLimit - header.GasUsed - remainingBlobGas := uint64(0) - if header.BlobGasUsed != nil { - remainingBlobGas = cfg.chainConfig.GetMaxBlobGasPerBlock() - *header.BlobGasUsed - } - if onTime, count, err = cfg.txPool2.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, remainingBlobGas, alreadyYielded); err != nil { - return err - } - time.Sleep(1 * time.Millisecond) - counter++ + + remainingGas := header.GasLimit - header.GasUsed + remainingBlobGas := uint64(0) + if header.BlobGasUsed != nil { + remainingBlobGas = cfg.chainConfig.GetMaxBlobGasPerBlock() - *header.BlobGasUsed } + + if _, count, err = cfg.txPool.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, remainingBlobGas, alreadyYielded); err != nil { + return err + } + return nil }); err != nil { return nil, 0, err @@ -375,7 +373,6 @@ func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainC gasSnap := gasPool.Gas() blobGasSnap := gasPool.BlobGas() snap := ibs.Snapshot() - logger.Debug("addTransactionsToMiningBlock", "txn hash", txn.Hash()) receipt, _, err := core.ApplyTransaction(&chainConfig, core.GetHashFn(header, getHeader), engine, &coinbase, gasPool, ibs, noop, header, txn, &header.GasUsed, header.BlobGasUsed, *vmConfig) if err != nil { ibs.RevertToSnapshot(snap) @@ -464,7 +461,7 @@ LOOP: txs.Pop() } else if err == nil { // Everything ok, collect the logs and shift in the next transaction from the same account - logger.Debug(fmt.Sprintf("[%s] addTransactionsToMiningBlock Successful", logPrefix), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId) + logger.Trace(fmt.Sprintf("[%s] Added transaction", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId) coalescedLogs = append(coalescedLogs, logs...) tcount++ txs.Shift() diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index 81cc486e5..d3d36dfba 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -82,10 +82,10 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit if block.Transactions().Len() > 0 { logger.Info(fmt.Sprintf("[%s] block ready for seal", logPrefix), - "block_num", block.NumberU64(), + "block", block.NumberU64(), "transactions", block.Transactions().Len(), - "gas_used", block.GasUsed(), - "gas_limit", block.GasLimit(), + "gasUsed", block.GasUsed(), + "gasLimit", block.GasLimit(), "difficulty", block.Difficulty(), ) } diff --git a/polygon/bor/bor.go b/polygon/bor/bor.go index e3c31bdee..2bdf64a04 100644 --- a/polygon/bor/bor.go +++ b/polygon/bor/bor.go @@ -1112,11 +1112,13 @@ func (c *Bor) Seal(chain consensus.ChainHeaderReader, block *types.Block, result select { case <-stop: c.logger.Info("[bor] Stopped sealing operation for block", "number", number) + results <- nil return case <-time.After(delay): if c.headerProgress != nil && c.headerProgress.Progress() >= number { c.logger.Info("Discarding sealing operation for block", "number", number) + results <- nil return } diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index 75261f14d..88ab5ca94 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -478,7 +478,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg } if c.Enabled { - logger.Info("starting HTTP APIs", "APIs", apis) + logger.Info("starting HTTP APIs", "port", c.HttpPort, "APIs", apis) } if ctx.IsSet(utils.HttpCompressionFlag.Name) { diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index 848370531..a24367e83 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -1190,7 +1190,7 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] return nil, err } if v == nil { - return nil, fmt.Errorf("span %d not found (db), frosenBlocks=%d", spanId, maxBlockNumInFiles) + return nil, fmt.Errorf("span %d not found (db), frozenBlocks=%d", spanId, maxBlockNumInFiles) } return common.Copy(v), nil } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 951dc4b7b..95db1e1e8 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -329,6 +329,7 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error { pendingBlobFee = f.Uint64() } + h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock) notifications.Accumulator.SendAndReset(h.ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock) } // -- send notifications END