From 7308e87c0ed8afbf4dd91be10f05078a1c5745d2 Mon Sep 17 00:00:00 2001 From: Mark Holt <135143369+mh0lt@users.noreply.github.com> Date: Thu, 11 Jan 2024 10:03:41 +0000 Subject: [PATCH] Fix txpool queue overflow (#9197) When discarding spamming we need to remove the tx from the subpools as well as the sender tx list. Otherwise the tx is ignored by other operations and left in the subpool As well as the fix here this PR also contains several changes to TX TRACING and other logging to make it easier to see what is going on with pool processing --- erigon-lib/txpool/pool.go | 223 ++++++++++++++++----------- eth/backend.go | 6 +- eth/stagedsync/stage_bor_heimdall.go | 1 + 3 files changed, 135 insertions(+), 95 deletions(-) diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index 45dc6f115..e6e479815 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -74,6 +74,8 @@ var ( basefeeSubCounter = metrics.GetOrCreateGauge(`txpool_basefee`) ) +var TraceAll = false + // Pool is interface for the transaction pool // This interface exists for the convenience of testing, and not yet because // there are multiple implementations @@ -420,14 +422,14 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang return err } - if err = removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil { + if err = p.removeMined(p.all, minedTxs.Txs); err != nil { return err } var announcements types.Announcements - announcements, err = addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ - pendingBaseFee, stateChanges.BlockGasLimit, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger) + announcements, err = p.addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ + pendingBaseFee, stateChanges.BlockGasLimit, p.logger) if err != nil { return err @@ -436,7 +438,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.pending.EnforceWorstInvariants() p.baseFee.EnforceInvariants() p.queued.EnforceInvariants() - promote(p.pending, p.baseFee, p.queued, pendingBaseFee, pendingBlobFee, p.discardLocked, &announcements, p.logger) + p.promote(pendingBaseFee, pendingBlobFee, &announcements, p.logger) p.pending.EnforceBestInvariants() p.promoted.Reset() p.promoted.AppendOther(announcements) @@ -487,8 +489,8 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return err } - announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger) + announcements, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, + p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger) if err != nil { return err } @@ -727,7 +729,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG txs.Resize(uint(count)) if len(toRemove) > 0 { for _, mt := range toRemove { - p.pending.Remove(mt) + p.pending.Remove(mt, "best", p.logger) } } return true, count, nil @@ -848,13 +850,13 @@ func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache. } if !isLocal && uint64(p.all.count(txn.SenderID)) > p.cfg.AccountSlots { if txn.Traced { - log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) + p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) } return txpoolcfg.Spammer } if !isLocal && p.all.blobCount(txn.SenderID) > p.cfg.BlobSlots { if txn.Traced { - log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) + p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) } return txpoolcfg.Spammer } @@ -1076,7 +1078,19 @@ func (p *TxPool) punishSpammer(spammer uint64) { count-- return count > 0 }) + for _, mt := range txsToDelete { + switch mt.currentSubPool { + case PendingSubPool: + p.pending.Remove(mt, "punishSpammer", p.logger) + case BaseFeeSubPool: + p.baseFee.Remove(mt, "punishSpammer", p.logger) + case QueuedSubPool: + p.queued.Remove(mt, "punishSpammer", p.logger) + default: + //already removed + } + p.discardLocked(mt, txpoolcfg.Spammer) // can't call it while iterating by all } } @@ -1122,8 +1136,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, return nil, err } - announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger) + announcements, addReasons, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, + p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger) if err == nil { for i, reason := range addReasons { if reason != txpoolcfg.NotSet { @@ -1159,11 +1173,9 @@ func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) { defer p.lock.Unlock() return p._chainDB, p._stateCache } -func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, - newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64, - pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), collect bool, - logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) { + +func (p *TxPool) addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, + newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64, collect bool, logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) { if assert.Enable { for _, txn := range newTxs.Txs { if txn.SenderID == 0 { @@ -1171,6 +1183,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, } } } + // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the // transactions contained in it, are now viable for inclusion in other blocks, and therefore should @@ -1184,7 +1197,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, discardReasons := make([]txpoolcfg.DiscardReason, len(newTxs.Txs)) announcements := types.Announcements{} for i, txn := range newTxs.Txs { - if found, ok := byHash[string(txn.IDHash[:])]; ok { + if found, ok := p.byHash[string(txn.IDHash[:])]; ok { discardReasons[i] = txpoolcfg.DuplicateHash // In case if the transition is stuck, "poke" it to rebroadcast if collect && newTxs.IsLocal[i] && (found.currentSubPool == PendingSubPool || found.currentSubPool == BaseFeeSubPool) { @@ -1193,7 +1206,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, continue } mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum) - if reason := add(mt, &announcements); reason != txpoolcfg.NotSet { + if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet { discardReasons[i] = reason continue } @@ -1209,22 +1222,18 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, if err != nil { return announcements, discardReasons, err } - onSenderStateChange(senderID, nonce, balance, byNonce, - blockGasLimit, pending, baseFee, queued, discard, logger) + p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger) } - promote(pending, baseFee, queued, pendingBaseFee, pendingBlobFee, discard, &announcements, logger) - pending.EnforceBestInvariants() + p.promote(pendingBaseFee, pendingBlobFee, &announcements, logger) + p.pending.EnforceBestInvariants() return announcements, discardReasons, nil } // TODO: Looks like a copy of the above -func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, - senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, - pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), - logger log.Logger) (types.Announcements, error) { +func (p *TxPool) addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, + senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, logger log.Logger) (types.Announcements, error) { if assert.Enable { for _, txn := range newTxs.Txs { if txn.SenderID == 0 { @@ -1244,12 +1253,12 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges sendersWithChangedState := map[uint64]struct{}{} announcements := types.Announcements{} for i, txn := range newTxs.Txs { - if _, ok := byHash[string(txn.IDHash[:])]; ok { + if _, ok := p.byHash[string(txn.IDHash[:])]; ok { continue } mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum) - if reason := add(mt, &announcements); reason != txpoolcfg.NotSet { - discard(mt, reason) + if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet { + p.discardLocked(mt, reason) continue } sendersWithChangedState[mt.Tx.SenderID] = struct{}{} @@ -1277,8 +1286,7 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges if err != nil { return announcements, err } - onSenderStateChange(senderID, nonce, balance, byNonce, - blockGasLimit, pending, baseFee, queued, discard, logger) + p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger) } return announcements, nil @@ -1345,11 +1353,11 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo switch found.currentSubPool { case PendingSubPool: - p.pending.Remove(found) + p.pending.Remove(found, "add", p.logger) case BaseFeeSubPool: - p.baseFee.Remove(found) + p.baseFee.Remove(found, "add", p.logger) case QueuedSubPool: - p.queued.Remove(found) + p.queued.Remove(found, "add", p.logger) default: //already removed } @@ -1365,7 +1373,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo hashStr := string(mt.Tx.IDHash[:]) p.byHash[hashStr] = mt - if replaced := p.all.replaceOrInsert(mt); replaced != nil { + if replaced := p.all.replaceOrInsert(mt, p.logger); replaced != nil { if assert.Enable { panic("must never happen") } @@ -1375,7 +1383,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo p.isLocalLRU.Add(hashStr, struct{}{}) } // All transactions are first added to the queued pool and then immediately promoted from there if required - p.queued.Add(mt, p.logger) + p.queued.Add(mt, "addLocked", p.logger) // Remove from mined cache as we are now "resurrecting" it to a sub-pool p.deleteMinedBlobTxn(hashStr) return txpoolcfg.NotSet @@ -1387,7 +1395,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) { hashStr := string(mt.Tx.IDHash[:]) delete(p.byHash, hashStr) p.deletedTxs = append(p.deletedTxs, mt) - p.all.delete(mt) + p.all.delete(mt, reason, p.logger) p.discardReasonsLRU.Add(hashStr, reason) } @@ -1451,7 +1459,7 @@ func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) { // modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is // included into a block), and finally, walk over the transaction records and update SubPool fields depending on // the actual presence of nonce gaps and what the balance is. -func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, txpoolcfg.DiscardReason), logger log.Logger) error { +func (p *TxPool) removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot) error { noncesToRemove := map[uint64]uint64{} for _, txn := range minedTxs { nonce, ok := noncesToRemove[txn.SenderID] @@ -1468,18 +1476,17 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P queuedRemoved := 0 for senderID, nonce := range noncesToRemove { - byNonce.ascend(senderID, func(mt *metaTx) bool { if mt.Tx.Nonce > nonce { if mt.Tx.Traced { - logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce) + p.logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce) } return false } if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: removeMined idHash=%x senderId=%d, currentSubPool=%s", mt.Tx.IDHash, mt.Tx.SenderID, mt.currentSubPool)) + p.logger.Info("TX TRACING: removeMined", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool) } toDel = append(toDel, mt) @@ -1487,13 +1494,13 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P switch mt.currentSubPool { case PendingSubPool: pendingRemoved++ - pending.Remove(mt) + p.pending.Remove(mt, "remove-mined", p.logger) case BaseFeeSubPool: baseFeeRemoved++ - baseFee.Remove(mt) + p.baseFee.Remove(mt, "remove-mined", p.logger) case QueuedSubPool: queuedRemoved++ - queued.Remove(mt) + p.queued.Remove(mt, "remove-mined", p.logger) default: //already removed } @@ -1503,13 +1510,13 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P discarded += len(toDel) for _, mt := range toDel { - discard(mt, txpoolcfg.Mined) + p.discardLocked(mt, txpoolcfg.Mined) } toDel = toDel[:0] } if discarded > 0 { - logger.Debug("Discarding Transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved) + p.logger.Debug("Discarded transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved) } return nil @@ -1519,17 +1526,14 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P // which sub pool they will need to go to. Since this depends on other transactions from the same sender by with lower // nonces, and also affect other transactions from the same sender with higher nonce, it loops through all transactions // for a given senderID -func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *BySenderAndNonce, - blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, txpoolcfg.DiscardReason), logger log.Logger) { +func (p *TxPool) onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, blockGasLimit uint64, logger log.Logger) { noGapsNonce := senderNonce cumulativeRequiredBalance := uint256.NewInt(0) minFeeCap := uint256.NewInt(0).SetAllOne() minTip := uint64(math.MaxUint64) var toDel []*metaTx // can't delete items while iterate them - byNonce.ascend(senderID, func(mt *metaTx) bool { - if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%s", mt.Tx.IDHash, senderID, senderNonce, mt.Tx.Nonce, mt.currentSubPool)) - } + + p.all.ascend(senderID, func(mt *metaTx) bool { deleteAndContinueReasonLog := "" if senderNonce > mt.Tx.Nonce { deleteAndContinueReasonLog = "low nonce" @@ -1538,16 +1542,16 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint } if deleteAndContinueReasonLog != "" { if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: removing due to %s for idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%s", deleteAndContinueReasonLog, mt.Tx.IDHash, senderID, senderNonce, mt.Tx.Nonce, mt.currentSubPool)) + logger.Info("TX TRACING: onSenderStateChange loop iteration remove", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderID", senderID, "senderNonce", senderNonce, "txn.nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool, "reason", deleteAndContinueReasonLog) } // del from sub-pool switch mt.currentSubPool { case PendingSubPool: - pending.Remove(mt) + p.pending.Remove(mt, deleteAndContinueReasonLog, p.logger) case BaseFeeSubPool: - baseFee.Remove(mt) + p.baseFee.Remove(mt, deleteAndContinueReasonLog, p.logger) case QueuedSubPool: - queued.Remove(mt) + p.queued.Remove(mt, deleteAndContinueReasonLog, p.logger) default: //already removed } @@ -1605,60 +1609,62 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint } if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderId=%d subPool=%b", mt.Tx.IDHash, mt.Tx.SenderID, mt.subPool)) + logger.Info("TX TRACING: onSenderStateChange loop iteration update", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "subPool", mt.currentSubPool) } // Some fields of mt might have changed, need to fix the invariants in the subpool best and worst queues switch mt.currentSubPool { case PendingSubPool: - pending.Updated(mt) + p.pending.Updated(mt) case BaseFeeSubPool: - baseFee.Updated(mt) + p.baseFee.Updated(mt) case QueuedSubPool: - queued.Updated(mt) + p.queued.Updated(mt) } return true }) + for _, mt := range toDel { - discard(mt, txpoolcfg.NonceTooLow) + p.discardLocked(mt, txpoolcfg.NonceTooLow) } + + logger.Trace("[txpool] onSenderStateChange", "sender", senderID, "count", p.all.count(senderID), "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len()) } // promote reasserts invariants of the subpool and returns the list of transactions that ended up // being promoted to the pending or basefee pool, for re-broadcasting -func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, pendingBlobFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements, - logger log.Logger) { +func (p *TxPool) promote(pendingBaseFee uint64, pendingBlobFee uint64, announcements *types.Announcements, logger log.Logger) { // Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard - for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = pending.Worst() { + for worst := p.pending.Worst(); p.pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = p.pending.Worst() { if worst.subPool >= BaseFeePoolBits { - tx := pending.PopWorst() + tx := p.pending.PopWorst() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) - baseFee.Add(tx, logger) + p.baseFee.Add(tx, "demote-pending", logger) } else { - queued.Add(pending.PopWorst(), logger) + p.queued.Add(p.pending.PopWorst(), "demote-pending", logger) } } // Promote best transactions from base fee pool to pending pool while they qualify - for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = baseFee.Best() { - tx := baseFee.PopBest() + for best := p.baseFee.Best(); p.baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = p.baseFee.Best() { + tx := p.baseFee.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) - pending.Add(tx, logger) + p.pending.Add(tx, logger) } // Demote worst transactions that do not qualify for base fee pool anymore, to queued sub pool, or discard - for worst := baseFee.Worst(); baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = baseFee.Worst() { - queued.Add(baseFee.PopWorst(), logger) + for worst := p.baseFee.Worst(); p.baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = p.baseFee.Worst() { + p.queued.Add(p.baseFee.PopWorst(), "demote-base", logger) } // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify - for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() { + for best := p.queued.Best(); p.queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = p.queued.Best() { if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { - tx := queued.PopBest() + tx := p.queued.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) - pending.Add(tx, logger) + p.pending.Add(tx, logger) } else { - baseFee.Add(queued.PopBest(), logger) + p.baseFee.Add(p.queued.PopBest(), "promote-queued", logger) } } @@ -1666,18 +1672,18 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint // // Discard worst transactions from pending pool until it is within capacity limit - for pending.Len() > pending.limit { - discard(pending.PopWorst(), txpoolcfg.PendingPoolOverflow) + for p.pending.Len() > p.pending.limit { + p.discardLocked(p.pending.PopWorst(), txpoolcfg.PendingPoolOverflow) } // Discard worst transactions from pending sub pool until it is within capacity limits - for baseFee.Len() > baseFee.limit { - discard(baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow) + for p.baseFee.Len() > p.baseFee.limit { + p.discardLocked(p.baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow) } // Discard worst transactions from the queued sub pool until it is within its capacity limits - for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() { - discard(queued.PopWorst(), txpoolcfg.QueuedPoolOverflow) + for _ = p.queued.Worst(); p.queued.Len() > p.queued.limit; _ = p.queued.Worst() { + p.discardLocked(p.queued.PopWorst(), txpoolcfg.QueuedPoolOverflow) } } @@ -2091,8 +2097,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if err != nil { return err } - if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, - pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil { + if _, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, + pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, false, p.logger); err != nil { return err } p.pendingBaseFee.Store(pendingBaseFee) @@ -2288,6 +2294,11 @@ func (sc *sendersBatch) getID(addr common.Address) (uint64, bool) { } func (sc *sendersBatch) getOrCreateID(addr common.Address, logger log.Logger) (uint64, bool) { _, traced := sc.tracedSenders[addr] + + if !traced { + traced = TraceAll + } + id, ok := sc.senderIDs[addr] if !ok { sc.senderID++ @@ -2371,11 +2382,13 @@ func (b *BySenderAndNonce) nonce(senderID uint64) (nonce uint64, ok bool) { }) return nonce, ok } + func (b *BySenderAndNonce) ascendAll(f func(*metaTx) bool) { b.tree.Ascend(func(mt *metaTx) bool { return f(mt) }) } + func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) { s := b.search s.Tx.SenderID = senderID @@ -2387,6 +2400,7 @@ func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) { return f(mt) }) } + func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) { s := b.search s.Tx.SenderID = senderID @@ -2398,12 +2412,15 @@ func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) { return f(mt) }) } + func (b *BySenderAndNonce) count(senderID uint64) int { return b.senderIDTxnCount[senderID] } + func (b *BySenderAndNonce) blobCount(senderID uint64) uint64 { return b.senderIDBlobCount[senderID] } + func (b *BySenderAndNonce) hasTxs(senderID uint64) bool { has := false b.ascend(senderID, func(*metaTx) bool { @@ -2412,6 +2429,7 @@ func (b *BySenderAndNonce) hasTxs(senderID uint64) bool { }) return has } + func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx { s := b.search s.Tx.SenderID = senderID @@ -2426,8 +2444,13 @@ func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx { func (b *BySenderAndNonce) has(mt *metaTx) bool { return b.tree.Has(mt) } -func (b *BySenderAndNonce) delete(mt *metaTx) { + +func (b *BySenderAndNonce) delete(mt *metaTx, reason txpoolcfg.DiscardReason, logger log.Logger) { if _, ok := b.tree.Delete(mt); ok { + if mt.Tx.Traced { + logger.Info("TX TRACING: Deleted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "reason", reason) + } + senderID := mt.Tx.SenderID count := b.senderIDTxnCount[senderID] if count > 1 { @@ -2447,11 +2470,21 @@ func (b *BySenderAndNonce) delete(mt *metaTx) { } } } -func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx) *metaTx { + +func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx, logger log.Logger) *metaTx { it, ok := b.tree.ReplaceOrInsert(mt) + if ok { + if mt.Tx.Traced { + logger.Info("TX TRACING: Replaced tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce) + } return it } + + if mt.Tx.Traced { + logger.Info("TX TRACING: Inserted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce) + } + b.senderIDTxnCount[mt.Tx.SenderID]++ if mt.Tx.Type == types.BlobTxType && mt.Tx.Blobs != nil { b.senderIDBlobCount[mt.Tx.SenderID] += uint64(len(mt.Tx.Blobs)) @@ -2530,7 +2563,10 @@ func (p *PendingPool) Updated(mt *metaTx) { } func (p *PendingPool) Len() int { return len(p.best.ms) } -func (p *PendingPool) Remove(i *metaTx) { +func (p *PendingPool) Remove(i *metaTx, reason string, logger log.Logger) { + if i.Tx.Traced { + logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason) + } if i.worstIndex >= 0 { heap.Remove(p.worst, i.worstIndex) } @@ -2542,7 +2578,7 @@ func (p *PendingPool) Remove(i *metaTx) { func (p *PendingPool) Add(i *metaTx, logger log.Logger) { if i.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID)) + logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s, IdHash=%x, sender=%d, nonce=%d", p.t, i.Tx.IDHash, i.Tx.SenderID, i.Tx.Nonce)) } i.currentSubPool = p.t heap.Push(p.worst, i) @@ -2595,16 +2631,19 @@ func (p *SubPool) PopWorst() *metaTx { //nolint return i } func (p *SubPool) Len() int { return p.best.Len() } -func (p *SubPool) Add(i *metaTx, logger log.Logger) { +func (p *SubPool) Add(i *metaTx, reason string, logger log.Logger) { if i.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID)) + logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason) } i.currentSubPool = p.t heap.Push(p.best, i) heap.Push(p.worst, i) } -func (p *SubPool) Remove(i *metaTx) { +func (p *SubPool) Remove(i *metaTx, reason string, logger log.Logger) { + if i.Tx.Traced { + logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason) + } heap.Remove(p.best, i.bestIndex) heap.Remove(p.worst, i.worstIndex) i.currentSubPool = 0 diff --git a/eth/backend.go b/eth/backend.go index 105092b37..bc95f34f1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1145,7 +1145,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient select { case stateChanges := <-stateChangeCh: block := stateChanges.BlockHeight - s.logger.Debug("Start mining new block based on previous block", "block", block) + s.logger.Debug("Start mining based on previous block", "block", block) // TODO - can do mining clean up here as we have previous // block info in the state channel hasWork = true @@ -1154,11 +1154,11 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient // Skip mining based on new tx notif for bor consensus hasWork = s.chainConfig.Bor == nil if hasWork { - s.logger.Debug("Start mining new block based on txpool notif") + s.logger.Debug("Start mining based on txpool notif") } case <-mineEvery.C: if !(working || waiting.Load()) { - s.logger.Debug("Start mining new block based on miner.recommit", "duration", miner.MiningConfig.Recommit) + s.logger.Debug("Start mining based on miner.recommit", "duration", miner.MiningConfig.Recommit) } hasWork = !(working || waiting.Load()) case err := <-errc: diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index 65cff4086..5c7bef88a 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -192,6 +192,7 @@ func BorHeimdallForward( defer logTimer.Stop() logger.Info("["+s.LogPrefix()+"] Processing sync events...", "from", lastBlockNum+1, "to", headNumber) + for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ { select { default: