Fixes for Bor Block Production Synchronization (#9162)

This PR contains 3 fixes for interaction between the Bor mining loop and
the TX pool which where causing the regular creation of blocks with zero
transactions.

* Mining/Tx pool block synchronization
The synchronization of the tx pool between the sync loop and the mining
loop has been changed so that both are triggered by the same event and
synchronized via a sync.Cond rather than a polling loop with a hard
coded loop limit. This means that mining now waits for the pool to be
updated from the previous block before it starts the mining process.
* Txpool Startup consolidated into its MainLoop
Previously the tx pool start process was dynamically triggered at
various points in the code. This has all now been moved to the start of
the main loop. This is necessary to avoid a timing hole which can leave
the mining loop hanging waiting for a previously block broadcast which
it missed due to its delay start.
* Mining listens for block broadcast to avoid duplicate mining
operations
The mining loop for bor has a recommit timer in case blocks re not
produced on time. However in the case of sprint transitions where the
seal publication is delayed this can lead to duplicate block production.
This is suppressed by introducing a `waiting` state which is exited upon
the block being broadcast from the sealing operation.
This commit is contained in:
Mark Holt 2024-01-10 17:12:15 +00:00 committed by GitHub
parent 284aa1dd0c
commit b05ffc909d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 495 additions and 292 deletions

View File

@ -135,6 +135,7 @@ type BlockProducer struct {
NodeArgs
Mine bool `arg:"--mine" flag:"true"`
Etherbase string `arg:"--miner.etherbase"`
GasLimit int `arg:"--miner.gaslimit"`
DevPeriod int `arg:"--dev.period"`
BorPeriod int `arg:"--bor.period"`
BorMinBlockSize int `arg:"--bor.minblocksize"`

View File

@ -34,7 +34,7 @@ type Network struct {
Snapshots bool
Nodes []Node
Services []Service
Alloc types.GenesisAlloc
Genesis *types.Genesis
BorStateSyncDelay time.Duration
BorPeriod time.Duration
BorMinBlockSize int
@ -140,12 +140,16 @@ func (nw *Network) createNode(nodeArgs Node) (Node, error) {
}
if n.IsBlockProducer() {
if nw.Alloc == nil {
nw.Alloc = types.GenesisAlloc{
if nw.Genesis == nil {
nw.Genesis = &types.Genesis{}
}
if nw.Genesis.Alloc == nil {
nw.Genesis.Alloc = types.GenesisAlloc{
n.Account().Address: types.GenesisAccount{Balance: blockProducerFunds},
}
} else {
nw.Alloc[n.Account().Address] = types.GenesisAccount{Balance: blockProducerFunds}
nw.Genesis.Alloc[n.Account().Address] = types.GenesisAccount{Balance: blockProducerFunds}
}
}

View File

@ -167,8 +167,14 @@ func (n *devnetNode) run(ctx *cli.Context) error {
n.nodeCfg.MdbxGrowthStep = 32 * datasize.MB
n.nodeCfg.MdbxDBSizeLimit = 512 * datasize.MB
for addr, account := range n.network.Alloc {
n.ethCfg.Genesis.Alloc[addr] = account
if n.network.Genesis != nil {
for addr, account := range n.network.Genesis.Alloc {
n.ethCfg.Genesis.Alloc[addr] = account
}
if n.network.Genesis.GasLimit != 0 {
n.ethCfg.Genesis.GasLimit = n.network.Genesis.GasLimit
}
}
if n.network.BorStateSyncDelay > 0 {

View File

@ -6,6 +6,7 @@ import (
"os/signal"
"path/filepath"
dbg "runtime/debug"
"strconv"
"strings"
"syscall"
"time"
@ -131,6 +132,12 @@ var (
Value: 1,
}
GasLimitFlag = cli.Uint64Flag{
Name: "gaslimit",
Usage: "Target gas limit for mined blocks",
Value: 0,
}
WaitFlag = cli.BoolFlag{
Name: "wait",
Usage: "Wait until interrupted after all scenarios have run",
@ -173,6 +180,7 @@ func main() {
&logging.LogVerbosityFlag,
&logging.LogConsoleVerbosityFlag,
&logging.LogDirVerbosityFlag,
&GasLimitFlag,
}
if err := app.Run(os.Args); err != nil {
@ -342,21 +350,74 @@ func initDevnet(ctx *cli.Context, logger log.Logger) (devnet.Devnet, error) {
baseRpcHost := ctx.String(BaseRpcHostFlag.Name)
baseRpcPort := ctx.Int(BaseRpcPortFlag.Name)
producerCount := int(ctx.Uint(BlockProducersFlag.Name))
gasLimit := ctx.Uint64(GasLimitFlag.Name)
var dirLogLevel log.Lvl = log.LvlTrace
var consoleLogLevel log.Lvl = log.LvlCrit
if ctx.IsSet(logging.LogVerbosityFlag.Name) {
lvlVal := ctx.String(logging.LogVerbosityFlag.Name)
i, err := strconv.Atoi(lvlVal)
lvl := log.Lvl(i)
if err != nil {
lvl, err = log.LvlFromString(lvlVal)
}
if err == nil {
consoleLogLevel = lvl
dirLogLevel = lvl
}
} else {
if ctx.IsSet(logging.LogConsoleVerbosityFlag.Name) {
lvlVal := ctx.String(logging.LogConsoleVerbosityFlag.Name)
i, err := strconv.Atoi(lvlVal)
lvl := log.Lvl(i)
if err != nil {
lvl, err = log.LvlFromString(lvlVal)
}
if err == nil {
consoleLogLevel = lvl
}
}
if ctx.IsSet(logging.LogDirVerbosityFlag.Name) {
lvlVal := ctx.String(logging.LogDirVerbosityFlag.Name)
i, err := strconv.Atoi(lvlVal)
lvl := log.Lvl(i)
if err != nil {
lvl, err = log.LvlFromString(lvlVal)
}
if err == nil {
dirLogLevel = lvl
}
}
}
switch chainName {
case networkname.BorDevnetChainName:
if ctx.Bool(WithoutHeimdallFlag.Name) {
return networks.NewBorDevnetWithoutHeimdall(dataDir, baseRpcHost, baseRpcPort, logger), nil
return networks.NewBorDevnetWithoutHeimdall(dataDir, baseRpcHost, baseRpcPort, gasLimit, logger, consoleLogLevel, dirLogLevel), nil
} else if ctx.Bool(LocalHeimdallFlag.Name) {
heimdallGrpcAddr := ctx.String(HeimdallGrpcAddressFlag.Name)
sprintSize := uint64(ctx.Int(BorSprintSizeFlag.Name))
return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, logger), nil
return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil
} else {
return networks.NewBorDevnetWithRemoteHeimdall(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil
return networks.NewBorDevnetWithRemoteHeimdall(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil
}
case networkname.DevChainName:
return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil
return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil
default:
return nil, fmt.Errorf("unknown network: '%s'", chainName)

View File

@ -1,6 +1,7 @@
package networks
import (
"strconv"
"time"
"github.com/ledgerwatch/log/v3"
@ -21,7 +22,10 @@ func NewBorDevnetWithoutHeimdall(
dataDir string,
baseRpcHost string,
baseRpcPort int,
gasLimit uint64,
logger log.Logger,
consoleLogLevel log.Lvl,
dirLogLevel log.Lvl,
) devnet.Devnet {
faucetSource := accounts.NewAccount("faucet-source")
@ -34,8 +38,11 @@ func NewBorDevnetWithoutHeimdall(
BaseRPCHost: baseRpcHost,
BaseRPCPort: baseRpcPort,
//Snapshots: true,
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
Genesis: &types.Genesis{
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
},
GasLimit: gasLimit,
},
Services: []devnet.Service{
account_services.NewFaucet(networkname.BorDevnetChainName, faucetSource),
@ -43,16 +50,16 @@ func NewBorDevnetWithoutHeimdall(
Nodes: []devnet.Node{
&args.BlockProducer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "5",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
WithoutHeimdall: true,
},
AccountSlots: 200,
},
&args.BlockConsumer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "5",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
WithoutHeimdall: true,
},
},
@ -70,8 +77,11 @@ func NewBorDevnetWithHeimdall(
heimdallGrpcAddr string,
checkpointOwner *accounts.Account,
producerCount int,
gasLimit uint64,
withMilestones bool,
logger log.Logger,
consoleLogLevel log.Lvl,
dirLogLevel log.Lvl,
) devnet.Devnet {
faucetSource := accounts.NewAccount("faucet-source")
@ -89,8 +99,8 @@ func NewBorDevnetWithHeimdall(
for i := 0; i < producerCount; i++ {
nodes = append(nodes, &args.BlockProducer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "5",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
HeimdallGrpcAddr: heimdallGrpcAddr,
},
AccountSlots: 20000,
@ -108,14 +118,17 @@ func NewBorDevnetWithHeimdall(
BorStateSyncDelay: 5 * time.Second,
BorWithMilestones: &withMilestones,
Services: append(services, account_services.NewFaucet(networkname.BorDevnetChainName, faucetSource)),
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
Genesis: &types.Genesis{
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
},
GasLimit: gasLimit,
},
Nodes: append(nodes,
&args.BlockConsumer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "5",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
HeimdallGrpcAddr: heimdallGrpcAddr,
},
}),
@ -130,15 +143,17 @@ func NewBorDevnetWithHeimdall(
BaseRPCHost: baseRpcHost,
BaseRPCPort: baseRpcPort + 1000,
Services: append(services, account_services.NewFaucet(networkname.DevChainName, faucetSource)),
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
checkpointOwner.Address: {Balance: accounts.EtherAmount(10_000)},
Genesis: &types.Genesis{
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
checkpointOwner.Address: {Balance: accounts.EtherAmount(10_000)},
},
},
Nodes: []devnet.Node{
&args.BlockProducer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "5",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
VMDebug: true,
HttpCorsDomain: "*",
},
@ -147,8 +162,8 @@ func NewBorDevnetWithHeimdall(
},
&args.BlockConsumer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "3",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
},
},
},
@ -165,7 +180,10 @@ func NewBorDevnetWithRemoteHeimdall(
baseRpcHost string,
baseRpcPort int,
producerCount int,
gasLimit uint64,
logger log.Logger,
consoleLogLevel log.Lvl,
dirLogLevel log.Lvl,
) devnet.Devnet {
heimdallGrpcAddr := ""
checkpointOwner := accounts.NewAccount("checkpoint-owner")
@ -178,8 +196,11 @@ func NewBorDevnetWithRemoteHeimdall(
heimdallGrpcAddr,
checkpointOwner,
producerCount,
gasLimit,
withMilestones,
logger)
logger,
consoleLogLevel,
dirLogLevel)
}
func NewBorDevnetWithLocalHeimdall(
@ -189,7 +210,10 @@ func NewBorDevnetWithLocalHeimdall(
heimdallGrpcAddr string,
sprintSize uint64,
producerCount int,
gasLimit uint64,
logger log.Logger,
consoleLogLevel log.Lvl,
dirLogLevel log.Lvl,
) devnet.Devnet {
config := *params.BorDevnetChainConfig
borConfig := config.Bor.(*borcfg.BorConfig)
@ -216,7 +240,8 @@ func NewBorDevnetWithLocalHeimdall(
heimdallGrpcAddr,
checkpointOwner,
producerCount,
gasLimit,
// milestones are not supported yet on the local heimdall
false,
logger)
logger, consoleLogLevel, dirLogLevel)
}

View File

@ -1,6 +1,8 @@
package networks
import (
"strconv"
"github.com/ledgerwatch/erigon-lib/chain/networkname"
"github.com/ledgerwatch/erigon/cmd/devnet/accounts"
"github.com/ledgerwatch/erigon/cmd/devnet/args"
@ -15,7 +17,10 @@ func NewDevDevnet(
baseRpcHost string,
baseRpcPort int,
producerCount int,
gasLimit uint64,
logger log.Logger,
consoleLogLevel log.Lvl,
dirLogLevel log.Lvl,
) devnet.Devnet {
faucetSource := accounts.NewAccount("faucet-source")
@ -28,8 +33,8 @@ func NewDevDevnet(
for i := 0; i < producerCount; i++ {
nodes = append(nodes, &args.BlockProducer{
NodeArgs: args.NodeArgs{
ConsoleVerbosity: "0",
DirVerbosity: "5",
ConsoleVerbosity: strconv.Itoa(int(consoleLogLevel)),
DirVerbosity: strconv.Itoa(int(dirLogLevel)),
},
AccountSlots: 200,
})
@ -42,8 +47,11 @@ func NewDevDevnet(
BasePrivateApiAddr: "localhost:10090",
BaseRPCHost: baseRpcHost,
BaseRPCPort: baseRpcPort,
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
Genesis: &types.Genesis{
Alloc: types.GenesisAlloc{
faucetSource.Address: {Balance: accounts.EtherAmount(200_000)},
},
GasLimit: gasLimit,
},
Services: []devnet.Service{
account_services.NewFaucet(networkname.DevChainName, faucetSource),

View File

@ -184,34 +184,50 @@ func (req *requestGenerator) rpcCall(ctx context.Context, result interface{}, me
})
}
const connectionTimeout = time.Second * 5
const requestTimeout = time.Second * 20
const connectionTimeout = time.Millisecond * 500
func isConnectionError(err error) bool {
var opErr *net.OpError
if errors.As(err, &opErr) {
switch {
case errors.As(err, &opErr):
return opErr.Op == "dial"
case errors.Is(err, context.DeadlineExceeded):
return true
}
return false
}
func retryConnects(ctx context.Context, op func(context.Context) error) error {
ctx, cancel := context.WithTimeout(ctx, connectionTimeout)
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
return retry(ctx, op, isConnectionError, time.Millisecond*200, nil)
return retry(ctx, op, isConnectionError, time.Second*1, nil)
}
func retry(ctx context.Context, op func(context.Context) error, isRecoverableError func(error) bool, delay time.Duration, lastErr error) error {
err := op(ctx)
opctx, cancel := context.WithTimeout(ctx, connectionTimeout)
defer cancel()
err := op(opctx)
if err == nil {
return nil
}
if errors.Is(err, context.DeadlineExceeded) && lastErr != nil {
return lastErr
}
if !isRecoverableError(err) {
return err
}
if errors.Is(err, context.DeadlineExceeded) {
if lastErr != nil {
return lastErr
}
err = nil
}
delayTimer := time.NewTimer(delay)
select {
case <-delayTimer.C:

View File

@ -16,7 +16,7 @@ import (
"github.com/ledgerwatch/log/v3"
)
func initDevnet(chainName string, dataDir string, producerCount int, logger log.Logger) (devnet.Devnet, error) {
func initDevnet(chainName string, dataDir string, producerCount int, gasLimit uint64, logger log.Logger, consoleLogLevel log.Lvl, dirLogLevel log.Lvl) (devnet.Devnet, error) {
const baseRpcHost = "localhost"
const baseRpcPort = 8545
@ -24,17 +24,17 @@ func initDevnet(chainName string, dataDir string, producerCount int, logger log.
case networkname.BorDevnetChainName:
heimdallGrpcAddr := polygon.HeimdallGrpcAddressDefault
const sprintSize uint64 = 0
return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, logger), nil
return networks.NewBorDevnetWithLocalHeimdall(dataDir, baseRpcHost, baseRpcPort, heimdallGrpcAddr, sprintSize, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil
case networkname.DevChainName:
return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, logger), nil
return networks.NewDevDevnet(dataDir, baseRpcHost, baseRpcPort, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel), nil
case "":
envChainName, _ := os.LookupEnv("DEVNET_CHAIN")
if envChainName == "" {
envChainName = networkname.DevChainName
}
return initDevnet(envChainName, dataDir, producerCount, logger)
return initDevnet(envChainName, dataDir, producerCount, gasLimit, logger, consoleLogLevel, dirLogLevel)
default:
return nil, fmt.Errorf("unknown network: '%s'", chainName)
@ -57,8 +57,12 @@ func ContextStart(t *testing.T, chainName string) (devnet.Context, error) {
producerCount, _ := strconv.ParseUint(envProducerCount, 10, 64)
// TODO get log levels from env
var dirLogLevel log.Lvl = log.LvlTrace
var consoleLogLevel log.Lvl = log.LvlCrit
var network devnet.Devnet
network, err := initDevnet(chainName, dataDir, int(producerCount), logger)
network, err := initDevnet(chainName, dataDir, int(producerCount), 0, logger, consoleLogLevel, dirLogLevel)
if err != nil {
return nil, fmt.Errorf("ContextStart initDevnet failed: %w", err)
}

View File

@ -140,27 +140,20 @@ func SendTxLoad(ctx context.Context, to, from string, amount uint64, txPerSec ui
for {
start := time.Now()
lowtx, hightx, err := CreateManyEIP1559TransactionsRefWithBaseFee2(ctx, to, from, int(batchCount))
tx, err := CreateManyEIP1559TransactionsHigherThanBaseFee(ctx, to, from, int(batchCount))
if err != nil {
logger.Error("failed Create Txs", "error", err)
return err
}
_, err = SendManyTransactions(ctx, lowtx)
_, err = SendManyTransactions(ctx, tx)
if err != nil {
logger.Error("failed SendManyTransactions(higherThanBaseFeeTxs)", "error", err)
return err
}
_, err = SendManyTransactions(ctx, hightx)
if err != nil {
logger.Error("failed SendManyTransactions(lowerThanBaseFeeTxs)", "error", err)
return err
}
select {
case <-ctx.Done():
return nil
@ -249,6 +242,33 @@ func CreateManyEIP1559TransactionsRefWithBaseFee2(ctx context.Context, to, from
return lowerBaseFeeTransactions, higherBaseFeeTransactions, nil
}
func CreateManyEIP1559TransactionsHigherThanBaseFee(ctx context.Context, to, from string, count int) ([]types.Transaction, error) {
toAddress := libcommon.HexToAddress(to)
fromAddress := libcommon.HexToAddress(from)
baseFeePerGas, err := blocks.BaseFeeFromBlock(ctx)
if err != nil {
return nil, fmt.Errorf("failed BaseFeeFromBlock: %v", err)
}
baseFeePerGas = baseFeePerGas * 2
devnet.Logger(ctx).Info("BaseFeePerGas2", "val", baseFeePerGas)
node := devnet.SelectNode(ctx)
res, err := node.GetTransactionCount(fromAddress, rpc.PendingBlock)
if err != nil {
return nil, fmt.Errorf("failed to get transaction count for address 0x%x: %v", fromAddress, err)
}
nonce := res.Uint64()
return signEIP1559TxsHigherThanBaseFee(ctx, count, baseFeePerGas, &nonce, toAddress, fromAddress)
}
// createNonContractTx returns a signed transaction and the recipient address
func CreateTransaction(node devnet.Node, to, from string, value uint64) (types.Transaction, libcommon.Address, error) {
toAccount := accounts.GetAccount(to)
@ -344,7 +364,7 @@ func signEIP1559TxsLowerThanBaseFee(ctx context.Context, n int, baseFeePerGas ui
transaction := types.NewEIP1559Transaction(chainId, *nonce, toAddress, uint256.NewInt(value), uint64(210_000), uint256.NewInt(gasPrice), new(uint256.Int), uint256.NewInt(gasFeeCap), nil)
devnet.Logger(ctx).Info("LOWER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap)
devnet.Logger(ctx).Trace("LOWER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap)
signedTransaction, err := types.SignTx(transaction, signer, accounts.SigKey(fromAddress))
@ -385,7 +405,7 @@ func signEIP1559TxsHigherThanBaseFee(ctx context.Context, n int, baseFeePerGas u
transaction := types.NewEIP1559Transaction(chainId, *nonce, toAddress, uint256.NewInt(value), uint64(210_000), uint256.NewInt(gasPrice), new(uint256.Int), uint256.NewInt(gasFeeCap), nil)
devnet.Logger(ctx).Info("HIGHER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap)
devnet.Logger(ctx).Trace("HIGHER", "transaction", i, "nonce", transaction.Nonce, "value", transaction.Value, "feecap", transaction.FeeCap)
signerKey := accounts.SigKey(fromAddress)
if signerKey == nil {
@ -407,7 +427,7 @@ func signEIP1559TxsHigherThanBaseFee(ctx context.Context, n int, baseFeePerGas u
func SendManyTransactions(ctx context.Context, signedTransactions []types.Transaction) ([]libcommon.Hash, error) {
logger := devnet.Logger(ctx)
logger.Info("Sending multiple transactions to the txpool...")
logger.Info(fmt.Sprintf("Sending %d transactions to the txpool...", len(signedTransactions)))
hashes := make([]libcommon.Hash, len(signedTransactions))
for idx, tx := range signedTransactions {

View File

@ -208,7 +208,7 @@ func subscribeToStateChangesLoop(ctx context.Context, client StateChangesClient,
time.Sleep(3 * time.Second)
continue
}
log.Warn("[txpool.handleStateChanges]", "err", err)
log.Warn("[rpcdaemon subscribeToStateChanges]", "err", err)
}
}
}()

View File

@ -170,12 +170,6 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
fetch.ConnectCore()
fetch.ConnectSentries()
/*
var ethashApi *ethash.API
sif casted, ok := backend.engine.(*ethash.Ethash); ok {
ethashApi = casted.APIs(nil)[1].Service.(*ethash.API)
}
*/
miningGrpcServer := privateapi.NewMiningServer(ctx, &rpcdaemontest.IsMiningMock{}, nil, logger)
grpcServer, err := txpool.StartGrpc(txpoolGrpcServer, miningGrpcServer, txpoolApiAddr, nil, logger)

View File

@ -82,7 +82,7 @@ func (cr ChainReaderImpl) FrozenBlocks() uint64 {
func (cr ChainReaderImpl) BorSpan(spanId uint64) []byte {
spanBytes, err := cr.BlockReader.Span(context.Background(), cr.Db, spanId)
if err != nil {
log.Error("BorSpan failed", "err", err)
log.Error("[consensus] BorSpan failed", "err", err)
}
return spanBytes
}

View File

@ -171,7 +171,7 @@ func Clean(path string) string {
return FromSlash(out.string())
}
func unixIsLocal(path string) bool {
func unixIsLocal(path string) bool { //nolint
if IsAbs(path) || path == "" {
return false
}

View File

@ -175,51 +175,6 @@ func HasPrefix(p, prefix string) bool {
return strings.HasPrefix(strings.ToLower(p), strings.ToLower(prefix))
}
func splitList(path string) []string {
// The same implementation is used in LookPath in os/exec;
// consider changing os/exec when changing this.
if path == "" {
return []string{}
}
// Split path, respecting but preserving quotes.
list := []string{}
start := 0
quo := false
for i := 0; i < len(path); i++ {
switch c := path[i]; {
case c == '"':
quo = !quo
case c == ListSeparator && !quo:
list = append(list, path[start:i])
start = i + 1
}
}
list = append(list, path[start:])
// Remove quotes.
for i, s := range list {
list[i] = strings.ReplaceAll(s, `"`, ``)
}
return list
}
func abs(path string) (string, error) {
if path == "" {
// syscall.FullPath returns an error on empty path, because it's not a valid path.
// To implement Abs behavior of returning working directory on empty string input,
// special-case empty path by changing it to "." path. See golang.org/issue/24441.
path = "."
}
fullPath, err := syscall.FullPath(path)
if err != nil {
return "", err
}
return Clean(fullPath), nil
}
func join(elem []string) string {
var b strings.Builder
var lastChar byte
@ -260,47 +215,3 @@ func join(elem []string) string {
}
return Clean(b.String())
}
// joinNonEmpty is like join, but it assumes that the first element is non-empty.
func joinNonEmpty(elem []string) string {
if len(elem[0]) == 2 && elem[0][1] == ':' {
// First element is drive letter without terminating slash.
// Keep path relative to current directory on that drive.
// Skip empty elements.
i := 1
for ; i < len(elem); i++ {
if elem[i] != "" {
break
}
}
return Clean(elem[0] + strings.Join(elem[i:], string(Separator)))
}
// The following logic prevents Join from inadvertently creating a
// UNC path on Windows. Unless the first element is a UNC path, Join
// shouldn't create a UNC path. See golang.org/issue/9167.
p := Clean(strings.Join(elem, string(Separator)))
if !isUNC(p) {
return p
}
// p == UNC only allowed when the first element is a UNC path.
head := Clean(elem[0])
if isUNC(head) {
return p
}
// head + tail == UNC, but joining two non-UNC paths should not result
// in a UNC path. Undo creation of UNC path.
tail := Clean(strings.Join(elem[1:], string(Separator)))
if head[len(head)-1] == Separator {
return head + tail
}
return head + string(Separator) + tail
}
// isUNC reports whether path is a UNC path.
func isUNC(path string) bool {
return len(path) > 1 && isSlash(path[0]) && isSlash(path[1])
}
func sameWord(a, b string) bool {
return strings.EqualFold(a, b)
}

View File

@ -41,7 +41,7 @@ func Mmap(f *os.File, size int) ([]byte, *[MaxMapSize]byte, error) {
}
// Close mapping handle.
if err := windows.CloseHandle(windows.Handle(h)); err != nil {
if err := windows.CloseHandle(h); err != nil {
return nil, nil, os.NewSyscallError("CloseHandle", err)
}

View File

@ -126,7 +126,6 @@ type metaTx struct {
timestamp uint64 // when it was added to pool
subPool SubPoolMarker
currentSubPool SubPoolType
alreadyYielded bool
minedBlockNum uint64
}
@ -211,6 +210,7 @@ type TxPool struct {
cfg txpoolcfg.Config
chainID uint256.Int
lastSeenBlock atomic.Uint64
lastSeenCond *sync.Cond
lastFinalizedBlock atomic.Uint64
started atomic.Bool
pendingBaseFee atomic.Uint64
@ -249,8 +249,11 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
tracedSenders[common.BytesToAddress([]byte(sender))] = struct{}{}
}
lock := &sync.Mutex{}
res := &TxPool{
lock: &sync.Mutex{},
lock: lock,
lastSeenCond: sync.NewCond(lock),
byHash: map[string]*metaTx{},
isLocalLRU: localsHistory,
discardReasonsLRU: discardHistory,
@ -298,42 +301,81 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
return res, nil
}
func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
if err := minedTxs.Valid(); err != nil {
return err
func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error {
if p.started.Load() {
return nil
}
return db.View(ctx, func(tx kv.Tx) error {
coreDb, _ := p.coreDBWithCache()
coreTx, err := coreDb.BeginRo(ctx)
if err != nil {
return err
}
defer coreTx.Rollback()
if err := p.fromDB(ctx, tx, coreTx); err != nil {
return fmt.Errorf("loading pool from DB: %w", err)
}
if p.started.CompareAndSwap(false, true) {
p.logger.Info("[txpool] Started")
}
return nil
})
}
func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
defer newBlockTimer.ObserveDuration(time.Now())
//t := time.Now()
coreDB, cache := p.coreDBWithCache()
cache.OnNewBlock(stateChanges)
coreTx, err := coreDB.BeginRo(ctx)
if err != nil {
return err
}
defer coreTx.Rollback()
p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight)
if !p.started.Load() {
if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil {
return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err)
}
block := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight
baseFee := stateChanges.PendingBlockBaseFee
available := len(p.pending.best.ms)
defer func() {
p.logger.Debug("[txpool] New block", "block", block, "unwound", len(unwindTxs.Txs), "mined", len(minedTxs.Txs), "baseFee", baseFee, "pending-pre", available, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len(), "err", err)
}()
if err = minedTxs.Valid(); err != nil {
return err
}
cacheView, err := cache.View(ctx, coreTx)
if err != nil {
return err
}
p.lock.Lock()
defer p.lock.Unlock()
defer func() {
if err == nil {
p.lastSeenBlock.Store(block)
p.lastSeenCond.Broadcast()
}
p.lock.Unlock()
}()
if assert.Enable {
if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil {
p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String())
}
}
baseFee := stateChanges.PendingBlockBaseFee
pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee)
// Update pendingBase for all pool queues and slices
@ -350,10 +392,13 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.setBlobFee(pendingBlobFee)
p.blockGasLimit.Store(stateChanges.BlockGasLimit)
if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
if err = p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
return err
}
_, unwindTxs, err = p.validateTxs(&unwindTxs, cacheView)
if err != nil {
return err
}
@ -371,21 +416,23 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
}
}
if err := p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
return err
}
if err := removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
if err = p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
return err
}
//p.logger.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight)
if err = removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
return err
}
var announcements types.Announcements
announcements, err = addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */
pendingBaseFee, stateChanges.BlockGasLimit, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger)
announcements, err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */
pendingBaseFee, stateChanges.BlockGasLimit,
p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger)
if err != nil {
return err
}
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
@ -394,10 +441,6 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.promoted.Reset()
p.promoted.AppendOther(announcements)
if p.started.CompareAndSwap(false, true) {
p.logger.Info("[txpool] Started")
}
if p.promoted.Len() > 0 {
select {
case p.newPendingTxs <- p.promoted.Copy():
@ -405,12 +448,11 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
}
}
//p.logger.Info("[txpool] new block", "number", p.lastSeenBlock.Load(), "pendngBaseFee", pendingBaseFee, "in", time.Since(t))
return nil
}
func (p *TxPool) processRemoteTxs(ctx context.Context) error {
if !p.started.Load() {
if !p.Started() {
return fmt.Errorf("txpool not started yet")
}
@ -610,20 +652,29 @@ func (p *TxPool) IsLocal(idHash []byte) bool {
func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) Started() bool { return p.started.Load() }
func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
// First wait for the corresponding block to arrive
if p.lastSeenBlock.Load() < onTopOf {
return false, 0, nil // Too early
func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, yielded mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()
for last := p.lastSeenBlock.Load(); last < onTopOf; last = p.lastSeenBlock.Load() {
p.logger.Debug("[txpool] Waiting for block", "expecting", onTopOf, "lastSeen", last, "txRequested", n, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len())
p.lastSeenCond.Wait()
}
isShanghai := p.isShanghai() || p.isAgra()
best := p.pending.best
isShanghai := p.isShanghai() || p.isAgra()
txs.Resize(uint(cmp.Min(int(n), len(best.ms))))
var toRemove []*metaTx
count := 0
i := 0
for i := 0; count < int(n) && i < len(best.ms); i++ {
defer func() {
p.logger.Debug("[txpool] Processing best request", "last", onTopOf, "txRequested", n, "txAvailable", len(best.ms), "txProcessed", i, "txReturned", count)
}()
for ; count < int(n) && i < len(best.ms); i++ {
// if we wouldn't have enough gas for a standard transaction then quit out early
if availableGas < fixedgas.TxGas {
break
@ -631,7 +682,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
mt := best.ms[i]
if toSkip.Contains(mt.Tx.IDHash) {
if yielded.Contains(mt.Tx.IDHash) {
continue
}
@ -669,7 +720,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
txs.Txs[count] = rlpTx
copy(txs.Senders.At(count), sender.Bytes())
txs.IsLocal[count] = isLocal
toSkip.Add(mt.Tx.IDHash) // TODO: Is this unnecessary
yielded.Add(mt.Tx.IDHash)
count++
}
@ -682,26 +733,13 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
return true, count, nil
}
func (p *TxPool) ResetYieldedStatus() {
p.lock.Lock()
defer p.lock.Unlock()
best := p.pending.best
for i := 0; i < len(best.ms); i++ {
best.ms[i].alreadyYielded = false
}
}
func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()
return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip)
}
func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) {
set := mapset.NewThreadUnsafeSet[[32]byte]()
p.lock.Lock()
defer p.lock.Unlock()
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set)
onTime, _, err := p.YieldBest(n, txs, tx, onTopOf, availableGas, availableBlobGas, set)
return onTime, err
}
@ -1075,15 +1113,6 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
p.lock.Lock()
defer p.lock.Unlock()
if !p.Started() {
if err := p.fromDB(ctx, tx, coreTx); err != nil {
return nil, fmt.Errorf("AddLocalTxs: loading txs from DB: %w", err)
}
if p.started.CompareAndSwap(false, true) {
p.logger.Info("[txpool] Started")
}
}
if err = p.senders.registerNewSenders(&newTransactions, p.logger); err != nil {
return nil, err
}
@ -1432,27 +1461,38 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
}
var toDel []*metaTx // can't delete items while iterate them
discarded := 0
pendingRemoved := 0
baseFeeRemoved := 0
queuedRemoved := 0
for senderID, nonce := range noncesToRemove {
//if sender.all.Len() > 0 {
//logger.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.all.len()", sender.all.Len())
//}
// delete mined transactions from everywhere
byNonce.ascend(senderID, func(mt *metaTx) bool {
//logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce)
if mt.Tx.Nonce > nonce {
if mt.Tx.Traced {
logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce)
}
return false
}
if mt.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removeMined idHash=%x senderId=%d, currentSubPool=%s", mt.Tx.IDHash, mt.Tx.SenderID, mt.currentSubPool))
}
toDel = append(toDel, mt)
// del from sub-pool
switch mt.currentSubPool {
case PendingSubPool:
pendingRemoved++
pending.Remove(mt)
case BaseFeeSubPool:
baseFeeRemoved++
baseFee.Remove(mt)
case QueuedSubPool:
queuedRemoved++
queued.Remove(mt)
default:
//already removed
@ -1460,11 +1500,18 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
return true
})
discarded += len(toDel)
for _, mt := range toDel {
discard(mt, txpoolcfg.Mined)
}
toDel = toDel[:0]
}
if discarded > 0 {
logger.Debug("Discarding Transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved)
}
return nil
}
@ -1657,6 +1704,13 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
logEvery := time.NewTicker(p.cfg.LogEvery)
defer logEvery.Stop()
err := p.Start(ctx, db)
if err != nil {
p.logger.Error("[txpool] Failed to start", "err", err)
return
}
for {
select {
case <-ctx.Done():
@ -1724,7 +1778,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
var remoteTxSizes []uint32
var remoteTxHashes types.Hashes
var remoteTxRlps [][]byte
var broadCastedHashes types.Hashes
var broadcastHashes types.Hashes
slotsRlp := make([][]byte, 0, announcements.Len())
if err := db.View(ctx, func(tx kv.Tx) error {
@ -1748,7 +1802,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
// "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844
if t != types.BlobTxType {
localTxRlps = append(localTxRlps, slotRlp)
broadCastedHashes = append(broadCastedHashes, hash...)
broadcastHashes = append(broadcastHashes, hash...)
}
} else {
remoteTxTypes = append(remoteTxTypes, t)
@ -1775,12 +1829,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
const localTxsBroadcastMaxPeers uint64 = 10
txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxsBroadcastMaxPeers)
for i, peer := range txSentTo {
p.logger.Info("Local tx broadcasted", "txHash", hex.EncodeToString(broadCastedHashes.At(i)), "to peer", peer)
p.logger.Trace("Local tx broadcast", "txHash", hex.EncodeToString(broadcastHashes.At(i)), "to peer", peer)
}
hashSentTo := send.AnnouncePooledTxs(localTxTypes, localTxSizes, localTxHashes, localTxsBroadcastMaxPeers*2)
for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i)
p.logger.Info("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load())
p.logger.Trace("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load())
}
// broadcast remote transactions
@ -1844,6 +1898,7 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err
}
return written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
for i, mt := range p.deletedTxs {
id := mt.Tx.SenderID
@ -1927,20 +1982,33 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
return nil
}
func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
p.lock.Lock()
defer p.lock.Unlock()
return p.fromDB(ctx, tx, coreTx)
}
func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
if p.lastSeenBlock.Load() == 0 {
lastSeenBlock, err := LastSeenBlock(tx)
if err != nil {
return err
}
p.lastSeenBlock.Store(lastSeenBlock)
}
// this is neccessary as otherwise best - which waits for sync events
// may wait for ever if blocks have been process before the txpool
// starts with an empty db
lastSeenProgress, err := getExecutionProgress(coreTx)
if err != nil {
return err
}
if p.lastSeenBlock.Load() < lastSeenProgress {
// TODO we need to process the blocks since the
// last seen to make sure that the tx pool is in
// sync with the processed blocks
p.lastSeenBlock.Store(lastSeenProgress)
}
cacheView, err := p._stateCache.View(ctx, coreTx)
if err != nil {
return err
@ -2031,6 +2099,24 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
p.pendingBlobFee.Store(pendingBlobFee)
return nil
}
func getExecutionProgress(db kv.Getter) (uint64, error) {
data, err := db.GetOne(kv.SyncStageProgress, []byte("Execution"))
if err != nil {
return 0, err
}
if len(data) == 0 {
return 0, nil
}
if len(data) < 8 {
return 0, fmt.Errorf("value must be at least 8 bytes, got %d", len(data))
}
return binary.BigEndian.Uint64(data[:8]), nil
}
func LastSeenBlock(tx kv.Getter) (uint64, error) {
v, err := tx.GetOne(kv.PoolInfo, PoolLastSeenBlockKey)
if err != nil {
@ -2093,7 +2179,7 @@ func (p *TxPool) printDebug(prefix string) {
}
}
func (p *TxPool) logStats() {
if !p.started.Load() {
if !p.Started() {
//p.logger.Info("[txpool] Not started yet, waiting for new blocks...")
return
}

View File

@ -316,6 +316,10 @@ func FuzzOnNewBlocks(f *testing.F) {
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
assert.NoError(err)
err = pool.Start(ctx, db)
assert.NoError(err)
pool.senders.senderIDs = senderIDs
for addr, id := range senderIDs {
pool.senders.senderID2Addr[id] = addr
@ -538,6 +542,7 @@ func FuzzOnNewBlocks(f *testing.F) {
p2, err := New(ch, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, log.New())
assert.NoError(err)
p2.senders = pool.senders // senders are not persisted
err = coreDB.View(ctx, func(coreTx kv.Tx) error { return p2.fromDB(ctx, tx, coreTx) })
require.NoError(err)

View File

@ -19,6 +19,7 @@ package txpool
import (
"bytes"
"context"
// "crypto/rand"
"fmt"
"math"
@ -957,6 +958,10 @@ func TestDropRemoteAtNoGossip(t *testing.T) {
require.True(txPool != nil)
ctx := context.Background()
err = txPool.Start(ctx, db)
assert.NoError(err)
var stateVersionID uint64 = 0
pendingBaseFee := uint64(1_000_000)
// start blocks from 0, set empty hash - then kvcache will also work on this

View File

@ -29,6 +29,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
lru "github.com/hashicorp/golang-lru/arc/v2"
@ -616,7 +617,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
miner := stagedsync.NewMiningState(&config.Miner)
backend.pendingBlocks = miner.PendingResultCh
backend.minedBlocks = miner.MiningResultCh
var (
snapDb kv.RwDB
@ -780,7 +780,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}
}()
if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.sentriesClient.Hd.QuitPoWMining, tmpdir, logger); err != nil {
if err := backend.StartMining(context.Background(), backend.chainDB, stateDiffClient, mining, miner, backend.gasPrice, backend.sentriesClient.Hd.QuitPoWMining, tmpdir, logger); err != nil {
return nil, err
}
@ -1010,7 +1010,8 @@ func (s *Ethereum) shouldPreserve(block *types.Block) bool { //nolint
// StartMining starts the miner with the given number of CPU threads. If mining
// is already running, this method adjust the number of threads allowed to use
// and updates the minimum price required by the transaction pool.
func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, cfg params.MiningConfig, quitCh chan struct{}, tmpDir string, logger log.Logger) error {
func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient *direct.StateDiffClientDirect, mining *stagedsync.Sync, miner stagedsync.MiningState, gasPrice *uint256.Int, quitCh chan struct{}, tmpDir string, logger log.Logger) error {
var borcfg *bor.Bor
if b, ok := s.engine.(*bor.Bor); ok {
borcfg = b
@ -1023,7 +1024,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
}
//if borcfg == nil {
if !cfg.Enabled {
if !miner.MiningConfig.Enabled {
return nil
}
//}
@ -1036,14 +1037,14 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
}
if borcfg != nil {
if cfg.Enabled {
if cfg.SigKey == nil {
if miner.MiningConfig.Enabled {
if miner.MiningConfig.SigKey == nil {
s.logger.Error("Etherbase account unavailable locally", "err", err)
return fmt.Errorf("signer missing: %w", err)
}
borcfg.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) {
return crypto.Sign(crypto.Keccak256(message), cfg.SigKey)
return crypto.Sign(crypto.Keccak256(message), miner.MiningConfig.SigKey)
})
if !s.config.WithoutHeimdall {
@ -1082,47 +1083,73 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
}
}
if clq != nil {
if cfg.SigKey == nil {
if miner.MiningConfig.SigKey == nil {
s.logger.Error("Etherbase account unavailable locally", "err", err)
return fmt.Errorf("signer missing: %w", err)
}
clq.Authorize(eb, func(_ libcommon.Address, mimeType string, message []byte) ([]byte, error) {
return crypto.Sign(crypto.Keccak256(message), cfg.SigKey)
return crypto.Sign(crypto.Keccak256(message), miner.MiningConfig.SigKey)
})
}
streamCtx, streamCancel := context.WithCancel(ctx)
stream, err := stateDiffClient.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true))
if err != nil {
streamCancel()
return err
}
stateChangeCh := make(chan *remote.StateChange)
go func() {
for req, err := stream.Recv(); ; req, err = stream.Recv() {
if err == nil {
for _, change := range req.ChangeBatch {
stateChangeCh <- change
}
}
}
}()
go func() {
defer debug.LogPanic()
defer close(s.waitForMiningStop)
defer streamCancel()
mineEvery := time.NewTicker(cfg.Recommit)
mineEvery := time.NewTicker(miner.MiningConfig.Recommit)
defer mineEvery.Stop()
// Listen on a new head subscription. This allows us to maintain the block time by
// triggering mining after the block is passed through all stages.
newHeadCh, closeNewHeadCh := s.notifications.Events.AddHeaderSubscription()
defer closeNewHeadCh()
s.logger.Info("Starting to mine", "etherbase", eb)
var works bool
var working bool
var waiting atomic.Bool
hasWork := true // Start mining immediately
errc := make(chan error, 1)
workCtx, workCancel := context.WithCancel(ctx)
defer workCancel()
for {
// Only reset if some work was done previously as we'd like to rely
// on the `miner.recommit` as backup.
if hasWork {
mineEvery.Reset(cfg.Recommit)
mineEvery.Reset(miner.MiningConfig.Recommit)
}
// Only check for case if you're already mining (i.e. works = true) and
// Only check for case if you're already mining (i.e. working = true) and
// waiting for error or you don't have any work yet (i.e. hasWork = false).
if works || !hasWork {
if working || !hasWork {
select {
case <-newHeadCh:
case stateChanges := <-stateChangeCh:
block := stateChanges.BlockHeight
s.logger.Debug("Start mining new block based on previous block", "block", block)
// TODO - can do mining clean up here as we have previous
// block info in the state channel
hasWork = true
case <-s.notifyMiningAboutNewTxs:
// Skip mining based on new tx notif for bor consensus
hasWork = s.chainConfig.Bor == nil
@ -1130,10 +1157,12 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
s.logger.Debug("Start mining new block based on txpool notif")
}
case <-mineEvery.C:
s.logger.Debug("Start mining new block based on miner.recommit")
hasWork = true
if !(working || waiting.Load()) {
s.logger.Debug("Start mining new block based on miner.recommit", "duration", miner.MiningConfig.Recommit)
}
hasWork = !(working || waiting.Load())
case err := <-errc:
works = false
working = false
hasWork = false
if errors.Is(err, libcommon.ErrStopped) {
return
@ -1146,11 +1175,36 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
}
}
if !works && hasWork {
works = true
if !working && hasWork {
working = true
hasWork = false
mineEvery.Reset(cfg.Recommit)
go func() { errc <- stages2.MiningStep(ctx, db, mining, tmpDir, logger) }()
mineEvery.Reset(miner.MiningConfig.Recommit)
go func() {
err := stages2.MiningStep(ctx, db, mining, tmpDir, logger)
waiting.Store(true)
defer waiting.Store(false)
errc <- err
if err != nil {
return
}
for {
select {
case block := <-miner.MiningResultCh:
if block != nil {
s.logger.Debug("Mined block", "block", block.Number())
s.minedBlocks <- block
}
return
case <-workCtx.Done():
errc <- workCtx.Err()
return
}
}
}()
}
}
}()

View File

@ -17,8 +17,7 @@ import (
// ChainReader implements consensus.ChainReader
type ChainReader struct {
Cfg chain.Config
Cfg chain.Config
Db kv.Getter
BlockReader services.FullBlockReader
Logger log.Logger
@ -72,7 +71,7 @@ func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool {
func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int {
td, err := rawdb.ReadTd(cr.Db, hash, number)
if err != nil {
log.Error("ReadTd failed", "err", err)
cr.Logger.Error("ReadTd failed", "err", err)
return nil
}
return td

View File

@ -157,10 +157,13 @@ func BorHeimdallForward(
}
lastBlockNum := s.BlockNumber
if cfg.blockReader.FrozenBorBlocks() > lastBlockNum {
lastBlockNum = cfg.blockReader.FrozenBorBlocks()
}
recents, err := lru.NewARC[libcommon.Hash, *bor.Snapshot](inmemorySnapshots)
if err != nil {
return err
}
@ -168,6 +171,7 @@ func BorHeimdallForward(
if err != nil {
return err
}
chain := NewChainReaderImpl(&cfg.chainConfig, tx, cfg.blockReader, logger)
var blockNum uint64

View File

@ -605,7 +605,7 @@ func (cr ChainReaderImpl) BorEventsByBlock(hash libcommon.Hash, number uint64) [
func (cr ChainReaderImpl) BorSpan(spanId uint64) []byte {
span, err := cr.blockReader.Span(context.Background(), cr.tx, spanId)
if err != nil {
cr.logger.Error("BorSpan failed", "err", err)
cr.logger.Error("[staged sync] BorSpan failed", "err", err)
return nil
}
return span

View File

@ -77,7 +77,7 @@ func MiningBorHeimdallForward(
}
logger.Info(
"[%s] Finished processing", logPrefix,
fmt.Sprintf("[%s] Finished processing", logPrefix),
"progress", headerNum,
"lastSpanID", lastSpanID,
"lastStateSyncEventID", lastStateSyncEventID,

View File

@ -41,8 +41,8 @@ type MiningExecCfg struct {
tmpdir string
interrupt *int32
payloadId uint64
txPool2 TxPoolForMining
txPool2DB kv.RoDB
txPool TxPoolForMining
txPoolDB kv.RoDB
}
type TxPoolForMining interface {
@ -54,7 +54,7 @@ func StageMiningExecCfg(
notifier ChainEventNotifier, chainConfig chain.Config,
engine consensus.Engine, vmConfig *vm.Config,
tmpdir string, interrupt *int32, payloadId uint64,
txPool2 TxPoolForMining, txPool2DB kv.RoDB,
txPool TxPoolForMining, txPoolDB kv.RoDB,
blockReader services.FullBlockReader,
) MiningExecCfg {
return MiningExecCfg{
@ -68,8 +68,8 @@ func StageMiningExecCfg(
tmpdir: tmpdir,
interrupt: interrupt,
payloadId: payloadId,
txPool2: txPool2,
txPool2DB: txPool2DB,
txPool: txPool,
txPoolDB: txPoolDB,
}
}
@ -150,7 +150,7 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
}
}
logger.Debug("SpawnMiningExecStage", "block txn", current.Txs.Len(), "payload", cfg.payloadId)
logger.Debug("SpawnMiningExecStage", "block", current.Header.Number, "txn", current.Txs.Len(), "payload", cfg.payloadId)
if current.Uncles == nil {
current.Uncles = []*types.Header{}
}
@ -166,7 +166,8 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c
if err != nil {
return err
}
logger.Debug("FinalizeBlockExecution", "current txn", current.Txs.Len(), "current receipt", current.Receipts.Len(), "payload", cfg.payloadId)
logger.Debug("FinalizeBlockExecution", "block", current.Header.Number, "txn", current.Txs.Len(), "gas", current.Header.GasUsed, "receipt", current.Receipts.Len(), "payload", cfg.payloadId)
// hack: pretend that we are real execution stage - next stages will rely on this progress
if err := stages.SaveStageProgress(tx, stages.Execution, current.Header.Number.Uint64()); err != nil {
@ -186,23 +187,20 @@ func getNextTransactions(
logger log.Logger,
) (types.TransactionsStream, int, error) {
txSlots := types2.TxsRlp{}
var onTime bool
count := 0
if err := cfg.txPool2DB.View(context.Background(), func(poolTx kv.Tx) error {
if err := cfg.txPoolDB.View(context.Background(), func(poolTx kv.Tx) error {
var err error
counter := 0
for !onTime && counter < 500 {
remainingGas := header.GasLimit - header.GasUsed
remainingBlobGas := uint64(0)
if header.BlobGasUsed != nil {
remainingBlobGas = cfg.chainConfig.GetMaxBlobGasPerBlock() - *header.BlobGasUsed
}
if onTime, count, err = cfg.txPool2.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, remainingBlobGas, alreadyYielded); err != nil {
return err
}
time.Sleep(1 * time.Millisecond)
counter++
remainingGas := header.GasLimit - header.GasUsed
remainingBlobGas := uint64(0)
if header.BlobGasUsed != nil {
remainingBlobGas = cfg.chainConfig.GetMaxBlobGasPerBlock() - *header.BlobGasUsed
}
if _, count, err = cfg.txPool.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, remainingBlobGas, alreadyYielded); err != nil {
return err
}
return nil
}); err != nil {
return nil, 0, err
@ -375,7 +373,6 @@ func addTransactionsToMiningBlock(logPrefix string, current *MiningBlock, chainC
gasSnap := gasPool.Gas()
blobGasSnap := gasPool.BlobGas()
snap := ibs.Snapshot()
logger.Debug("addTransactionsToMiningBlock", "txn hash", txn.Hash())
receipt, _, err := core.ApplyTransaction(&chainConfig, core.GetHashFn(header, getHeader), engine, &coinbase, gasPool, ibs, noop, header, txn, &header.GasUsed, header.BlobGasUsed, *vmConfig)
if err != nil {
ibs.RevertToSnapshot(snap)
@ -464,7 +461,7 @@ LOOP:
txs.Pop()
} else if err == nil {
// Everything ok, collect the logs and shift in the next transaction from the same account
logger.Debug(fmt.Sprintf("[%s] addTransactionsToMiningBlock Successful", logPrefix), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId)
logger.Trace(fmt.Sprintf("[%s] Added transaction", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId)
coalescedLogs = append(coalescedLogs, logs...)
tcount++
txs.Shift()

View File

@ -82,10 +82,10 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit
if block.Transactions().Len() > 0 {
logger.Info(fmt.Sprintf("[%s] block ready for seal", logPrefix),
"block_num", block.NumberU64(),
"block", block.NumberU64(),
"transactions", block.Transactions().Len(),
"gas_used", block.GasUsed(),
"gas_limit", block.GasLimit(),
"gasUsed", block.GasUsed(),
"gasLimit", block.GasLimit(),
"difficulty", block.Difficulty(),
)
}

View File

@ -1112,11 +1112,13 @@ func (c *Bor) Seal(chain consensus.ChainHeaderReader, block *types.Block, result
select {
case <-stop:
c.logger.Info("[bor] Stopped sealing operation for block", "number", number)
results <- nil
return
case <-time.After(delay):
if c.headerProgress != nil && c.headerProgress.Progress() >= number {
c.logger.Info("Discarding sealing operation for block", "number", number)
results <- nil
return
}

View File

@ -478,7 +478,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
}
if c.Enabled {
logger.Info("starting HTTP APIs", "APIs", apis)
logger.Info("starting HTTP APIs", "port", c.HttpPort, "APIs", apis)
}
if ctx.IsSet(utils.HttpCompressionFlag.Name) {

View File

@ -1190,7 +1190,7 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([]
return nil, err
}
if v == nil {
return nil, fmt.Errorf("span %d not found (db), frosenBlocks=%d", spanId, maxBlockNumInFiles)
return nil, fmt.Errorf("span %d not found (db), frozenBlocks=%d", spanId, maxBlockNumInFiles)
}
return common.Copy(v), nil
}

View File

@ -329,6 +329,7 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error {
pendingBlobFee = f.Uint64()
}
h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock)
notifications.Accumulator.SendAndReset(h.ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock)
}
// -- send notifications END