Fix txpool queue overflow (#9197)

When discarding spamming we need to remove the tx from the subpools as
well as the sender tx list. Otherwise the tx is ignored by other
operations and left in the subpool

As well as the fix here this PR also contains several changes to TX
TRACING and other logging to make it easier to see what is going on with
pool processing
This commit is contained in:
Mark Holt 2024-01-11 10:03:41 +00:00 committed by GitHub
parent 470f05ee6f
commit 7308e87c0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 95 deletions

View File

@ -74,6 +74,8 @@ var (
basefeeSubCounter = metrics.GetOrCreateGauge(`txpool_basefee`)
)
var TraceAll = false
// Pool is interface for the transaction pool
// This interface exists for the convenience of testing, and not yet because
// there are multiple implementations
@ -420,14 +422,14 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
return err
}
if err = removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil {
if err = p.removeMined(p.all, minedTxs.Txs); err != nil {
return err
}
var announcements types.Announcements
announcements, err = addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */
pendingBaseFee, stateChanges.BlockGasLimit, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger)
announcements, err = p.addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */
pendingBaseFee, stateChanges.BlockGasLimit, p.logger)
if err != nil {
return err
@ -436,7 +438,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
p.pending.EnforceWorstInvariants()
p.baseFee.EnforceInvariants()
p.queued.EnforceInvariants()
promote(p.pending, p.baseFee, p.queued, pendingBaseFee, pendingBlobFee, p.discardLocked, &announcements, p.logger)
p.promote(pendingBaseFee, pendingBlobFee, &announcements, p.logger)
p.pending.EnforceBestInvariants()
p.promoted.Reset()
p.promoted.AppendOther(announcements)
@ -487,8 +489,8 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
return err
}
announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
announcements, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger)
if err != nil {
return err
}
@ -727,7 +729,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
txs.Resize(uint(count))
if len(toRemove) > 0 {
for _, mt := range toRemove {
p.pending.Remove(mt)
p.pending.Remove(mt, "best", p.logger)
}
}
return true, count, nil
@ -848,13 +850,13 @@ func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache.
}
if !isLocal && uint64(p.all.count(txn.SenderID)) > p.cfg.AccountSlots {
if txn.Traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots))
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots))
}
return txpoolcfg.Spammer
}
if !isLocal && p.all.blobCount(txn.SenderID) > p.cfg.BlobSlots {
if txn.Traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots))
p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots))
}
return txpoolcfg.Spammer
}
@ -1076,7 +1078,19 @@ func (p *TxPool) punishSpammer(spammer uint64) {
count--
return count > 0
})
for _, mt := range txsToDelete {
switch mt.currentSubPool {
case PendingSubPool:
p.pending.Remove(mt, "punishSpammer", p.logger)
case BaseFeeSubPool:
p.baseFee.Remove(mt, "punishSpammer", p.logger)
case QueuedSubPool:
p.queued.Remove(mt, "punishSpammer", p.logger)
default:
//already removed
}
p.discardLocked(mt, txpoolcfg.Spammer) // can't call it while iterating by all
}
}
@ -1122,8 +1136,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
return nil, err
}
announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger)
announcements, addReasons, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs,
p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger)
if err == nil {
for i, reason := range addReasons {
if reason != txpoolcfg.NotSet {
@ -1159,11 +1173,9 @@ func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
defer p.lock.Unlock()
return p._chainDB, p._stateCache
}
func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), collect bool,
logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) {
func (p *TxPool) addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64, collect bool, logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) {
if assert.Enable {
for _, txn := range newTxs.Txs {
if txn.SenderID == 0 {
@ -1171,6 +1183,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
}
}
}
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
// transactions contained in it, are now viable for inclusion in other blocks, and therefore should
@ -1184,7 +1197,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
discardReasons := make([]txpoolcfg.DiscardReason, len(newTxs.Txs))
announcements := types.Announcements{}
for i, txn := range newTxs.Txs {
if found, ok := byHash[string(txn.IDHash[:])]; ok {
if found, ok := p.byHash[string(txn.IDHash[:])]; ok {
discardReasons[i] = txpoolcfg.DuplicateHash
// In case if the transition is stuck, "poke" it to rebroadcast
if collect && newTxs.IsLocal[i] && (found.currentSubPool == PendingSubPool || found.currentSubPool == BaseFeeSubPool) {
@ -1193,7 +1206,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
continue
}
mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum)
if reason := add(mt, &announcements); reason != txpoolcfg.NotSet {
if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet {
discardReasons[i] = reason
continue
}
@ -1209,22 +1222,18 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
if err != nil {
return announcements, discardReasons, err
}
onSenderStateChange(senderID, nonce, balance, byNonce,
blockGasLimit, pending, baseFee, queued, discard, logger)
p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger)
}
promote(pending, baseFee, queued, pendingBaseFee, pendingBlobFee, discard, &announcements, logger)
pending.EnforceBestInvariants()
p.promote(pendingBaseFee, pendingBlobFee, &announcements, logger)
p.pending.EnforceBestInvariants()
return announcements, discardReasons, nil
}
// TODO: Looks like a copy of the above
func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch,
senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason),
logger log.Logger) (types.Announcements, error) {
func (p *TxPool) addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch,
senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, logger log.Logger) (types.Announcements, error) {
if assert.Enable {
for _, txn := range newTxs.Txs {
if txn.SenderID == 0 {
@ -1244,12 +1253,12 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges
sendersWithChangedState := map[uint64]struct{}{}
announcements := types.Announcements{}
for i, txn := range newTxs.Txs {
if _, ok := byHash[string(txn.IDHash[:])]; ok {
if _, ok := p.byHash[string(txn.IDHash[:])]; ok {
continue
}
mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum)
if reason := add(mt, &announcements); reason != txpoolcfg.NotSet {
discard(mt, reason)
if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet {
p.discardLocked(mt, reason)
continue
}
sendersWithChangedState[mt.Tx.SenderID] = struct{}{}
@ -1277,8 +1286,7 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges
if err != nil {
return announcements, err
}
onSenderStateChange(senderID, nonce, balance, byNonce,
blockGasLimit, pending, baseFee, queued, discard, logger)
p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger)
}
return announcements, nil
@ -1345,11 +1353,11 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
switch found.currentSubPool {
case PendingSubPool:
p.pending.Remove(found)
p.pending.Remove(found, "add", p.logger)
case BaseFeeSubPool:
p.baseFee.Remove(found)
p.baseFee.Remove(found, "add", p.logger)
case QueuedSubPool:
p.queued.Remove(found)
p.queued.Remove(found, "add", p.logger)
default:
//already removed
}
@ -1365,7 +1373,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
hashStr := string(mt.Tx.IDHash[:])
p.byHash[hashStr] = mt
if replaced := p.all.replaceOrInsert(mt); replaced != nil {
if replaced := p.all.replaceOrInsert(mt, p.logger); replaced != nil {
if assert.Enable {
panic("must never happen")
}
@ -1375,7 +1383,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
p.isLocalLRU.Add(hashStr, struct{}{})
}
// All transactions are first added to the queued pool and then immediately promoted from there if required
p.queued.Add(mt, p.logger)
p.queued.Add(mt, "addLocked", p.logger)
// Remove from mined cache as we are now "resurrecting" it to a sub-pool
p.deleteMinedBlobTxn(hashStr)
return txpoolcfg.NotSet
@ -1387,7 +1395,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) {
hashStr := string(mt.Tx.IDHash[:])
delete(p.byHash, hashStr)
p.deletedTxs = append(p.deletedTxs, mt)
p.all.delete(mt)
p.all.delete(mt, reason, p.logger)
p.discardReasonsLRU.Add(hashStr, reason)
}
@ -1451,7 +1459,7 @@ func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) {
// modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is
// included into a block), and finally, walk over the transaction records and update SubPool fields depending on
// the actual presence of nonce gaps and what the balance is.
func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, txpoolcfg.DiscardReason), logger log.Logger) error {
func (p *TxPool) removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot) error {
noncesToRemove := map[uint64]uint64{}
for _, txn := range minedTxs {
nonce, ok := noncesToRemove[txn.SenderID]
@ -1468,18 +1476,17 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
queuedRemoved := 0
for senderID, nonce := range noncesToRemove {
byNonce.ascend(senderID, func(mt *metaTx) bool {
if mt.Tx.Nonce > nonce {
if mt.Tx.Traced {
logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce)
p.logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce)
}
return false
}
if mt.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removeMined idHash=%x senderId=%d, currentSubPool=%s", mt.Tx.IDHash, mt.Tx.SenderID, mt.currentSubPool))
p.logger.Info("TX TRACING: removeMined", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool)
}
toDel = append(toDel, mt)
@ -1487,13 +1494,13 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
switch mt.currentSubPool {
case PendingSubPool:
pendingRemoved++
pending.Remove(mt)
p.pending.Remove(mt, "remove-mined", p.logger)
case BaseFeeSubPool:
baseFeeRemoved++
baseFee.Remove(mt)
p.baseFee.Remove(mt, "remove-mined", p.logger)
case QueuedSubPool:
queuedRemoved++
queued.Remove(mt)
p.queued.Remove(mt, "remove-mined", p.logger)
default:
//already removed
}
@ -1503,13 +1510,13 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
discarded += len(toDel)
for _, mt := range toDel {
discard(mt, txpoolcfg.Mined)
p.discardLocked(mt, txpoolcfg.Mined)
}
toDel = toDel[:0]
}
if discarded > 0 {
logger.Debug("Discarding Transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved)
p.logger.Debug("Discarded transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved)
}
return nil
@ -1519,17 +1526,14 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
// which sub pool they will need to go to. Since this depends on other transactions from the same sender by with lower
// nonces, and also affect other transactions from the same sender with higher nonce, it loops through all transactions
// for a given senderID
func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *BySenderAndNonce,
blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, txpoolcfg.DiscardReason), logger log.Logger) {
func (p *TxPool) onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, blockGasLimit uint64, logger log.Logger) {
noGapsNonce := senderNonce
cumulativeRequiredBalance := uint256.NewInt(0)
minFeeCap := uint256.NewInt(0).SetAllOne()
minTip := uint64(math.MaxUint64)
var toDel []*metaTx // can't delete items while iterate them
byNonce.ascend(senderID, func(mt *metaTx) bool {
if mt.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%s", mt.Tx.IDHash, senderID, senderNonce, mt.Tx.Nonce, mt.currentSubPool))
}
p.all.ascend(senderID, func(mt *metaTx) bool {
deleteAndContinueReasonLog := ""
if senderNonce > mt.Tx.Nonce {
deleteAndContinueReasonLog = "low nonce"
@ -1538,16 +1542,16 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint
}
if deleteAndContinueReasonLog != "" {
if mt.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removing due to %s for idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%s", deleteAndContinueReasonLog, mt.Tx.IDHash, senderID, senderNonce, mt.Tx.Nonce, mt.currentSubPool))
logger.Info("TX TRACING: onSenderStateChange loop iteration remove", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderID", senderID, "senderNonce", senderNonce, "txn.nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool, "reason", deleteAndContinueReasonLog)
}
// del from sub-pool
switch mt.currentSubPool {
case PendingSubPool:
pending.Remove(mt)
p.pending.Remove(mt, deleteAndContinueReasonLog, p.logger)
case BaseFeeSubPool:
baseFee.Remove(mt)
p.baseFee.Remove(mt, deleteAndContinueReasonLog, p.logger)
case QueuedSubPool:
queued.Remove(mt)
p.queued.Remove(mt, deleteAndContinueReasonLog, p.logger)
default:
//already removed
}
@ -1605,60 +1609,62 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint
}
if mt.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderId=%d subPool=%b", mt.Tx.IDHash, mt.Tx.SenderID, mt.subPool))
logger.Info("TX TRACING: onSenderStateChange loop iteration update", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "subPool", mt.currentSubPool)
}
// Some fields of mt might have changed, need to fix the invariants in the subpool best and worst queues
switch mt.currentSubPool {
case PendingSubPool:
pending.Updated(mt)
p.pending.Updated(mt)
case BaseFeeSubPool:
baseFee.Updated(mt)
p.baseFee.Updated(mt)
case QueuedSubPool:
queued.Updated(mt)
p.queued.Updated(mt)
}
return true
})
for _, mt := range toDel {
discard(mt, txpoolcfg.NonceTooLow)
p.discardLocked(mt, txpoolcfg.NonceTooLow)
}
logger.Trace("[txpool] onSenderStateChange", "sender", senderID, "count", p.all.count(senderID), "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len())
}
// promote reasserts invariants of the subpool and returns the list of transactions that ended up
// being promoted to the pending or basefee pool, for re-broadcasting
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, pendingBlobFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements,
logger log.Logger) {
func (p *TxPool) promote(pendingBaseFee uint64, pendingBlobFee uint64, announcements *types.Announcements, logger log.Logger) {
// Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = pending.Worst() {
for worst := p.pending.Worst(); p.pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = p.pending.Worst() {
if worst.subPool >= BaseFeePoolBits {
tx := pending.PopWorst()
tx := p.pending.PopWorst()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
baseFee.Add(tx, logger)
p.baseFee.Add(tx, "demote-pending", logger)
} else {
queued.Add(pending.PopWorst(), logger)
p.queued.Add(p.pending.PopWorst(), "demote-pending", logger)
}
}
// Promote best transactions from base fee pool to pending pool while they qualify
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = baseFee.Best() {
tx := baseFee.PopBest()
for best := p.baseFee.Best(); p.baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = p.baseFee.Best() {
tx := p.baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx, logger)
p.pending.Add(tx, logger)
}
// Demote worst transactions that do not qualify for base fee pool anymore, to queued sub pool, or discard
for worst := baseFee.Worst(); baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = baseFee.Worst() {
queued.Add(baseFee.PopWorst(), logger)
for worst := p.baseFee.Worst(); p.baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = p.baseFee.Worst() {
p.queued.Add(p.baseFee.PopWorst(), "demote-base", logger)
}
// Promote best transactions from the queued pool to either pending or base fee pool, while they qualify
for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() {
for best := p.queued.Best(); p.queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = p.queued.Best() {
if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 {
tx := queued.PopBest()
tx := p.queued.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx, logger)
p.pending.Add(tx, logger)
} else {
baseFee.Add(queued.PopBest(), logger)
p.baseFee.Add(p.queued.PopBest(), "promote-queued", logger)
}
}
@ -1666,18 +1672,18 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
// <FUNCTIONALITY REMOVED>
// Discard worst transactions from pending pool until it is within capacity limit
for pending.Len() > pending.limit {
discard(pending.PopWorst(), txpoolcfg.PendingPoolOverflow)
for p.pending.Len() > p.pending.limit {
p.discardLocked(p.pending.PopWorst(), txpoolcfg.PendingPoolOverflow)
}
// Discard worst transactions from pending sub pool until it is within capacity limits
for baseFee.Len() > baseFee.limit {
discard(baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow)
for p.baseFee.Len() > p.baseFee.limit {
p.discardLocked(p.baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow)
}
// Discard worst transactions from the queued sub pool until it is within its capacity limits
for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() {
discard(queued.PopWorst(), txpoolcfg.QueuedPoolOverflow)
for _ = p.queued.Worst(); p.queued.Len() > p.queued.limit; _ = p.queued.Worst() {
p.discardLocked(p.queued.PopWorst(), txpoolcfg.QueuedPoolOverflow)
}
}
@ -2091,8 +2097,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
if err != nil {
return err
}
if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil {
if _, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs,
pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, false, p.logger); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)
@ -2288,6 +2294,11 @@ func (sc *sendersBatch) getID(addr common.Address) (uint64, bool) {
}
func (sc *sendersBatch) getOrCreateID(addr common.Address, logger log.Logger) (uint64, bool) {
_, traced := sc.tracedSenders[addr]
if !traced {
traced = TraceAll
}
id, ok := sc.senderIDs[addr]
if !ok {
sc.senderID++
@ -2371,11 +2382,13 @@ func (b *BySenderAndNonce) nonce(senderID uint64) (nonce uint64, ok bool) {
})
return nonce, ok
}
func (b *BySenderAndNonce) ascendAll(f func(*metaTx) bool) {
b.tree.Ascend(func(mt *metaTx) bool {
return f(mt)
})
}
func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) {
s := b.search
s.Tx.SenderID = senderID
@ -2387,6 +2400,7 @@ func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) {
return f(mt)
})
}
func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) {
s := b.search
s.Tx.SenderID = senderID
@ -2398,12 +2412,15 @@ func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) {
return f(mt)
})
}
func (b *BySenderAndNonce) count(senderID uint64) int {
return b.senderIDTxnCount[senderID]
}
func (b *BySenderAndNonce) blobCount(senderID uint64) uint64 {
return b.senderIDBlobCount[senderID]
}
func (b *BySenderAndNonce) hasTxs(senderID uint64) bool {
has := false
b.ascend(senderID, func(*metaTx) bool {
@ -2412,6 +2429,7 @@ func (b *BySenderAndNonce) hasTxs(senderID uint64) bool {
})
return has
}
func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx {
s := b.search
s.Tx.SenderID = senderID
@ -2426,8 +2444,13 @@ func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx {
func (b *BySenderAndNonce) has(mt *metaTx) bool {
return b.tree.Has(mt)
}
func (b *BySenderAndNonce) delete(mt *metaTx) {
func (b *BySenderAndNonce) delete(mt *metaTx, reason txpoolcfg.DiscardReason, logger log.Logger) {
if _, ok := b.tree.Delete(mt); ok {
if mt.Tx.Traced {
logger.Info("TX TRACING: Deleted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "reason", reason)
}
senderID := mt.Tx.SenderID
count := b.senderIDTxnCount[senderID]
if count > 1 {
@ -2447,11 +2470,21 @@ func (b *BySenderAndNonce) delete(mt *metaTx) {
}
}
}
func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx) *metaTx {
func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx, logger log.Logger) *metaTx {
it, ok := b.tree.ReplaceOrInsert(mt)
if ok {
if mt.Tx.Traced {
logger.Info("TX TRACING: Replaced tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce)
}
return it
}
if mt.Tx.Traced {
logger.Info("TX TRACING: Inserted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce)
}
b.senderIDTxnCount[mt.Tx.SenderID]++
if mt.Tx.Type == types.BlobTxType && mt.Tx.Blobs != nil {
b.senderIDBlobCount[mt.Tx.SenderID] += uint64(len(mt.Tx.Blobs))
@ -2530,7 +2563,10 @@ func (p *PendingPool) Updated(mt *metaTx) {
}
func (p *PendingPool) Len() int { return len(p.best.ms) }
func (p *PendingPool) Remove(i *metaTx) {
func (p *PendingPool) Remove(i *metaTx, reason string, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason)
}
if i.worstIndex >= 0 {
heap.Remove(p.worst, i.worstIndex)
}
@ -2542,7 +2578,7 @@ func (p *PendingPool) Remove(i *metaTx) {
func (p *PendingPool) Add(i *metaTx, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID))
logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s, IdHash=%x, sender=%d, nonce=%d", p.t, i.Tx.IDHash, i.Tx.SenderID, i.Tx.Nonce))
}
i.currentSubPool = p.t
heap.Push(p.worst, i)
@ -2595,16 +2631,19 @@ func (p *SubPool) PopWorst() *metaTx { //nolint
return i
}
func (p *SubPool) Len() int { return p.best.Len() }
func (p *SubPool) Add(i *metaTx, logger log.Logger) {
func (p *SubPool) Add(i *metaTx, reason string, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID))
logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason)
}
i.currentSubPool = p.t
heap.Push(p.best, i)
heap.Push(p.worst, i)
}
func (p *SubPool) Remove(i *metaTx) {
func (p *SubPool) Remove(i *metaTx, reason string, logger log.Logger) {
if i.Tx.Traced {
logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason)
}
heap.Remove(p.best, i.bestIndex)
heap.Remove(p.worst, i.worstIndex)
i.currentSubPool = 0

View File

@ -1145,7 +1145,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
select {
case stateChanges := <-stateChangeCh:
block := stateChanges.BlockHeight
s.logger.Debug("Start mining new block based on previous block", "block", block)
s.logger.Debug("Start mining based on previous block", "block", block)
// TODO - can do mining clean up here as we have previous
// block info in the state channel
hasWork = true
@ -1154,11 +1154,11 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient
// Skip mining based on new tx notif for bor consensus
hasWork = s.chainConfig.Bor == nil
if hasWork {
s.logger.Debug("Start mining new block based on txpool notif")
s.logger.Debug("Start mining based on txpool notif")
}
case <-mineEvery.C:
if !(working || waiting.Load()) {
s.logger.Debug("Start mining new block based on miner.recommit", "duration", miner.MiningConfig.Recommit)
s.logger.Debug("Start mining based on miner.recommit", "duration", miner.MiningConfig.Recommit)
}
hasWork = !(working || waiting.Load())
case err := <-errc:

View File

@ -192,6 +192,7 @@ func BorHeimdallForward(
defer logTimer.Stop()
logger.Info("["+s.LogPrefix()+"] Processing sync events...", "from", lastBlockNum+1, "to", headNumber)
for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default: