erigon-pulse/erigon-lib/txpool/pool.go

/*
Copyright 2022 The Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package txpool
import (
"bytes"
"container/heap"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
gokzg4844 "github.com/crate-crypto/go-kzg-4844"
mapset "github.com/deckarep/golang-set/v2"
"github.com/go-stack/stack"
"github.com/google/btree"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/assert"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/common/fixedgas"
"github.com/ledgerwatch/erigon-lib/common/u256"
libkzg "github.com/ledgerwatch/erigon-lib/crypto/kzg"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
proto_txpool "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/metrics"
"github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg"
"github.com/ledgerwatch/erigon-lib/types"
)
const DefaultBlockGasLimit = uint64(30000000)
var (
processBatchTxsTimer = metrics.NewSummary(`pool_process_remote_txs`)
addRemoteTxsTimer = metrics.NewSummary(`pool_add_remote_txs`)
newBlockTimer = metrics.NewSummary(`pool_new_block`)
writeToDBTimer = metrics.NewSummary(`pool_write_to_db`)
propagateToNewPeerTimer = metrics.NewSummary(`pool_propagate_to_new_peer`)
propagateNewTxsTimer = metrics.NewSummary(`pool_propagate_new_txs`)
writeToDBBytesCounter = metrics.GetOrCreateGauge(`pool_write_to_db_bytes`)
pendingSubCounter = metrics.GetOrCreateGauge(`txpool_pending`)
queuedSubCounter = metrics.GetOrCreateGauge(`txpool_queued`)
basefeeSubCounter = metrics.GetOrCreateGauge(`txpool_basefee`)
)
var TraceAll = false
// Pool is the interface for the transaction pool.
// This interface exists for the convenience of testing, not because there are
// multiple implementations yet.
type Pool interface {
ValidateSerializedTxn(serializedTxn []byte) error
// Handle 3 main events - new remote txs from p2p, new local txs from RPC, new blocks from execution layer
AddRemoteTxs(ctx context.Context, newTxs types.TxSlots)
AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error)
OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, unwindBlobTxs, minedTxs types.TxSlots, tx kv.Tx) error
// IdHashKnown checks whether a transaction with the given Id hash is known to the pool
IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error)
Started() bool
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
AddNewGoodPeer(peerID types.PeerID)
}
var _ Pool = (*TxPool)(nil) // compile-time interface check
// SubPoolMarker is an ordered bitset of five bits that's used to sort transactions into sub-pools. Bits meaning:
// 1. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for the sender is M, and there are transactions for all nonces between M and N from the same sender. Set to 0 if the transaction's nonce is separated from the state nonce by one or more nonce gaps.
// 2. Sufficient balance for gas. Set to 1 if the balance of the sender's account in the state is B, nonce of the sender in the state is M, nonce of the transaction is N, and the sum of feeCap x gasLimit + transferred_value of all transactions from this sender with nonces M ... N is no more than B. Set to 0 otherwise. In other words, this bit is set if there is currently a guarantee that the transaction and all its required prior transactions will be able to pay for gas.
// 3. Not too much gas: Set to 1 if the transaction doesn't use too much gas
// 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than baseFee of the currently pending block. Set to 0 otherwise.
// 5. Local transaction. Set to 1 if transaction is local.
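// For example, a local transaction with no nonce gap, sufficient balance, an acceptable
// gas limit and a feeCap covering the pending base fee carries the marker
// NoNonceGaps|EnoughBalance|NotTooMuchGas|EnoughFeeCapBlock|IsLocal = 0b011111, while a
// remote transaction that only lacks a high enough feeCap carries 0b011100, which equals
// BaseFeePoolBits and therefore still qualifies for the base fee sub-pool.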
type SubPoolMarker uint8
const (
NoNonceGaps = 0b010000
EnoughBalance = 0b001000
NotTooMuchGas = 0b000100
EnoughFeeCapBlock = 0b000010
IsLocal = 0b000001
BaseFeePoolBits = NoNonceGaps + EnoughBalance + NotTooMuchGas
)
// metaTx holds transaction and some metadata
type metaTx struct {
Tx *types.TxSlot
minFeeCap uint256.Int
nonceDistance uint64 // how far their nonces are from the state's nonce for the sender
cumulativeBalanceDistance uint64 // how far their cumulativeRequiredBalance are from the state's balance for the sender
minTip uint64
bestIndex int
worstIndex int
timestamp uint64 // when it was added to pool
subPool SubPoolMarker
currentSubPool SubPoolType
minedBlockNum uint64
}
func newMetaTx(slot *types.TxSlot, isLocal bool, timestamp uint64) *metaTx {
mt := &metaTx{Tx: slot, worstIndex: -1, bestIndex: -1, timestamp: timestamp}
if isLocal {
mt.subPool = IsLocal
}
return mt
}
type SubPoolType uint8
const PendingSubPool SubPoolType = 1
const BaseFeeSubPool SubPoolType = 2
const QueuedSubPool SubPoolType = 3
func (sp SubPoolType) String() string {
switch sp {
case PendingSubPool:
return "Pending"
case BaseFeeSubPool:
return "BaseFee"
case QueuedSubPool:
return "Queued"
}
return fmt.Sprintf("Unknown:%d", sp)
}
// sender - immutable structure which stores only nonce and balance of account
type sender struct {
balance uint256.Int
nonce uint64
}
func newSender(nonce uint64, balance uint256.Int) *sender {
return &sender{nonce: nonce, balance: balance}
}
var emptySender = newSender(0, *uint256.NewInt(0))
func SortByNonceLess(a, b *metaTx) bool {
if a.Tx.SenderID != b.Tx.SenderID {
return a.Tx.SenderID < b.Tx.SenderID
}
return a.Tx.Nonce < b.Tx.Nonce
}
// TxPool - holds all pool-related data structures and small lock-based methods;
// most of the logic is implemented by pure, test-friendly functions
//
// txpool doesn't start any goroutines - "leave concurrency to user" design
// txpool has no DB-TX fields - "leave db transactions management to user" design
// txpool has _chainDB field - but it must maximize local state cache hit-rate - and perform minimum _chainDB transactions
//
// It keeps TxSlot objects immutable
type TxPool struct {
_chainDB kv.RoDB // remote db - use it wisely
_stateCache kvcache.Cache
lock *sync.Mutex
recentlyConnectedPeers *recentlyConnectedPeers // all txs will eventually be propagated to these peers, and the list is then cleared
senders *sendersBatch
// batch processing of remote transactions
// handling is fast enough without batching, but batching allows:
// - fewer _chainDB transactions
// - batch notifications about new txs (reduced P2P spam to other nodes about txs propagation)
// - and as a result reducing lock contention
unprocessedRemoteTxs *types.TxSlots
unprocessedRemoteByHash map[string]int // to reject duplicates
byHash map[string]*metaTx // tx_hash => tx : only those records not committed to db yet
discardReasonsLRU *simplelru.LRU[string, txpoolcfg.DiscardReason] // tx_hash => discard_reason : non-persisted
pending *PendingPool
baseFee *SubPool
queued *SubPool
minedBlobTxsByBlock map[uint64][]*metaTx // (blockNum => slice): cache of recently mined blobs
minedBlobTxsByHash map[string]*metaTx // (hash => mt): map of recently mined blobs
isLocalLRU *simplelru.LRU[string, struct{}] // tx_hash => is_local : to restore isLocal flag of unwinded transactions
newPendingTxs chan types.Announcements // notifications about new txs in Pending sub-pool
all *BySenderAndNonce // senderID => (sorted map of tx nonce => *metaTx)
deletedTxs []*metaTx // list of discarded txs since last db commit
promoted types.Announcements
cfg txpoolcfg.Config
chainID uint256.Int
lastSeenBlock atomic.Uint64
lastSeenCond *sync.Cond
lastFinalizedBlock atomic.Uint64
started atomic.Bool
pendingBaseFee atomic.Uint64
pendingBlobFee atomic.Uint64 // For gas accounting for blobs, which has its own dimension
blockGasLimit atomic.Uint64
shanghaiTime *uint64
isPostShanghai atomic.Bool
agraBlock *uint64
isPostAgra atomic.Bool
cancunTime *uint64
isPostCancun atomic.Bool
maxBlobsPerBlock uint64
feeCalculator FeeCalculator
logger log.Logger
}
type FeeCalculator interface {
CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice, blockGasLimit uint64, err error)
}
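// New wires together the pool's internal data structures. Minimal usage sketch
// (illustrative only: it assumes txpoolcfg.DefaultConfig and kvcache.NewDummy are available,
// that coreDB/db, ctx, send and newSlotsStreams are already set up, and uses placeholder
// chain-id/fork/blob values):
//
//	newTxs := make(chan types.Announcements, 1024)
//	pool, err := New(newTxs, coreDB, txpoolcfg.DefaultConfig, kvcache.NewDummy(),
//		*uint256.NewInt(369), nil /*shanghaiTime*/, nil /*agraBlock*/, nil /*cancunTime*/,
//		6 /*maxBlobsPerBlock*/, nil /*feeCalculator*/, log.New())
//	if err != nil {
//		// handle error
//	}
//	go MainLoop(ctx, db, pool, newTxs, send, newSlotsStreams, func() {})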
func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, cache kvcache.Cache,
chainID uint256.Int, shanghaiTime, agraBlock, cancunTime *big.Int, maxBlobsPerBlock uint64,
feeCalculator FeeCalculator, logger log.Logger,
) (*TxPool, error) {
localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil)
if err != nil {
return nil, err
}
discardHistory, err := simplelru.NewLRU[string, txpoolcfg.DiscardReason](10_000, nil)
if err != nil {
return nil, err
}
byNonce := &BySenderAndNonce{
tree: btree.NewG[*metaTx](32, SortByNonceLess),
search: &metaTx{Tx: &types.TxSlot{}},
senderIDTxnCount: map[uint64]int{},
senderIDBlobCount: map[uint64]uint64{},
}
tracedSenders := make(map[common.Address]struct{})
for _, sender := range cfg.TracedSenders {
tracedSenders[common.BytesToAddress([]byte(sender))] = struct{}{}
}
lock := &sync.Mutex{}
res := &TxPool{
lock: lock,
lastSeenCond: sync.NewCond(lock),
byHash: map[string]*metaTx{},
isLocalLRU: localsHistory,
discardReasonsLRU: discardHistory,
all: byNonce,
recentlyConnectedPeers: &recentlyConnectedPeers{},
pending: NewPendingSubPool(PendingSubPool, cfg.PendingSubPoolLimit),
baseFee: NewSubPool(BaseFeeSubPool, cfg.BaseFeeSubPoolLimit),
queued: NewSubPool(QueuedSubPool, cfg.QueuedSubPoolLimit),
newPendingTxs: newTxs,
_stateCache: cache,
senders: newSendersCache(tracedSenders),
_chainDB: coreDB,
cfg: cfg,
chainID: chainID,
unprocessedRemoteTxs: &types.TxSlots{},
unprocessedRemoteByHash: map[string]int{},
minedBlobTxsByBlock: map[uint64][]*metaTx{},
minedBlobTxsByHash: map[string]*metaTx{},
maxBlobsPerBlock: maxBlobsPerBlock,
feeCalculator: feeCalculator,
logger: logger,
}
if shanghaiTime != nil {
if !shanghaiTime.IsUint64() {
return nil, errors.New("shanghaiTime overflow")
}
shanghaiTimeU64 := shanghaiTime.Uint64()
res.shanghaiTime = &shanghaiTimeU64
}
if agraBlock != nil {
if !agraBlock.IsUint64() {
return nil, errors.New("agraBlock overflow")
}
agraBlockU64 := agraBlock.Uint64()
res.agraBlock = &agraBlockU64
}
if cancunTime != nil {
if !cancunTime.IsUint64() {
return nil, errors.New("cancunTime overflow")
}
cancunTimeU64 := cancunTime.Uint64()
res.cancunTime = &cancunTimeU64
}
return res, nil
}
func (p *TxPool) Start(ctx context.Context, db kv.RwDB) error {
if p.started.Load() {
return nil
}
return db.View(ctx, func(tx kv.Tx) error {
coreDb, _ := p.coreDBWithCache()
coreTx, err := coreDb.BeginRo(ctx)
if err != nil {
return err
}
defer coreTx.Rollback()
if err := p.fromDB(ctx, tx, coreTx); err != nil {
return fmt.Errorf("loading pool from DB: %w", err)
}
if p.started.CompareAndSwap(false, true) {
p.logger.Info("[txpool] Started")
}
return nil
})
}
func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, unwindBlobTxs, minedTxs types.TxSlots, tx kv.Tx) error {
defer newBlockTimer.ObserveDuration(time.Now())
//t := time.Now()
coreDB, cache := p.coreDBWithCache()
cache.OnNewBlock(stateChanges)
coreTx, err := coreDB.BeginRo(ctx)
if err != nil {
return err
}
defer coreTx.Rollback()
block := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight
baseFee := stateChanges.PendingBlockBaseFee
available := len(p.pending.best.ms)
defer func() {
p.logger.Debug("[txpool] New block", "block", block, "unwound", len(unwindTxs.Txs), "mined", len(minedTxs.Txs), "baseFee", baseFee, "pending-pre", available, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len(), "err", err)
}()
if err = minedTxs.Valid(); err != nil {
return err
}
cacheView, err := cache.View(ctx, coreTx)
if err != nil {
return err
}
p.lock.Lock()
defer func() {
if err == nil {
p.lastSeenBlock.Store(block)
p.lastSeenCond.Broadcast()
}
p.lock.Unlock()
}()
if assert.Enable {
if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil {
p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String())
}
}
pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee)
// Update pendingBase for all pool queues and slices
if baseFeeChanged {
p.pending.best.pendingBaseFee = pendingBaseFee
p.pending.worst.pendingBaseFee = pendingBaseFee
p.baseFee.best.pendingBastFee = pendingBaseFee
p.baseFee.worst.pendingBaseFee = pendingBaseFee
p.queued.best.pendingBastFee = pendingBaseFee
p.queued.worst.pendingBaseFee = pendingBaseFee
}
pendingBlobFee := stateChanges.PendingBlobFeePerGas
p.setBlobFee(pendingBlobFee)
p.blockGasLimit.Store(stateChanges.BlockGasLimit)
for i, txn := range unwindBlobTxs.Txs {
if txn.Type == types.BlobTxType {
knownBlobTxn, err := p.getCachedBlobTxnLocked(coreTx, txn.IDHash[:])
if err != nil {
return err
}
if knownBlobTxn != nil {
unwindTxs.Append(knownBlobTxn.Tx, unwindBlobTxs.Senders.At(i), false)
}
}
}
if err = p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, p.logger); err != nil {
return err
}
_, unwindTxs, err = p.validateTxs(&unwindTxs, cacheView)
if err != nil {
return err
}
if assert.Enable {
for _, txn := range unwindTxs.Txs {
if txn.SenderID == 0 {
panic(fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero"))
}
}
for _, txn := range minedTxs.Txs {
if txn.SenderID == 0 {
panic(fmt.Errorf("onNewBlock.minedTxs: senderID can't be zero"))
}
}
}
if err = p.processMinedFinalizedBlobs(coreTx, minedTxs.Txs, stateChanges.FinalizedBlock); err != nil {
return err
}
if err = p.removeMined(p.all, minedTxs.Txs); err != nil {
return err
}
var announcements types.Announcements
announcements, err = p.addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */
pendingBaseFee, stateChanges.BlockGasLimit, p.logger)
if err != nil {
return err
}
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
p.promote(pendingBaseFee, pendingBlobFee, &announcements, p.logger)
p.pending.EnforceBestInvariants()
p.promoted.Reset()
p.promoted.AppendOther(announcements)
if p.promoted.Len() > 0 {
select {
case p.newPendingTxs <- p.promoted.Copy():
default:
}
}
return nil
}
func (p *TxPool) processRemoteTxs(ctx context.Context) error {
if !p.Started() {
return fmt.Errorf("txpool not started yet")
}
defer processBatchTxsTimer.ObserveDuration(time.Now())
coreDB, cache := p.coreDBWithCache()
coreTx, err := coreDB.BeginRo(ctx)
if err != nil {
return err
}
defer coreTx.Rollback()
cacheView, err := cache.View(ctx, coreTx)
if err != nil {
return err
}
//t := time.Now()
p.lock.Lock()
defer p.lock.Unlock()
l := len(p.unprocessedRemoteTxs.Txs)
if l == 0 {
return nil
}
err = p.senders.registerNewSenders(p.unprocessedRemoteTxs, p.logger)
if err != nil {
return err
}
_, newTxs, err := p.validateTxs(p.unprocessedRemoteTxs, cacheView)
if err != nil {
return err
}
announcements, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger)
if err != nil {
return err
}
p.promoted.Reset()
p.promoted.AppendOther(announcements)
if p.promoted.Len() > 0 {
select {
case <-ctx.Done():
return nil
case p.newPendingTxs <- p.promoted.Copy():
default:
}
}
p.unprocessedRemoteTxs.Resize(0)
p.unprocessedRemoteByHash = map[string]int{}
//p.logger.Info("[txpool] on new txs", "amount", len(newPendingTxs.txs), "in", time.Since(t))
return nil
}
func (p *TxPool) getRlpLocked(tx kv.Tx, hash []byte) (rlpTxn []byte, sender common.Address, isLocal bool, err error) {
txn, ok := p.byHash[string(hash)]
if ok && txn.Tx.Rlp != nil {
return txn.Tx.Rlp, p.senders.senderID2Addr[txn.Tx.SenderID], txn.subPool&IsLocal > 0, nil
}
v, err := tx.GetOne(kv.PoolTransaction, hash)
if err != nil {
return nil, common.Address{}, false, err
}
if v == nil {
return nil, common.Address{}, false, nil
}
return v[20:], *(*[20]byte)(v[:20]), txn != nil && txn.subPool&IsLocal > 0, nil
}
func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
p.lock.Lock()
defer p.lock.Unlock()
rlpTx, _, _, err := p.getRlpLocked(tx, hash)
return common.Copy(rlpTx), err
}
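// AppendLocalAnnouncements appends the type, size and hash of every local transaction
// currently tracked in byHash to the given slices and returns the extended slices.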
func (p *TxPool) AppendLocalAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte) {
p.lock.Lock()
defer p.lock.Unlock()
for hash, txn := range p.byHash {
if txn.subPool&IsLocal == 0 {
continue
}
types = append(types, txn.Tx.Type)
sizes = append(sizes, txn.Tx.Size)
hashes = append(hashes, hash...)
}
return types, sizes, hashes
}
func (p *TxPool) AppendRemoteAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte) {
p.lock.Lock()
defer p.lock.Unlock()
for hash, txn := range p.byHash {
if txn.subPool&IsLocal != 0 {
continue
}
types = append(types, txn.Tx.Type)
sizes = append(sizes, txn.Tx.Size)
hashes = append(hashes, hash...)
}
for hash, txIdx := range p.unprocessedRemoteByHash {
txSlot := p.unprocessedRemoteTxs.Txs[txIdx]
types = append(types, txSlot.Type)
sizes = append(sizes, txSlot.Size)
hashes = append(hashes, hash...)
}
return types, sizes, hashes
}
func (p *TxPool) AppendAllAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte) {
types, sizes, hashes = p.AppendLocalAnnouncements(types, sizes, hashes)
types, sizes, hashes = p.AppendRemoteAnnouncements(types, sizes, hashes)
return types, sizes, hashes
}
func (p *TxPool) idHashKnown(tx kv.Tx, hash []byte, hashS string) (bool, error) {
if _, ok := p.unprocessedRemoteByHash[hashS]; ok {
return true, nil
}
if _, ok := p.discardReasonsLRU.Get(hashS); ok {
return true, nil
}
if _, ok := p.byHash[hashS]; ok {
return true, nil
}
if _, ok := p.minedBlobTxsByHash[hashS]; ok {
return true, nil
}
return tx.Has(kv.PoolTransaction, hash)
}
func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
hashS := string(hash)
p.lock.Lock()
defer p.lock.Unlock()
return p.idHashKnown(tx, hash, hashS)
}
func (p *TxPool) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) {
p.lock.Lock()
defer p.lock.Unlock()
for i := 0; i < len(hashes); i += 32 {
known, err := p.idHashKnown(tx, hashes[i:i+32], string(hashes[i:i+32]))
if err != nil {
return unknownHashes, err
}
if !known {
unknownHashes = append(unknownHashes, hashes[i:i+32]...)
}
}
return unknownHashes, err
}
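// getUnprocessedTxn looks up a remote transaction that has been received but not yet
// processed (still sitting in unprocessedRemoteTxs), by its hash.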
func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) {
if i, ok := p.unprocessedRemoteByHash[hashS]; ok {
return p.unprocessedRemoteTxs.Txs[i], true
}
return nil, false
}
func (p *TxPool) getCachedBlobTxnLocked(tx kv.Tx, hash []byte) (*metaTx, error) {
hashS := string(hash)
if mt, ok := p.minedBlobTxsByHash[hashS]; ok {
return mt, nil
}
if txn, ok := p.getUnprocessedTxn(hashS); ok {
return newMetaTx(txn, false, 0), nil
}
if mt, ok := p.byHash[hashS]; ok {
return mt, nil
}
has, err := tx.Has(kv.PoolTransaction, hash)
if err != nil {
return nil, err
}
if !has {
return nil, nil
}
txn, err := tx.GetOne(kv.PoolTransaction, hash)
if err != nil {
return nil, err
}
parseCtx := types.NewTxParseContext(p.chainID, false)
parseCtx.WithSender(false)
txSlot := &types.TxSlot{}
parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil)
return newMetaTx(txSlot, false, 0), nil
}
func (p *TxPool) IsLocal(idHash []byte) bool {
hashS := string(idHash)
p.lock.Lock()
defer p.lock.Unlock()
return p.isLocalLRU.Contains(hashS)
}
func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) Started() bool { return p.started.Load() }
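// best fills txs with up to n of the best pending transactions whose intrinsic gas and
// blob gas fit within availableGas/availableBlobGas, skipping hashes already present in
// yielded. It waits until the pool has seen block onTopOf so that callers build on a
// consistent state, and prunes pending entries whose RLP can no longer be found.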
func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, yielded mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()
for last := p.lastSeenBlock.Load(); last < onTopOf; last = p.lastSeenBlock.Load() {
p.logger.Debug("[txpool] Waiting for block", "expecting", onTopOf, "lastSeen", last, "txRequested", n, "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len())
p.lastSeenCond.Wait()
}
best := p.pending.best
isShanghai := p.isShanghai() || p.isAgra()
txs.Resize(uint(cmp.Min(int(n), len(best.ms))))
var toRemove []*metaTx
count := 0
i := 0
defer func() {
p.logger.Debug("[txpool] Processing best request", "last", onTopOf, "txRequested", n, "txAvailable", len(best.ms), "txProcessed", i, "txReturned", count)
}()
for ; count < int(n) && i < len(best.ms); i++ {
// if we wouldn't have enough gas for a standard transaction then quit out early
if availableGas < fixedgas.TxGas {
break
}
mt := best.ms[i]
if yielded.Contains(mt.Tx.IDHash) {
continue
}
if mt.Tx.Gas >= p.blockGasLimit.Load() {
// Skip transactions with very large gas limit
continue
}
rlpTx, sender, isLocal, err := p.getRlpLocked(tx, mt.Tx.IDHash[:])
if err != nil {
return false, count, err
}
if len(rlpTx) == 0 {
toRemove = append(toRemove, mt)
continue
}
// Skip transactions that require more blob gas than is available
blobCount := uint64(len(mt.Tx.BlobHashes))
if blobCount*fixedgas.BlobGasPerBlob > availableBlobGas {
continue
}
availableBlobGas -= blobCount * fixedgas.BlobGasPerBlob
// make sure we have enough gas in the caller to add this transaction.
// not an exact science using intrinsic gas but as close as we could hope for at
// this stage
intrinsicGas, _ := txpoolcfg.CalcIntrinsicGas(uint64(mt.Tx.DataLen), uint64(mt.Tx.DataNonZeroLen), nil, mt.Tx.Creation, true, true, isShanghai)
if intrinsicGas > availableGas {
// we might find another TX with a low enough intrinsic gas to include so carry on
continue
}
availableGas -= intrinsicGas
txs.Txs[count] = rlpTx
copy(txs.Senders.At(count), sender.Bytes())
txs.IsLocal[count] = isLocal
yielded.Add(mt.Tx.IDHash)
count++
}
txs.Resize(uint(count))
if len(toRemove) > 0 {
for _, mt := range toRemove {
p.pending.Remove(mt, "best", p.logger)
}
}
return true, count, nil
}
func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip)
}
func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) {
set := mapset.NewThreadUnsafeSet[[32]byte]()
onTime, _, err := p.YieldBest(n, txs, tx, onTopOf, availableGas, availableBlobGas, set)
return onTime, err
}
func (p *TxPool) CountContent() (int, int, int) {
p.lock.Lock()
defer p.lock.Unlock()
return p.pending.Len(), p.baseFee.Len(), p.queued.Len()
}
func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) {
if p.cfg.NoGossip {
// If gossip is disabled, don't add remote transactions:
// remote txs coming from fetch are consumed (dropped) here
return
}
defer addRemoteTxsTimer.ObserveDuration(time.Now())
p.lock.Lock()
defer p.lock.Unlock()
for i, txn := range newTxs.Txs {
hashS := string(txn.IDHash[:])
_, ok := p.unprocessedRemoteByHash[hashS]
if ok {
continue
}
p.unprocessedRemoteByHash[hashS] = len(p.unprocessedRemoteTxs.Txs)
p.unprocessedRemoteTxs.Append(txn, newTxs.Senders.At(i), false)
}
}
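// toBlobs converts raw blob byte slices into fixed-size gokzg4844.Blob values so they
// can be passed to the KZG batch proof verifier.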
func toBlobs(_blobs [][]byte) []gokzg4844.Blob {
blobs := make([]gokzg4844.Blob, len(_blobs))
for i, _blob := range _blobs {
var b gokzg4844.Blob
copy(b[:], _blob)
blobs[i] = b
}
return blobs
}
func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache.CacheView) txpoolcfg.DiscardReason {
isShanghai := p.isShanghai() || p.isAgra()
if isShanghai {
if txn.DataLen > fixedgas.MaxInitCodeSize {
return txpoolcfg.InitCodeTooLarge
}
}
if txn.Type == types.BlobTxType {
if !p.isCancun() {
return txpoolcfg.TypeNotActivated
}
if txn.Creation {
return txpoolcfg.CreateBlobTxn
}
blobCount := uint64(len(txn.BlobHashes))
if blobCount == 0 {
return txpoolcfg.NoBlobs
}
if blobCount > p.maxBlobsPerBlock {
return txpoolcfg.TooManyBlobs
}
equalNumber := len(txn.BlobHashes) == len(txn.Blobs) &&
len(txn.Blobs) == len(txn.Commitments) &&
len(txn.Commitments) == len(txn.Proofs)
if !equalNumber {
return txpoolcfg.UnequalBlobTxExt
}
for i := 0; i < len(txn.Commitments); i++ {
if libkzg.KZGToVersionedHash(txn.Commitments[i]) != libkzg.VersionedHash(txn.BlobHashes[i]) {
return txpoolcfg.BlobHashCheckFail
}
}
// https://github.com/ethereum/consensus-specs/blob/017a8495f7671f5fff2075a9bfc9238c1a0982f8/specs/deneb/polynomial-commitments.md#verify_blob_kzg_proof_batch
kzgCtx := libkzg.Ctx()
err := kzgCtx.VerifyBlobKZGProofBatch(toBlobs(txn.Blobs), txn.Commitments, txn.Proofs)
if err != nil {
return txpoolcfg.UnmatchedBlobTxExt
}
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !isLocal && uint256.NewInt(p.cfg.MinFeeCap).Cmp(&txn.FeeCap) == 1 {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx underpriced idHash=%x local=%t, feeCap=%d, cfg.MinFeeCap=%d", txn.IDHash, isLocal, txn.FeeCap, p.cfg.MinFeeCap))
}
return txpoolcfg.UnderPriced
}
gas, reason := txpoolcfg.CalcIntrinsicGas(uint64(txn.DataLen), uint64(txn.DataNonZeroLen), nil, txn.Creation, true, true, isShanghai)
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas idHash=%x gas=%d", txn.IDHash, gas))
}
if reason != txpoolcfg.Success {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas calculated failed idHash=%x reason=%s", txn.IDHash, reason))
}
return reason
}
if gas > txn.Gas {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas > txn.gas idHash=%x gas=%d, txn.gas=%d", txn.IDHash, gas, txn.Gas))
}
return txpoolcfg.IntrinsicGas
}
if !isLocal && uint64(p.all.count(txn.SenderID)) > p.cfg.AccountSlots {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots))
}
return txpoolcfg.Spammer
}
if !isLocal && p.all.blobCount(txn.SenderID) > p.cfg.BlobSlots {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots))
}
return txpoolcfg.Spammer
}
// check nonce and balance
senderNonce, senderBalance, _ := p.senders.info(stateCache, txn.SenderID)
if senderNonce > txn.Nonce {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx nonce too low idHash=%x nonce in state=%d, txn.nonce=%d", txn.IDHash, senderNonce, txn.Nonce))
}
return txpoolcfg.NonceTooLow
}
// Transactor should have enough funds to cover the costs
total := requiredBalance(txn)
if senderBalance.Cmp(total) < 0 {
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx insufficient funds idHash=%x balance in state=%d, txn.gas*txn.tip=%d", txn.IDHash, senderBalance, total))
}
return txpoolcfg.InsufficientFunds
}
return txpoolcfg.Success
}
var maxUint256 = new(uint256.Int).SetAllOne()
// Sender should have enough balance for: gasLimit x feeCap + blobGas x blobFeeCap + transferred_value
// See YP, Eq (61) in Section 6.2 "Execution"
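// Worked example (illustrative): a plain transfer with gas = 21_000, feeCap = 100 gwei
// (1e11 wei) and value = 1 ether (1e18 wei) requires 21_000*1e11 + 1e18 = 1.0021e18 wei;
// each attached blob additionally reserves fixedgas.BlobGasPerBlob x blobFeeCap on top.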
func requiredBalance(txn *types.TxSlot) *uint256.Int {
// See https://github.com/ethereum/EIPs/pull/3594
total := uint256.NewInt(txn.Gas)
_, overflow := total.MulOverflow(total, &txn.FeeCap)
if overflow {
return maxUint256
}
// and https://eips.ethereum.org/EIPS/eip-4844#gas-accounting
blobCount := uint64(len(txn.BlobHashes))
if blobCount != 0 {
maxBlobGasCost := uint256.NewInt(fixedgas.BlobGasPerBlob)
maxBlobGasCost.Mul(maxBlobGasCost, uint256.NewInt(blobCount))
_, overflow = maxBlobGasCost.MulOverflow(maxBlobGasCost, &txn.BlobFeeCap)
if overflow {
return maxUint256
}
_, overflow = total.AddOverflow(total, maxBlobGasCost)
if overflow {
return maxUint256
}
}
_, overflow = total.AddOverflow(total, &txn.Value)
if overflow {
return maxUint256
}
return total
}
func (p *TxPool) isShanghai() bool {
// once this flag has been set for the first time we no longer need to check the timestamp
set := p.isPostShanghai.Load()
if set {
return true
}
if p.shanghaiTime == nil {
return false
}
shanghaiTime := *p.shanghaiTime
// a zero here means Shanghai is always active
if shanghaiTime == 0 {
p.isPostShanghai.Swap(true)
return true
}
now := time.Now().Unix()
activated := uint64(now) >= shanghaiTime
if activated {
p.isPostShanghai.Swap(true)
}
return activated
}
func (p *TxPool) isAgra() bool {
// once this flag has been set for the first time we no longer need to check the block
set := p.isPostAgra.Load()
if set {
return true
}
if p.agraBlock == nil {
return false
}
agraBlock := *p.agraBlock
// a zero here means Agra is always active
if agraBlock == 0 {
p.isPostAgra.Swap(true)
return true
}
tx, err := p._chainDB.BeginRo(context.Background())
if err != nil {
return false
}
defer tx.Rollback()
head_block, err := chain.CurrentBlockNumber(tx)
if head_block == nil || err != nil {
return false
}
// A new block is built on top of the head block, so when the head is agraBlock-1,
// the new block should use the Agra rules.
activated := (*head_block + 1) >= agraBlock
if activated {
p.isPostAgra.Swap(true)
}
return activated
}
func (p *TxPool) isCancun() bool {
// once this flag has been set for the first time we no longer need to check the timestamp
set := p.isPostCancun.Load()
if set {
return true
}
if p.cancunTime == nil {
return false
}
cancunTime := *p.cancunTime
// a zero here means Cancun is always active
if cancunTime == 0 {
p.isPostCancun.Swap(true)
return true
}
now := time.Now().Unix()
activated := uint64(now) >= cancunTime
if activated {
p.isPostCancun.Swap(true)
}
return activated
}
// Check that the serialized txn does not exceed a certain max size
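// For reference: txMaxSize below works out to 4*32*1024 = 131072 bytes (128 KiB), so a
// 200 KB (200,000-byte) regular transaction is rejected with types.ErrRlpTooBig, while a
// blob transaction of the same size is still accepted against the larger 800_000-byte limit.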
func (p *TxPool) ValidateSerializedTxn(serializedTxn []byte) error {
const (
// txSlotSize is used to calculate how many data slots a single transaction
// takes up based on its size. The slots are used as DoS protection, ensuring
// that validating a new transaction remains a constant operation (in reality
// O(maxslots), where max slots are 4 currently).
txSlotSize = 32 * 1024
// txMaxSize is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB
// Should be enough for a transaction with 6 blobs
blobTxMaxSize = 800_000
)
txType, err := types.PeekTransactionType(serializedTxn)
if err != nil {
return err
}
maxSize := txMaxSize
if txType == types.BlobTxType {
maxSize = blobTxMaxSize
}
if len(serializedTxn) > maxSize {
return types.ErrRlpTooBig
}
return nil
}
func (p *TxPool) validateTxs(txs *types.TxSlots, stateCache kvcache.CacheView) (reasons []txpoolcfg.DiscardReason, goodTxs types.TxSlots, err error) {
// reasons is pre-sized for direct indexing, with the default zero
// value DiscardReason of NotSet
reasons = make([]txpoolcfg.DiscardReason, len(txs.Txs))
if err := txs.Valid(); err != nil {
return reasons, goodTxs, err
}
goodCount := 0
for i, txn := range txs.Txs {
reason := p.validateTx(txn, txs.IsLocal[i], stateCache)
if reason == txpoolcfg.Success {
goodCount++
// Success here means no DiscardReason yet, so leave it NotSet
continue
}
if reason == txpoolcfg.Spammer {
p.punishSpammer(txn.SenderID)
}
reasons[i] = reason
}
goodTxs.Resize(uint(goodCount))
j := 0
for i, txn := range txs.Txs {
if reasons[i] == txpoolcfg.NotSet {
goodTxs.Txs[j] = txn
goodTxs.IsLocal[j] = txs.IsLocal[i]
copy(goodTxs.Senders.At(j), txs.Senders.At(i))
j++
}
}
return reasons, goodTxs, nil
}
// punishSpammer drops half of the spammer's transactions, starting from the highest nonces
func (p *TxPool) punishSpammer(spammer uint64) {
count := p.all.count(spammer) / 2
if count > 0 {
txsToDelete := make([]*metaTx, 0, count)
p.all.descend(spammer, func(mt *metaTx) bool {
txsToDelete = append(txsToDelete, mt)
count--
return count > 0
})
for _, mt := range txsToDelete {
switch mt.currentSubPool {
case PendingSubPool:
p.pending.Remove(mt, "punishSpammer", p.logger)
case BaseFeeSubPool:
p.baseFee.Remove(mt, "punishSpammer", p.logger)
case QueuedSubPool:
p.queued.Remove(mt, "punishSpammer", p.logger)
default:
//already removed
}
p.discardLocked(mt, txpoolcfg.Spammer) // can't call it while iterating by all
}
}
}
func fillDiscardReasons(reasons []txpoolcfg.DiscardReason, newTxs types.TxSlots, discardReasonsLRU *simplelru.LRU[string, txpoolcfg.DiscardReason]) []txpoolcfg.DiscardReason {
for i := range reasons {
if reasons[i] != txpoolcfg.NotSet {
continue
}
reason, ok := discardReasonsLRU.Get(string(newTxs.Txs[i].IDHash[:]))
if ok {
reasons[i] = reason
} else {
reasons[i] = txpoolcfg.Success
}
}
return reasons
}
func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) {
coreDb, cache := p.coreDBWithCache()
coreTx, err := coreDb.BeginRo(ctx)
if err != nil {
return nil, err
}
defer coreTx.Rollback()
cacheView, err := cache.View(ctx, coreTx)
if err != nil {
return nil, err
}
p.lock.Lock()
defer p.lock.Unlock()
if err = p.senders.registerNewSenders(&newTransactions, p.logger); err != nil {
return nil, err
}
reasons, newTxs, err := p.validateTxs(&newTransactions, cacheView)
if err != nil {
return nil, err
}
announcements, addReasons, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger)
if err == nil {
for i, reason := range addReasons {
if reason != txpoolcfg.NotSet {
reasons[i] = reason
}
}
} else {
return nil, err
}
p.promoted.Reset()
p.promoted.AppendOther(announcements)
reasons = fillDiscardReasons(reasons, newTxs, p.discardReasonsLRU)
for i, reason := range reasons {
if reason == txpoolcfg.Success {
txn := newTxs.Txs[i]
if txn.Traced {
p.logger.Info(fmt.Sprintf("TX TRACING: AddLocalTxs promotes idHash=%x, senderId=%d", txn.IDHash, txn.SenderID))
}
p.promoted.Append(txn.Type, txn.Size, txn.IDHash[:])
}
}
if p.promoted.Len() > 0 {
select {
case p.newPendingTxs <- p.promoted.Copy():
default:
}
}
return reasons, nil
}
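// coreDBWithCache returns the chain DB handle and the shared state cache under the pool lock.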
func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
p.lock.Lock()
defer p.lock.Unlock()
return p._chainDB, p._stateCache
}
func (p *TxPool) addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64, collect bool, logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) {
if assert.Enable {
for _, txn := range newTxs.Txs {
if txn.SenderID == 0 {
panic(fmt.Errorf("senderID can't be zero"))
}
}
}
// This can be thought of as the reverse of the operation described before.
// When a block that was deemed "the best" of its height is no longer deemed "the best", the
// transactions contained in it are again viable for inclusion in other blocks, and therefore should
// be returned to the transaction pool.
// An interesting note here is that if the block contained any transactions local to the node,
// by being first removed from the pool (from the "local" part of it) and then re-injected,
// they effectively lose their priority over the "remote" transactions. To prevent that,
// the fact that certain transactions were local needs to be remembered for some
// time (up to some "immutability threshold").
sendersWithChangedState := map[uint64]struct{}{}
discardReasons := make([]txpoolcfg.DiscardReason, len(newTxs.Txs))
announcements := types.Announcements{}
for i, txn := range newTxs.Txs {
if found, ok := p.byHash[string(txn.IDHash[:])]; ok {
discardReasons[i] = txpoolcfg.DuplicateHash
// In case the transaction is stuck, "poke" it to rebroadcast
if collect && newTxs.IsLocal[i] && (found.currentSubPool == PendingSubPool || found.currentSubPool == BaseFeeSubPool) {
announcements.Append(found.Tx.Type, found.Tx.Size, found.Tx.IDHash[:])
}
continue
}
mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum)
if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet {
discardReasons[i] = reason
continue
}
discardReasons[i] = txpoolcfg.NotSet // unnecessary
if txn.Traced {
logger.Info(fmt.Sprintf("TX TRACING: schedule sendersWithChangedState idHash=%x senderId=%d", txn.IDHash, mt.Tx.SenderID))
}
sendersWithChangedState[mt.Tx.SenderID] = struct{}{}
}
for senderID := range sendersWithChangedState {
nonce, balance, err := senders.info(cacheView, senderID)
if err != nil {
return announcements, discardReasons, err
}
p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger)
}
p.promote(pendingBaseFee, pendingBlobFee, &announcements, logger)
p.pending.EnforceBestInvariants()
return announcements, discardReasons, nil
}
// TODO: Looks like a copy of the above
func (p *TxPool) addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch,
senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, logger log.Logger) (types.Announcements, error) {
if assert.Enable {
for _, txn := range newTxs.Txs {
if txn.SenderID == 0 {
panic(fmt.Errorf("senderID can't be zero"))
}
}
}
// This can be thought of as the reverse of the operation described before.
// When a block that was deemed "the best" of its height is no longer deemed "the best", the
// transactions contained in it are again viable for inclusion in other blocks, and therefore should
// be returned to the transaction pool.
// An interesting note here is that if the block contained any transactions local to the node,
// by being first removed from the pool (from the "local" part of it) and then re-injected,
// they effectively lose their priority over the "remote" transactions. To prevent that,
// the fact that certain transactions were local needs to be remembered for some
// time (up to some "immutability threshold").
sendersWithChangedState := map[uint64]struct{}{}
announcements := types.Announcements{}
for i, txn := range newTxs.Txs {
if _, ok := p.byHash[string(txn.IDHash[:])]; ok {
continue
}
mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum)
if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet {
p.discardLocked(mt, reason)
continue
}
sendersWithChangedState[mt.Tx.SenderID] = struct{}{}
}
// add senders changed in state to `sendersWithChangedState` list
for _, changesList := range stateChanges.ChangeBatch {
for _, change := range changesList.Changes {
switch change.Action {
case remote.Action_UPSERT, remote.Action_UPSERT_CODE:
if change.Incarnation > 0 {
continue
}
addr := gointerfaces.ConvertH160toAddress(change.Address)
id, ok := senders.getID(addr)
if !ok {
continue
}
sendersWithChangedState[id] = struct{}{}
}
}
}
for senderID := range sendersWithChangedState {
nonce, balance, err := senders.info(cacheView, senderID)
if err != nil {
return announcements, err
}
p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger)
}
return announcements, nil
}
func (p *TxPool) setBaseFee(baseFee uint64) (uint64, bool) {
changed := false
if baseFee > 0 {
changed = baseFee != p.pendingBaseFee.Load()
p.pendingBaseFee.Store(baseFee)
}
return p.pendingBaseFee.Load(), changed
}
func (p *TxPool) setBlobFee(blobFee uint64) {
if blobFee > 0 {
p.pendingBlobFee.Store(blobFee)
}
}
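// addLocked inserts mt into the pool's data structures, applying the replace-by-fee rules
// (PriceBump, or BlobPriceBump for blob transactions) when a transaction with the same sender
// and nonce is already present. New transactions always land in the queued sub-pool first and
// are promoted from there. The caller must hold p.lock.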
func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoolcfg.DiscardReason {
// Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip
found := p.all.get(mt.Tx.SenderID, mt.Tx.Nonce)
if found != nil {
if found.Tx.Type == types.BlobTxType && mt.Tx.Type != types.BlobTxType {
return txpoolcfg.BlobTxReplace
}
priceBump := p.cfg.PriceBump
//Blob txn threshold checks for replace txn
if mt.Tx.Type == types.BlobTxType {
priceBump = p.cfg.BlobPriceBump
blobFeeThreshold, overflow := (&uint256.Int{}).MulDivOverflow(
&found.Tx.BlobFeeCap,
uint256.NewInt(100+priceBump),
uint256.NewInt(100),
)
if mt.Tx.BlobFeeCap.Lt(blobFeeThreshold) && !overflow {
if bytes.Equal(found.Tx.IDHash[:], mt.Tx.IDHash[:]) {
return txpoolcfg.NotSet
}
return txpoolcfg.ReplaceUnderpriced // TODO: This is the same as NotReplaced
}
}
//Regular txn threshold checks
tipThreshold := uint256.NewInt(0)
tipThreshold = tipThreshold.Mul(&found.Tx.Tip, uint256.NewInt(100+priceBump))
tipThreshold.Div(tipThreshold, u256.N100)
feecapThreshold := uint256.NewInt(0)
feecapThreshold.Mul(&found.Tx.FeeCap, uint256.NewInt(100+priceBump))
feecapThreshold.Div(feecapThreshold, u256.N100)
if mt.Tx.Tip.Cmp(tipThreshold) < 0 || mt.Tx.FeeCap.Cmp(feecapThreshold) < 0 {
// Both tip and feeCap need to be higher than before to replace the transaction
// In case the transaction is stuck, "poke" it to rebroadcast
if mt.subPool&IsLocal != 0 && (found.currentSubPool == PendingSubPool || found.currentSubPool == BaseFeeSubPool) {
announcements.Append(found.Tx.Type, found.Tx.Size, found.Tx.IDHash[:])
}
if bytes.Equal(found.Tx.IDHash[:], mt.Tx.IDHash[:]) {
return txpoolcfg.NotSet
}
return txpoolcfg.NotReplaced
}
switch found.currentSubPool {
case PendingSubPool:
p.pending.Remove(found, "add", p.logger)
case BaseFeeSubPool:
p.baseFee.Remove(found, "add", p.logger)
case QueuedSubPool:
p.queued.Remove(found, "add", p.logger)
default:
//already removed
}
p.discardLocked(found, txpoolcfg.ReplacedByHigherTip)
}
// Don't add blob tx to queued if it's less than current pending blob base fee
if mt.Tx.Type == types.BlobTxType && mt.Tx.BlobFeeCap.LtUint64(p.pendingBlobFee.Load()) {
return txpoolcfg.FeeTooLow
}
hashStr := string(mt.Tx.IDHash[:])
p.byHash[hashStr] = mt
if replaced := p.all.replaceOrInsert(mt, p.logger); replaced != nil {
if assert.Enable {
panic("must never happen")
}
}
if mt.subPool&IsLocal != 0 {
p.isLocalLRU.Add(hashStr, struct{}{})
}
// All transactions are first added to the queued pool and then immediately promoted from there if required
p.queued.Add(mt, "addLocked", p.logger)
// Remove from mined cache as we are now "resurrecting" it to a sub-pool
p.deleteMinedBlobTxn(hashStr)
return txpoolcfg.NotSet
}
// discardLocked drops the transaction from all sub-structures and from the db
// Important: don't call it while iterating over p.all
func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) {
hashStr := string(mt.Tx.IDHash[:])
delete(p.byHash, hashStr)
p.deletedTxs = append(p.deletedTxs, mt)
p.all.delete(mt, reason, p.logger)
p.discardReasonsLRU.Add(hashStr, reason)
}
// Cache recently mined blobs in anticipation of reorg, delete finalized ones
func (p *TxPool) processMinedFinalizedBlobs(coreTx kv.Tx, minedTxs []*types.TxSlot, finalizedBlock uint64) error {
p.lastFinalizedBlock.Store(finalizedBlock)
// Remove blobs in the finalized block and older, loop through all entries
for l := len(p.minedBlobTxsByBlock); l > 0 && finalizedBlock > 0; l-- {
// delete individual hashes
for _, mt := range p.minedBlobTxsByBlock[finalizedBlock] {
delete(p.minedBlobTxsByHash, string(mt.Tx.IDHash[:]))
}
// delete the map entry for this block num
delete(p.minedBlobTxsByBlock, finalizedBlock)
// move on to older blocks, if present
finalizedBlock--
}
// Add mined blobs
minedBlock := p.lastSeenBlock.Load()
p.minedBlobTxsByBlock[minedBlock] = make([]*metaTx, 0)
for _, txn := range minedTxs {
if txn.Type == types.BlobTxType {
mt := &metaTx{Tx: txn, minedBlockNum: minedBlock}
p.minedBlobTxsByBlock[minedBlock] = append(p.minedBlobTxsByBlock[minedBlock], mt)
mt.bestIndex = len(p.minedBlobTxsByBlock[minedBlock]) - 1
p.minedBlobTxsByHash[string(txn.IDHash[:])] = mt
}
}
return nil
}
// Delete individual hash entries from minedBlobTxs cache
func (p *TxPool) deleteMinedBlobTxn(hash string) {
mt, exists := p.minedBlobTxsByHash[hash]
if !exists {
return
}
l := len(p.minedBlobTxsByBlock[mt.minedBlockNum])
if l > 1 {
p.minedBlobTxsByBlock[mt.minedBlockNum][mt.bestIndex] = p.minedBlobTxsByBlock[mt.minedBlockNum][l-1]
}
p.minedBlobTxsByBlock[mt.minedBlockNum] = p.minedBlobTxsByBlock[mt.minedBlockNum][:l-1]
delete(p.minedBlobTxsByHash, hash)
}
func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) {
p.lock.Lock()
defer p.lock.Unlock()
senderID, found := p.senders.getID(addr)
if !found {
return 0, false
}
return p.all.nonce(senderID)
}
// removeMined - apply new highest block (or batch of blocks)
//
// 1. New best block arrives, which potentially changes the balance and the nonce of some senders.
// We use senderIds data structure to find relevant senderId values, and then use senders data structure to
// modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is
// included into a block), and finally, walk over the transaction records and update SubPool fields depending on
// the actual presence of nonce gaps and what the balance is.
func (p *TxPool) removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot) error {
noncesToRemove := map[uint64]uint64{}
for _, txn := range minedTxs {
nonce, ok := noncesToRemove[txn.SenderID]
if !ok || txn.Nonce > nonce {
noncesToRemove[txn.SenderID] = txn.Nonce
}
}
var toDel []*metaTx // can't delete items while iterate them
discarded := 0
pendingRemoved := 0
baseFeeRemoved := 0
queuedRemoved := 0
for senderID, nonce := range noncesToRemove {
byNonce.ascend(senderID, func(mt *metaTx) bool {
if mt.Tx.Nonce > nonce {
if mt.Tx.Traced {
p.logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce)
}
return false
}
if mt.Tx.Traced {
p.logger.Info("TX TRACING: removeMined", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool)
}
toDel = append(toDel, mt)
// del from sub-pool
switch mt.currentSubPool {
case PendingSubPool:
pendingRemoved++
p.pending.Remove(mt, "remove-mined", p.logger)
case BaseFeeSubPool:
baseFeeRemoved++
p.baseFee.Remove(mt, "remove-mined", p.logger)
case QueuedSubPool:
queuedRemoved++
p.queued.Remove(mt, "remove-mined", p.logger)
default:
//already removed
}
return true
})
discarded += len(toDel)
for _, mt := range toDel {
p.discardLocked(mt, txpoolcfg.Mined)
}
toDel = toDel[:0]
}
if discarded > 0 {
p.logger.Debug("Discarded transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved)
}
return nil
}
// onSenderStateChange recalculates the ephemeral fields of transactions and determines
// which sub-pool they need to go to. Since this depends on other transactions from the same sender with lower
// nonces, and also affects other transactions from the same sender with higher nonces, it loops through all transactions
// for a given senderID
func (p *TxPool) onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, blockGasLimit uint64, logger log.Logger) {
noGapsNonce := senderNonce
cumulativeRequiredBalance := uint256.NewInt(0)
minFeeCap := uint256.NewInt(0).SetAllOne()
minTip := uint64(math.MaxUint64)
var toDel []*metaTx // can't delete items while iterate them
p.all.ascend(senderID, func(mt *metaTx) bool {
deleteAndContinueReasonLog := ""
if senderNonce > mt.Tx.Nonce {
deleteAndContinueReasonLog = "low nonce"
} else if mt.Tx.Nonce != noGapsNonce && mt.Tx.Type == types.BlobTxType { // Discard nonce-gapped blob txns
deleteAndContinueReasonLog = "nonce-gapped blob txn"
}
if deleteAndContinueReasonLog != "" {
if mt.Tx.Traced {
logger.Info("TX TRACING: onSenderStateChange loop iteration remove", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderID", senderID, "senderNonce", senderNonce, "txn.nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool, "reason", deleteAndContinueReasonLog)
}
// del from sub-pool
switch mt.currentSubPool {
case PendingSubPool:
p.pending.Remove(mt, deleteAndContinueReasonLog, p.logger)
case BaseFeeSubPool:
p.baseFee.Remove(mt, deleteAndContinueReasonLog, p.logger)
case QueuedSubPool:
p.queued.Remove(mt, deleteAndContinueReasonLog, p.logger)
default:
//already removed
}
toDel = append(toDel, mt)
return true
}
if minFeeCap.Gt(&mt.Tx.FeeCap) {
*minFeeCap = mt.Tx.FeeCap
}
mt.minFeeCap = *minFeeCap
if mt.Tx.Tip.IsUint64() {
minTip = cmp.Min(minTip, mt.Tx.Tip.Uint64())
}
mt.minTip = minTip
mt.nonceDistance = 0
if mt.Tx.Nonce > senderNonce { // no uint underflow
mt.nonceDistance = mt.Tx.Nonce - senderNonce
}
needBalance := requiredBalance(mt.Tx)
// 2. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for
// the sender is M, and there are transactions for all nonces between M and N from the same
// sender. Set to 0 if the transaction's nonce is separated from the state nonce by one or more nonce gaps.
mt.subPool &^= NoNonceGaps
if noGapsNonce == mt.Tx.Nonce {
mt.subPool |= NoNonceGaps
noGapsNonce++
}
// 3. Sufficient balance for gas. Set to 1 if the balance of the sender's account in the
// state is B, nonce of the sender in the state is M, nonce of the transaction is N, and the
// sum of feeCap x gasLimit + transferred_value of all transactions from this sender with
// nonces M ... N is no more than B. Set to 0 otherwise. In other words, this bit is
// set if there is currently a guarantee that the transaction and all its required prior
// transactions will be able to pay for gas.
mt.subPool &^= EnoughBalance
mt.cumulativeBalanceDistance = math.MaxUint64
if mt.Tx.Nonce >= senderNonce {
cumulativeRequiredBalance = cumulativeRequiredBalance.Add(cumulativeRequiredBalance, needBalance) // already deleted all transactions with nonce <= sender.nonce
if senderBalance.Gt(cumulativeRequiredBalance) || senderBalance.Eq(cumulativeRequiredBalance) {
mt.subPool |= EnoughBalance
} else {
if cumulativeRequiredBalance.IsUint64() && senderBalance.IsUint64() {
mt.cumulativeBalanceDistance = cumulativeRequiredBalance.Uint64() - senderBalance.Uint64()
}
}
}
mt.subPool &^= NotTooMuchGas
if mt.Tx.Gas < blockGasLimit {
mt.subPool |= NotTooMuchGas
}
if mt.Tx.Traced {
logger.Info("TX TRACING: onSenderStateChange loop iteration update", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "subPool", mt.currentSubPool)
}
// Some fields of mt might have changed, need to fix the invariants in the subpool best and worst queues
switch mt.currentSubPool {
case PendingSubPool:
p.pending.Updated(mt)
case BaseFeeSubPool:
p.baseFee.Updated(mt)
case QueuedSubPool:
p.queued.Updated(mt)
}
return true
})
for _, mt := range toDel {
p.discardLocked(mt, txpoolcfg.NonceTooLow)
}
logger.Trace("[txpool] onSenderStateChange", "sender", senderID, "count", p.all.count(senderID), "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len())
}
// promote reasserts invariants of the sub-pools and appends to announcements the transactions
// that ended up being promoted to the pending or basefee pool, for re-broadcasting
func (p *TxPool) promote(pendingBaseFee uint64, pendingBlobFee uint64, announcements *types.Announcements, logger log.Logger) {
// Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
for worst := p.pending.Worst(); p.pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = p.pending.Worst() {
if worst.subPool >= BaseFeePoolBits {
tx := p.pending.PopWorst()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
p.baseFee.Add(tx, "demote-pending", logger)
} else {
p.queued.Add(p.pending.PopWorst(), "demote-pending", logger)
}
}
// Promote best transactions from base fee pool to pending pool while they qualify
for best := p.baseFee.Best(); p.baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = p.baseFee.Best() {
tx := p.baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
p.pending.Add(tx, logger)
}
// Demote worst transactions that do not qualify for base fee pool anymore, to queued sub pool, or discard
for worst := p.baseFee.Worst(); p.baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = p.baseFee.Worst() {
p.queued.Add(p.baseFee.PopWorst(), "demote-base", logger)
}
// Promote best transactions from the queued pool to either pending or base fee pool, while they qualify
for best := p.queued.Best(); p.queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = p.queued.Best() {
if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 {
tx := p.queued.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
p.pending.Add(tx, logger)
} else {
p.baseFee.Add(p.queued.PopBest(), "promote-queued", logger)
}
}
// Discard worst transactions from the queued sub pool if they do not qualify
// <FUNCTIONALITY REMOVED>
// Discard worst transactions from pending pool until it is within capacity limit
for p.pending.Len() > p.pending.limit {
p.discardLocked(p.pending.PopWorst(), txpoolcfg.PendingPoolOverflow)
}
// Discard worst transactions from the base fee sub pool until it is within its capacity limits
for p.baseFee.Len() > p.baseFee.limit {
p.discardLocked(p.baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow)
}
// Discard worst transactions from the queued sub pool until it is within its capacity limits
for p.queued.Len() > p.queued.limit {
p.discardLocked(p.queued.PopWorst(), txpoolcfg.QueuedPoolOverflow)
}
}
// txMaxBroadcastSize is the max size of a transaction that will be broadcasted.
// All transactions with a higher size will be announced and need to be fetched
// by the peer.
const txMaxBroadcastSize = 4 * 1024
// MainLoop - does:
// send pending byHash to p2p:
// - new byHash
// - all pooled byHash to recently connected peers
// - all local pooled byHash to random peers periodically
//
// promote/demote transactions
// reorgs
func MainLoop(ctx context.Context, db kv.RwDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery)
defer syncToNewPeersEvery.Stop()
processRemoteTxsEvery := time.NewTicker(p.cfg.ProcessRemoteTxsEvery)
defer processRemoteTxsEvery.Stop()
commitEvery := time.NewTicker(p.cfg.CommitEvery)
defer commitEvery.Stop()
logEvery := time.NewTicker(p.cfg.LogEvery)
defer logEvery.Stop()
err := p.Start(ctx, db)
if err != nil {
p.logger.Error("[txpool] Failed to start", "err", err)
return
}
for {
select {
case <-ctx.Done():
_, _ = p.flush(ctx, db)
return
case <-logEvery.C:
p.logStats()
case <-processRemoteTxsEvery.C:
if !p.Started() {
continue
}
if err := p.processRemoteTxs(ctx); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
p.logger.Error("[txpool] process batch remote txs", "err", err)
}
case <-commitEvery.C:
if db != nil && p.Started() {
t := time.Now()
written, err := p.flush(ctx, db)
if err != nil {
p.logger.Error("[txpool] flush is local history", "err", err)
continue
}
writeToDBBytesCounter.SetUint64(written)
p.logger.Debug("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t))
}
case announcements := <-newTxs:
go func() {
for i := 0; i < 16; i++ { // drain more events from channel, then merge and dedup them
select {
case a := <-newTxs:
announcements.AppendOther(a)
continue
default:
}
break
}
if announcements.Len() == 0 {
return
}
defer propagateNewTxsTimer.ObserveDuration(time.Now())
announcements = announcements.DedupCopy()
notifyMiningAboutNewSlots()
if p.cfg.NoGossip {
// drain newTxs for emptying newTx channel
// newTx channel will be filled only with local transactions
// early return to avoid outbound transaction propagation
log.Debug("[txpool] tx gossip disabled", "state", "drain new transactions")
return
}
var localTxTypes []byte
var localTxSizes []uint32
var localTxHashes types.Hashes
var localTxRlps [][]byte
var remoteTxTypes []byte
var remoteTxSizes []uint32
var remoteTxHashes types.Hashes
var remoteTxRlps [][]byte
var broadcastHashes types.Hashes
slotsRlp := make([][]byte, 0, announcements.Len())
if err := db.View(ctx, func(tx kv.Tx) error {
for i := 0; i < announcements.Len(); i++ {
t, size, hash := announcements.At(i)
slotRlp, err := p.GetRlp(tx, hash)
if err != nil {
return err
}
						// Empty rlp can happen if a transaction we want to broadcast has just been mined, for example
						if len(slotRlp) == 0 {
							continue
						}
						slotsRlp = append(slotsRlp, slotRlp)
if p.IsLocal(hash) {
localTxTypes = append(localTxTypes, t)
localTxSizes = append(localTxSizes, size)
localTxHashes = append(localTxHashes, hash...)
// "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844
if t != types.BlobTxType {
localTxRlps = append(localTxRlps, slotRlp)
broadcastHashes = append(broadcastHashes, hash...)
}
} else {
remoteTxTypes = append(remoteTxTypes, t)
remoteTxSizes = append(remoteTxSizes, size)
remoteTxHashes = append(remoteTxHashes, hash...)
// "Nodes MUST NOT automatically broadcast blob transactions to their peers" - EIP-4844
if t != types.BlobTxType && len(slotRlp) < txMaxBroadcastSize {
remoteTxRlps = append(remoteTxRlps, slotRlp)
}
}
}
return nil
}); err != nil {
p.logger.Error("[txpool] collect info to propagate", "err", err)
return
}
if newSlotsStreams != nil {
// TODO(eip-4844) What is this for? Is it OK to broadcast blob transactions?
newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp}, p.logger)
}
// broadcast local transactions
const localTxsBroadcastMaxPeers uint64 = 10
txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxsBroadcastMaxPeers)
for i, peer := range txSentTo {
p.logger.Trace("Local tx broadcast", "txHash", hex.EncodeToString(broadcastHashes.At(i)), "to peer", peer)
}
hashSentTo := send.AnnouncePooledTxs(localTxTypes, localTxSizes, localTxHashes, localTxsBroadcastMaxPeers*2)
for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i)
p.logger.Trace("Local tx announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load())
}
// broadcast remote transactions
const remoteTxsBroadcastMaxPeers uint64 = 3
send.BroadcastPooledTxs(remoteTxRlps, remoteTxsBroadcastMaxPeers)
send.AnnouncePooledTxs(remoteTxTypes, remoteTxSizes, remoteTxHashes, remoteTxsBroadcastMaxPeers*2)
}()
case <-syncToNewPeersEvery.C: // new peer
newPeers := p.recentlyConnectedPeers.GetAndClean()
if len(newPeers) == 0 {
continue
}
if p.cfg.NoGossip {
// avoid transaction gossiping for new peers
log.Debug("[txpool] tx gossip disabled", "state", "sync new peers")
continue
}
t := time.Now()
			var hashes types.Hashes
			var txTypes []byte
			var sizes []uint32
			txTypes, sizes, hashes = p.AppendAllAnnouncements(txTypes, sizes, hashes[:0])
			go send.PropagatePooledTxsToPeersList(newPeers, txTypes, sizes, hashes)
propagateToNewPeerTimer.ObserveDuration(t)
}
}
}
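// The announcement branch above uses a non-blocking drain: after receiving one batch
// it opportunistically merges up to 16 more queued batches before deduplicating, which
// reduces the number of outbound p2p messages under load. A minimal sketch of the same
// pattern (events is a hypothetical chan types.Announcements):
//
//	batch := <-events
//	for i := 0; i < 16; i++ {
//		select {
//		case more := <-events:
//			batch.AppendOther(more)
//			continue
//		default:
//		}
//		break
//	}
//	batch = batch.DedupCopy()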
func (p *TxPool) flushNoFsync(ctx context.Context, db kv.RwDB) (written uint64, err error) {
p.lock.Lock()
defer p.lock.Unlock()
//it's important that write db tx is done inside lock, to make last writes visible for all read operations
if err := db.UpdateNosync(ctx, func(tx kv.RwTx) error {
err = p.flushLocked(tx)
if err != nil {
return err
}
written, _, err = tx.(*mdbx.MdbxTx).SpaceDirty()
if err != nil {
return err
}
return nil
}); err != nil {
return 0, err
}
return written, nil
}
func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.ObserveDuration(time.Now())
// 1. get global lock on txpool and flush it to db, without fsync (to release lock asap)
// 2. then fsync db without txpool lock
written, err = p.flushNoFsync(ctx, db)
if err != nil {
return 0, err
}
// fsync
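	// Committing the empty write transaction below acts as a durability barrier: it forces
	// the data written by flushNoFsync (via UpdateNosync) to be synced to disk, and it does
	// so without holding the pool lock.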
if err := db.Update(ctx, func(tx kv.RwTx) error { return nil }); err != nil {
return 0, err
}
return written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
for i, mt := range p.deletedTxs {
id := mt.Tx.SenderID
idHash := mt.Tx.IDHash[:]
if !p.all.hasTxs(id) {
addr, ok := p.senders.senderID2Addr[id]
if ok {
delete(p.senders.senderID2Addr, id)
delete(p.senders.senderIDs, addr)
}
}
//fmt.Printf("del:%d,%d,%d\n", mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip)
has, err := tx.Has(kv.PoolTransaction, idHash)
if err != nil {
return err
}
if has {
if err := tx.Delete(kv.PoolTransaction, idHash); err != nil {
return err
}
}
p.deletedTxs[i] = nil // for gc
}
txHashes := p.isLocalLRU.Keys()
encID := make([]byte, 8)
if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil {
return err
}
for i, txHash := range txHashes {
binary.BigEndian.PutUint64(encID, uint64(i))
if err := tx.Append(kv.RecentLocalTransaction, encID, []byte(txHash)); err != nil {
return err
}
}
v := make([]byte, 0, 1024)
for txHash, metaTx := range p.byHash {
if metaTx.Tx.Rlp == nil {
continue
}
v = common.EnsureEnoughSize(v, 20+len(metaTx.Tx.Rlp))
addr, ok := p.senders.senderID2Addr[metaTx.Tx.SenderID]
if !ok {
p.logger.Warn("[txpool] flush: sender address not found by ID", "senderID", metaTx.Tx.SenderID)
continue
}
copy(v[:20], addr.Bytes())
copy(v[20:], metaTx.Tx.Rlp)
has, err := tx.Has(kv.PoolTransaction, []byte(txHash))
if err != nil {
return err
}
if !has {
if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil {
return err
}
}
metaTx.Tx.Rlp = nil
}
binary.BigEndian.PutUint64(encID, p.pendingBaseFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
return err
}
binary.BigEndian.PutUint64(encID, p.pendingBlobFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBlobFeeKey, encID); err != nil {
return err
}
if err := PutLastSeenBlock(tx, p.lastSeenBlock.Load(), encID); err != nil {
return err
}
	// Clean the in-memory data structures as late as possible: if an error happens during this Tx,
	// the DB stays consistent, but some in-memory structures may already be cleaned and a retry would not work.
	// A failed write transaction must not create side effects.
p.deletedTxs = p.deletedTxs[:0]
return nil
}
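// Layout of the kv.PoolTransaction records written above (and read back by fromDB below):
//
//	key   = transaction IDHash
//	value = 20-byte sender address || transaction RLP
//
// so a reader recovers both parts with:
//
//	addr, txRlp := v[:20], v[20:]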
func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
if p.lastSeenBlock.Load() == 0 {
lastSeenBlock, err := LastSeenBlock(tx)
if err != nil {
return err
}
p.lastSeenBlock.Store(lastSeenBlock)
}
	// this is necessary as otherwise best - which waits for sync events -
	// may wait forever if blocks have been processed before the txpool
	// starts with an empty db
lastSeenProgress, err := getExecutionProgress(coreTx)
if err != nil {
return err
}
if p.lastSeenBlock.Load() < lastSeenProgress {
// TODO we need to process the blocks since the
// last seen to make sure that the tx pool is in
// sync with the processed blocks
p.lastSeenBlock.Store(lastSeenProgress)
}
cacheView, err := p._stateCache.View(ctx, coreTx)
if err != nil {
return err
}
it, err := tx.Range(kv.RecentLocalTransaction, nil, nil)
if err != nil {
return err
}
for it.HasNext() {
_, v, err := it.Next()
if err != nil {
return err
}
p.isLocalLRU.Add(string(v), struct{}{})
}
txs := types.TxSlots{}
parseCtx := types.NewTxParseContext(p.chainID, false)
parseCtx.WithSender(false)
i := 0
it, err = tx.Range(kv.PoolTransaction, nil, nil)
if err != nil {
return err
}
for it.HasNext() {
k, v, err := it.Next()
if err != nil {
return err
}
addr, txRlp := *(*[20]byte)(v[:20]), v[20:]
txn := &types.TxSlot{}
// TODO(eip-4844) ensure wrappedWithBlobs when transactions are saved to the DB
_, err = parseCtx.ParseTransaction(txRlp, 0, txn, nil, false /* hasEnvelope */, true /*wrappedWithBlobs*/, nil)
if err != nil {
err = fmt.Errorf("err: %w, rlp: %x", err, txRlp)
p.logger.Warn("[txpool] fromDB: parseTransaction", "err", err)
continue
}
		txn.Rlp = nil // means that we don't need to store it in the db anymore
txn.SenderID, txn.Traced = p.senders.getOrCreateID(addr, p.logger)
isLocalTx := p.isLocalLRU.Contains(string(k))
if reason := p.validateTx(txn, isLocalTx, cacheView); reason != txpoolcfg.NotSet && reason != txpoolcfg.Success {
return nil // TODO: Clarify - if one of the txs has the wrong reason, no pooled txs!
}
txs.Resize(uint(i + 1))
txs.Txs[i] = txn
txs.IsLocal[i] = isLocalTx
copy(txs.Senders.At(i), addr[:])
i++
}
var pendingBaseFee, pendingBlobFee, minBlobGasPrice, blockGasLimit uint64
if p.feeCalculator != nil {
if chainConfig, _ := ChainConfig(tx); chainConfig != nil {
pendingBaseFee, pendingBlobFee, minBlobGasPrice, blockGasLimit, err = p.feeCalculator.CurrentFees(chainConfig, coreTx)
if err != nil {
return err
}
}
}
if pendingBaseFee == 0 {
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey)
if err != nil {
return err
}
if len(v) > 0 {
pendingBaseFee = binary.BigEndian.Uint64(v)
}
}
if pendingBlobFee == 0 {
v, err := tx.GetOne(kv.PoolInfo, PoolPendingBlobFeeKey)
if err != nil {
return err
}
if len(v) > 0 {
pendingBlobFee = binary.BigEndian.Uint64(v)
}
}
if pendingBlobFee == 0 {
pendingBlobFee = minBlobGasPrice
}
if blockGasLimit == 0 {
blockGasLimit = DefaultBlockGasLimit
}
err = p.senders.registerNewSenders(&txs, p.logger)
if err != nil {
return err
}
if _, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, pendingBlobFee, blockGasLimit, false, p.logger); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)
p.pendingBlobFee.Store(pendingBlobFee)
p.blockGasLimit.Store(blockGasLimit)
return nil
}
func getExecutionProgress(db kv.Getter) (uint64, error) {
data, err := db.GetOne(kv.SyncStageProgress, []byte("Execution"))
if err != nil {
return 0, err
}
if len(data) == 0 {
return 0, nil
}
if len(data) < 8 {
return 0, fmt.Errorf("value must be at least 8 bytes, got %d", len(data))
}
return binary.BigEndian.Uint64(data[:8]), nil
}
func LastSeenBlock(tx kv.Getter) (uint64, error) {
v, err := tx.GetOne(kv.PoolInfo, PoolLastSeenBlockKey)
if err != nil {
return 0, err
}
if len(v) == 0 {
return 0, nil
}
return binary.BigEndian.Uint64(v), nil
}
func PutLastSeenBlock(tx kv.Putter, n uint64, buf []byte) error {
buf = common.EnsureEnoughSize(buf, 8)
binary.BigEndian.PutUint64(buf, n)
err := tx.Put(kv.PoolInfo, PoolLastSeenBlockKey, buf)
if err != nil {
return err
}
return nil
}
func ChainConfig(tx kv.Getter) (*chain.Config, error) {
v, err := tx.GetOne(kv.PoolInfo, PoolChainConfigKey)
if err != nil {
return nil, err
}
if len(v) == 0 {
return nil, nil
}
var config chain.Config
if err := json.Unmarshal(v, &config); err != nil {
return nil, fmt.Errorf("invalid chain config JSON in pool db: %w", err)
}
return &config, nil
}
func PutChainConfig(tx kv.Putter, cc *chain.Config, buf []byte) error {
wr := bytes.NewBuffer(buf)
if err := json.NewEncoder(wr).Encode(cc); err != nil {
return fmt.Errorf("invalid chain config JSON in pool db: %w", err)
}
if err := tx.Put(kv.PoolInfo, PoolChainConfigKey, wr.Bytes()); err != nil {
return err
}
return nil
}
// nolint
func (p *TxPool) printDebug(prefix string) {
fmt.Printf("%s.pool.byHash\n", prefix)
for _, j := range p.byHash {
fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.SenderID, j.Tx.Nonce, j.Tx.Tip)
}
fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len())
for _, mt := range p.pending.best.ms {
mt.Tx.PrintDebug(fmt.Sprintf("%s.pending: %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.SenderID, mt.Tx.Nonce, mt.Tx.Tip))
}
for _, mt := range p.baseFee.best.ms {
mt.Tx.PrintDebug(fmt.Sprintf("%s.baseFee : %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.SenderID, mt.Tx.Nonce, mt.Tx.Tip))
}
for _, mt := range p.queued.best.ms {
mt.Tx.PrintDebug(fmt.Sprintf("%s.queued : %b,%d,%d,%d", prefix, mt.subPool, mt.Tx.SenderID, mt.Tx.Nonce, mt.Tx.Tip))
}
}
func (p *TxPool) logStats() {
if !p.Started() {
//p.logger.Info("[txpool] Not started yet, waiting for new blocks...")
return
}
p.lock.Lock()
defer p.lock.Unlock()
var m runtime.MemStats
dbg.ReadMemStats(&m)
ctx := []interface{}{
//"block", p.lastSeenBlock.Load(),
"pending", p.pending.Len(),
"baseFee", p.baseFee.Len(),
"queued", p.queued.Len(),
}
cacheKeys := p._stateCache.Len()
if cacheKeys > 0 {
ctx = append(ctx, "cache_keys", cacheKeys)
}
ctx = append(ctx, "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
p.logger.Info("[txpool] stat", ctx...)
pendingSubCounter.SetInt(p.pending.Len())
basefeeSubCounter.SetInt(p.baseFee.Len())
queuedSubCounter.SetInt(p.queued.Len())
}
// Deprecated: need to switch to a streaming-like API
func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender common.Address, t SubPoolType), tx kv.Tx) {
p.lock.Lock()
defer p.lock.Unlock()
p.all.ascendAll(func(mt *metaTx) bool {
slot := mt.Tx
slotRlp := slot.Rlp
if slot.Rlp == nil {
v, err := tx.GetOne(kv.PoolTransaction, slot.IDHash[:])
if err != nil {
p.logger.Warn("[txpool] foreach: get tx from db", "err", err)
return true
}
if v == nil {
p.logger.Warn("[txpool] foreach: tx not found in db")
return true
}
slotRlp = v[20:]
}
if sender, found := p.senders.senderID2Addr[slot.SenderID]; found {
f(slotRlp, sender, mt.currentSubPool)
}
return true
})
}
var PoolChainConfigKey = []byte("chain_config")
var PoolLastSeenBlockKey = []byte("last_seen_block")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")
var PoolPendingBlobFeeKey = []byte("pending_blob_fee")
// recentlyConnectedPeers buffers IDs of recently connected good peers,
// so that sync of pooled transactions can happen to all of them at once
// (DoS protection and performance saving).
// It doesn't track whether a peer has disconnected; that's fine.
type recentlyConnectedPeers struct {
peers []types.PeerID
lock sync.Mutex
}
func (l *recentlyConnectedPeers) AddPeer(p types.PeerID) {
l.lock.Lock()
defer l.lock.Unlock()
l.peers = append(l.peers, p)
}
func (l *recentlyConnectedPeers) GetAndClean() []types.PeerID {
l.lock.Lock()
defer l.lock.Unlock()
peers := l.peers
l.peers = nil
return peers
}
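// Typical usage, mirroring the syncToNewPeersEvery branch of MainLoop above (sketch):
//
//	peers := p.recentlyConnectedPeers.GetAndClean()
//	if len(peers) > 0 {
//		// announce all pooled tx hashes to every new peer in one batch
//	}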
// nolint
func (sc *sendersBatch) printDebug(prefix string) {
fmt.Printf("%s.sendersBatch.sender\n", prefix)
//for i, j := range sc.senderInfo {
// fmt.Printf("\tid=%d,nonce=%d,balance=%d\n", i, j.nonce, j.balance.Uint64())
//}
}
// sendersBatch stores in-memory sender-related objects - which may differ from the DB (updated/dirty)
// and are flushed to the DB periodically. It doesn't act as a read-cache (the DB is small and memory-mapped - no cache needed).
// Not thread-safe.
type sendersBatch struct {
senderIDs map[common.Address]uint64
senderID2Addr map[uint64]common.Address
tracedSenders map[common.Address]struct{}
senderID uint64
}
func newSendersCache(tracedSenders map[common.Address]struct{}) *sendersBatch {
return &sendersBatch{senderIDs: map[common.Address]uint64{}, senderID2Addr: map[uint64]common.Address{}, tracedSenders: tracedSenders}
}
func (sc *sendersBatch) getID(addr common.Address) (uint64, bool) {
id, ok := sc.senderIDs[addr]
return id, ok
}
func (sc *sendersBatch) getOrCreateID(addr common.Address, logger log.Logger) (uint64, bool) {
_, traced := sc.tracedSenders[addr]
if !traced {
traced = TraceAll
}
id, ok := sc.senderIDs[addr]
if !ok {
sc.senderID++
id = sc.senderID
sc.senderIDs[addr] = id
sc.senderID2Addr[id] = addr
if traced {
logger.Info(fmt.Sprintf("TX TRACING: allocated senderID %d to sender %x", id, addr))
}
}
return id, traced
}
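// Illustrative sketch of the two-way mapping maintained above (addr, logger and
// cacheView are assumed to be in scope):
//
//	id, traced := sc.getOrCreateID(addr, logger)  // allocates a fresh ID on first use
//	back := sc.senderID2Addr[id]                  // back == addr
//	nonce, balance, err := sc.info(cacheView, id) // reads the sender's state via the cache view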
func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint64, balance uint256.Int, err error) {
addr, ok := sc.senderID2Addr[id]
if !ok {
panic("must not happen")
}
encoded, err := cacheView.Get(addr.Bytes())
if err != nil {
return 0, emptySender.balance, err
}
if len(encoded) == 0 {
return emptySender.nonce, emptySender.balance, nil
}
nonce, balance, err = types.DecodeSender(encoded)
if err != nil {
return 0, emptySender.balance, err
}
return nonce, balance, nil
}
func (sc *sendersBatch) registerNewSenders(newTxs *types.TxSlots, logger log.Logger) (err error) {
for i, txn := range newTxs.Txs {
txn.SenderID, txn.Traced = sc.getOrCreateID(newTxs.Senders.AddressAt(i), logger)
}
return nil
}
func (sc *sendersBatch) onNewBlock(stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, logger log.Logger) error {
for _, diff := range stateChanges.ChangeBatch {
for _, change := range diff.Changes { // merge state changes
addrB := gointerfaces.ConvertH160toAddress(change.Address)
sc.getOrCreateID(addrB, logger)
}
for i, txn := range unwindTxs.Txs {
txn.SenderID, txn.Traced = sc.getOrCreateID(unwindTxs.Senders.AddressAt(i), logger)
}
for i, txn := range minedTxs.Txs {
txn.SenderID, txn.Traced = sc.getOrCreateID(minedTxs.Senders.AddressAt(i), logger)
}
}
return nil
}
// BySenderAndNonce - designed to perform the most expensive operation in TxPool:
// "recalculate all ephemeral fields of all transactions" by the algorithm
// - for all senders - iterate over their transactions in ascending nonce order
//
// Performance decisions:
// - All senders are stored inside 1 large BTree - because iterating over 1 BTree is faster than over map[senderId]BTree
// - sortByNonce is used as a non-pointer wrapper - because iterating over a BTree of pointers is 2x slower
type BySenderAndNonce struct {
tree *btree.BTreeG[*metaTx]
search *metaTx
senderIDTxnCount map[uint64]int // count of sender's txns in the pool - may differ from nonce
senderIDBlobCount map[uint64]uint64 // count of sender's total number of blobs in the pool
}
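// The tree is ordered by (Tx.SenderID, Tx.Nonce) - its less function lives elsewhere in
// this file - so one sender's transactions form a contiguous, nonce-sorted range. The
// methods below position iterators with the reusable `search` sentinel, e.g. (sketch):
//
//	s := b.search
//	s.Tx.SenderID = senderID
//	s.Tx.Nonce = 0
//	b.tree.AscendGreaterOrEqual(s, func(mt *metaTx) bool {
//		return mt.Tx.SenderID == senderID // returning false stops at the first other sender
//	})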
func (b *BySenderAndNonce) nonce(senderID uint64) (nonce uint64, ok bool) {
s := b.search
s.Tx.SenderID = senderID
s.Tx.Nonce = math.MaxUint64
b.tree.DescendLessOrEqual(s, func(mt *metaTx) bool {
if mt.Tx.SenderID == senderID {
nonce = mt.Tx.Nonce
ok = true
}
return false
})
return nonce, ok
}
func (b *BySenderAndNonce) ascendAll(f func(*metaTx) bool) {
b.tree.Ascend(func(mt *metaTx) bool {
return f(mt)
})
}
func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) {
s := b.search
s.Tx.SenderID = senderID
s.Tx.Nonce = 0
b.tree.AscendGreaterOrEqual(s, func(mt *metaTx) bool {
if mt.Tx.SenderID != senderID {
return false
}
return f(mt)
})
}
func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) {
s := b.search
s.Tx.SenderID = senderID
s.Tx.Nonce = math.MaxUint64
b.tree.DescendLessOrEqual(s, func(mt *metaTx) bool {
if mt.Tx.SenderID != senderID {
return false
}
return f(mt)
})
}
func (b *BySenderAndNonce) count(senderID uint64) int {
return b.senderIDTxnCount[senderID]
}
func (b *BySenderAndNonce) blobCount(senderID uint64) uint64 {
return b.senderIDBlobCount[senderID]
}
func (b *BySenderAndNonce) hasTxs(senderID uint64) bool {
has := false
b.ascend(senderID, func(*metaTx) bool {
has = true
return false
})
return has
}
func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx {
s := b.search
s.Tx.SenderID = senderID
s.Tx.Nonce = txNonce
if found, ok := b.tree.Get(s); ok {
return found
}
return nil
}
// nolint
func (b *BySenderAndNonce) has(mt *metaTx) bool {
return b.tree.Has(mt)
}
func (b *BySenderAndNonce) delete(mt *metaTx, reason txpoolcfg.DiscardReason, logger log.Logger) {
if _, ok := b.tree.Delete(mt); ok {
if mt.Tx.Traced {
logger.Info("TX TRACING: Deleted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "reason", reason)
}
senderID := mt.Tx.SenderID
count := b.senderIDTxnCount[senderID]
if count > 1 {
b.senderIDTxnCount[senderID] = count - 1
} else {
delete(b.senderIDTxnCount, senderID)
}
if mt.Tx.Type == types.BlobTxType && mt.Tx.Blobs != nil {
accBlobCount := b.senderIDBlobCount[senderID]
txnBlobCount := len(mt.Tx.Blobs)
if txnBlobCount > 1 {
b.senderIDBlobCount[senderID] = accBlobCount - uint64(txnBlobCount)
} else {
delete(b.senderIDBlobCount, senderID)
}
}
}
}
func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx, logger log.Logger) *metaTx {
it, ok := b.tree.ReplaceOrInsert(mt)
if ok {
if mt.Tx.Traced {
logger.Info("TX TRACING: Replaced tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce)
}
return it
}
if mt.Tx.Traced {
logger.Info("TX TRACING: Inserted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce)
}
b.senderIDTxnCount[mt.Tx.SenderID]++
if mt.Tx.Type == types.BlobTxType && mt.Tx.Blobs != nil {
b.senderIDBlobCount[mt.Tx.SenderID] += uint64(len(mt.Tx.Blobs))
}
return nil
}
// PendingPool - is different from other pools - its best is a Slice instead of a Heap.
// It's more expensive to maintain the "slice sort" invariant, but it allows a cheap copy of the
// pending.best slice for mining (because we consider txs and metaTx immutable)
type PendingPool struct {
best *bestSlice
worst *WorstQueue
limit int
t SubPoolType
}
func NewPendingSubPool(t SubPoolType, limit int) *PendingPool {
return &PendingPool{limit: limit, t: t, best: &bestSlice{ms: []*metaTx{}}, worst: &WorstQueue{ms: []*metaTx{}}}
}
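// Because best stays sorted and metaTx values are treated as immutable, a consumer (e.g.
// the mining path, outside this section) can take a cheap snapshot by copying only the
// pointers (sketch):
//
//	snapshot := make([]*metaTx, len(p.best.ms))
//	copy(snapshot, p.best.ms) // O(n) pointer copy; elements are shared, not cloned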
// bestSlice - is similar to the best queue, but uses a linear structure with O(n log n) sort complexity,
// and it maintains the element.bestIndex field
type bestSlice struct {
ms []*metaTx
pendingBaseFee uint64
}
func (s *bestSlice) Len() int { return len(s.ms) }
func (s *bestSlice) Swap(i, j int) {
s.ms[i], s.ms[j] = s.ms[j], s.ms[i]
s.ms[i].bestIndex, s.ms[j].bestIndex = i, j
}
func (s *bestSlice) Less(i, j int) bool {
return s.ms[i].better(s.ms[j], *uint256.NewInt(s.pendingBaseFee))
}
func (s *bestSlice) UnsafeRemove(i *metaTx) {
s.Swap(i.bestIndex, len(s.ms)-1)
s.ms[len(s.ms)-1].bestIndex = -1
s.ms[len(s.ms)-1] = nil
s.ms = s.ms[:len(s.ms)-1]
}
func (s *bestSlice) UnsafeAdd(i *metaTx) {
i.bestIndex = len(s.ms)
s.ms = append(s.ms, i)
}
func (p *PendingPool) EnforceWorstInvariants() {
heap.Init(p.worst)
}
func (p *PendingPool) EnforceBestInvariants() {
sort.Sort(p.best)
}
func (p *PendingPool) Best() *metaTx { //nolint
if len(p.best.ms) == 0 {
return nil
}
return p.best.ms[0]
}
func (p *PendingPool) Worst() *metaTx { //nolint
if len(p.worst.ms) == 0 {
return nil
}
return (p.worst.ms)[0]
}
func (p *PendingPool) PopWorst() *metaTx { //nolint
i := heap.Pop(p.worst).(*metaTx)
if i.bestIndex >= 0 {
p.best.UnsafeRemove(i)
}
return i
}
func (p *PendingPool) Updated(mt *metaTx) {
heap.Fix(p.worst, mt.worstIndex)
}
func (p *PendingPool) Len() int { return len(p.best.ms) }
func (p *PendingPool) Remove(i *metaTx, reason string, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason)
}
if i.worstIndex >= 0 {
heap.Remove(p.worst, i.worstIndex)
}
if i.bestIndex >= 0 {
p.best.UnsafeRemove(i)
}
i.currentSubPool = 0
}
func (p *PendingPool) Add(i *metaTx, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s, IdHash=%x, sender=%d, nonce=%d", p.t, i.Tx.IDHash, i.Tx.SenderID, i.Tx.Nonce))
}
i.currentSubPool = p.t
heap.Push(p.worst, i)
p.best.UnsafeAdd(i)
}
func (p *PendingPool) DebugPrint(prefix string) {
for i, it := range p.best.ms {
fmt.Printf("%s.best: %d, %d, %d,%d\n", prefix, i, it.subPool, it.bestIndex, it.Tx.Nonce)
}
for i, it := range p.worst.ms {
fmt.Printf("%s.worst: %d, %d, %d,%d\n", prefix, i, it.subPool, it.worstIndex, it.Tx.Nonce)
}
}
type SubPool struct {
best *BestQueue
worst *WorstQueue
limit int
t SubPoolType
}
func NewSubPool(t SubPoolType, limit int) *SubPool {
return &SubPool{limit: limit, t: t, best: &BestQueue{}, worst: &WorstQueue{}}
}
func (p *SubPool) EnforceInvariants() {
heap.Init(p.worst)
heap.Init(p.best)
}
func (p *SubPool) Best() *metaTx { //nolint
if len(p.best.ms) == 0 {
return nil
}
return p.best.ms[0]
}
func (p *SubPool) Worst() *metaTx { //nolint
if len(p.worst.ms) == 0 {
return nil
}
return p.worst.ms[0]
}
func (p *SubPool) PopBest() *metaTx { //nolint
i := heap.Pop(p.best).(*metaTx)
heap.Remove(p.worst, i.worstIndex)
return i
}
func (p *SubPool) PopWorst() *metaTx { //nolint
i := heap.Pop(p.worst).(*metaTx)
heap.Remove(p.best, i.bestIndex)
return i
}
func (p *SubPool) Len() int { return p.best.Len() }
func (p *SubPool) Add(i *metaTx, reason string, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason)
}
i.currentSubPool = p.t
heap.Push(p.best, i)
heap.Push(p.worst, i)
}
func (p *SubPool) Remove(i *metaTx, reason string, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason)
}
heap.Remove(p.best, i.bestIndex)
heap.Remove(p.worst, i.worstIndex)
i.currentSubPool = 0
}
func (p *SubPool) Updated(i *metaTx) {
heap.Fix(p.best, i.bestIndex)
heap.Fix(p.worst, i.worstIndex)
}
func (p *SubPool) DebugPrint(prefix string) {
for i, it := range p.best.ms {
fmt.Printf("%s.best: %d, %d, %d\n", prefix, i, it.subPool, it.bestIndex)
}
for i, it := range p.worst.ms {
fmt.Printf("%s.worst: %d, %d, %d\n", prefix, i, it.subPool, it.worstIndex)
}
}
type BestQueue struct {
ms []*metaTx
pendingBastFee uint64
}
// better returns true if the txn "mt" is better than the parameter txn "than".
// It first compares the subpool markers of the two meta txns; then,
// (if they have the same subpool marker, and thus the same pool)
// depending on the pool - pending (P), basefee (B), queued (Q) -
// it compares the effective tip (for P), nonceDistance (for both P and Q),
// minFeeCap (for B), and cumulative balance distance (for P and Q).
func (mt *metaTx) better(than *metaTx, pendingBaseFee uint256.Int) bool {
subPool := mt.subPool
thanSubPool := than.subPool
if mt.minFeeCap.Cmp(&pendingBaseFee) >= 0 {
subPool |= EnoughFeeCapBlock
}
if than.minFeeCap.Cmp(&pendingBaseFee) >= 0 {
thanSubPool |= EnoughFeeCapBlock
}
if subPool != thanSubPool {
return subPool > thanSubPool
}
switch mt.currentSubPool {
case PendingSubPool:
var effectiveTip, thanEffectiveTip uint256.Int
if mt.minFeeCap.Cmp(&pendingBaseFee) >= 0 {
difference := uint256.NewInt(0)
difference.Sub(&mt.minFeeCap, &pendingBaseFee)
if difference.Cmp(uint256.NewInt(mt.minTip)) <= 0 {
effectiveTip = *difference
} else {
effectiveTip = *uint256.NewInt(mt.minTip)
}
}
if than.minFeeCap.Cmp(&pendingBaseFee) >= 0 {
difference := uint256.NewInt(0)
difference.Sub(&than.minFeeCap, &pendingBaseFee)
if difference.Cmp(uint256.NewInt(than.minTip)) <= 0 {
thanEffectiveTip = *difference
} else {
thanEffectiveTip = *uint256.NewInt(than.minTip)
}
}
if effectiveTip.Cmp(&thanEffectiveTip) != 0 {
return effectiveTip.Cmp(&thanEffectiveTip) > 0
}
// Compare nonce and cumulative balance. Just as a side note, it doesn't
// matter if they're from same sender or not because we're comparing
// nonce distance of the sender from state's nonce and not the actual
// value of nonce.
if mt.nonceDistance != than.nonceDistance {
return mt.nonceDistance < than.nonceDistance
}
if mt.cumulativeBalanceDistance != than.cumulativeBalanceDistance {
return mt.cumulativeBalanceDistance < than.cumulativeBalanceDistance
}
case BaseFeeSubPool:
if mt.minFeeCap.Cmp(&than.minFeeCap) != 0 {
return mt.minFeeCap.Cmp(&than.minFeeCap) > 0
}
case QueuedSubPool:
if mt.nonceDistance != than.nonceDistance {
return mt.nonceDistance < than.nonceDistance
}
if mt.cumulativeBalanceDistance != than.cumulativeBalanceDistance {
return mt.cumulativeBalanceDistance < than.cumulativeBalanceDistance
}
}
return mt.timestamp < than.timestamp
}
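// Worked example for the pending-pool ordering above, using illustrative numbers:
//
//	effectiveTip = min(minFeeCap-pendingBaseFee, minTip)
//
// With pendingBaseFee = 10: a tx with minFeeCap = 15, minTip = 3 gets effectiveTip = min(5, 3) = 3,
// while a tx with minFeeCap = 12, minTip = 5 gets effectiveTip = min(2, 5) = 2 - so the first tx
// ranks higher even though its declared tip is lower.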
func (mt *metaTx) worse(than *metaTx, pendingBaseFee uint256.Int) bool {
subPool := mt.subPool
thanSubPool := than.subPool
if mt.minFeeCap.Cmp(&pendingBaseFee) >= 0 {
subPool |= EnoughFeeCapBlock
}
if than.minFeeCap.Cmp(&pendingBaseFee) >= 0 {
thanSubPool |= EnoughFeeCapBlock
}
if subPool != thanSubPool {
return subPool < thanSubPool
}
switch mt.currentSubPool {
case PendingSubPool:
if mt.minFeeCap != than.minFeeCap {
return mt.minFeeCap.Cmp(&than.minFeeCap) < 0
}
if mt.nonceDistance != than.nonceDistance {
return mt.nonceDistance > than.nonceDistance
}
if mt.cumulativeBalanceDistance != than.cumulativeBalanceDistance {
return mt.cumulativeBalanceDistance > than.cumulativeBalanceDistance
}
case BaseFeeSubPool, QueuedSubPool:
if mt.nonceDistance != than.nonceDistance {
return mt.nonceDistance > than.nonceDistance
}
if mt.cumulativeBalanceDistance != than.cumulativeBalanceDistance {
return mt.cumulativeBalanceDistance > than.cumulativeBalanceDistance
}
}
return mt.timestamp > than.timestamp
}
func (p BestQueue) Len() int { return len(p.ms) }
func (p BestQueue) Less(i, j int) bool {
return p.ms[i].better(p.ms[j], *uint256.NewInt(p.pendingBastFee))
}
func (p BestQueue) Swap(i, j int) {
p.ms[i], p.ms[j] = p.ms[j], p.ms[i]
p.ms[i].bestIndex = i
p.ms[j].bestIndex = j
}
func (p *BestQueue) Push(x interface{}) {
n := len(p.ms)
item := x.(*metaTx)
item.bestIndex = n
p.ms = append(p.ms, item)
}
func (p *BestQueue) Pop() interface{} {
old := p.ms
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.bestIndex = -1 // for safety
item.currentSubPool = 0 // for safety
p.ms = old[0 : n-1]
return item
}
type WorstQueue struct {
ms []*metaTx
pendingBaseFee uint64
}
func (p WorstQueue) Len() int { return len(p.ms) }
func (p WorstQueue) Less(i, j int) bool {
return p.ms[i].worse(p.ms[j], *uint256.NewInt(p.pendingBaseFee))
}
func (p WorstQueue) Swap(i, j int) {
p.ms[i], p.ms[j] = p.ms[j], p.ms[i]
p.ms[i].worstIndex = i
p.ms[j].worstIndex = j
}
func (p *WorstQueue) Push(x interface{}) {
n := len(p.ms)
item := x.(*metaTx)
item.worstIndex = n
	p.ms = append(p.ms, item)
}
func (p *WorstQueue) Pop() interface{} {
old := p.ms
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.worstIndex = -1 // for safety
item.currentSubPool = 0 // for safety
p.ms = old[0 : n-1]
return item
}
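// Both BestQueue and WorstQueue implement heap.Interface, so callers must go through the
// container/heap package to keep the bestIndex/worstIndex fields consistent (sketch; mt is
// a hypothetical *metaTx):
//
//	wq := &WorstQueue{}
//	heap.Init(wq)
//	heap.Push(wq, mt)               // not wq.Push(mt)
//	worst := heap.Pop(wq).(*metaTx) // not wq.Pop()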