diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index 45dc6f115..e6e479815 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -74,6 +74,8 @@ var ( basefeeSubCounter = metrics.GetOrCreateGauge(`txpool_basefee`) ) +var TraceAll = false + // Pool is interface for the transaction pool // This interface exists for the convenience of testing, and not yet because // there are multiple implementations @@ -420,14 +422,14 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang return err } - if err = removeMined(p.all, minedTxs.Txs, p.pending, p.baseFee, p.queued, p.discardLocked, p.logger); err != nil { + if err = p.removeMined(p.all, minedTxs.Txs); err != nil { return err } var announcements types.Announcements - announcements, err = addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ - pendingBaseFee, stateChanges.BlockGasLimit, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, p.logger) + announcements, err = p.addTxsOnNewBlock(block, cacheView, stateChanges, p.senders, unwindTxs, /* newTxs */ + pendingBaseFee, stateChanges.BlockGasLimit, p.logger) if err != nil { return err @@ -436,7 +438,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang p.pending.EnforceWorstInvariants() p.baseFee.EnforceInvariants() p.queued.EnforceInvariants() - promote(p.pending, p.baseFee, p.queued, pendingBaseFee, pendingBlobFee, p.discardLocked, &announcements, p.logger) + p.promote(pendingBaseFee, pendingBlobFee, &announcements, p.logger) p.pending.EnforceBestInvariants() p.promoted.Reset() p.promoted.AppendOther(announcements) @@ -487,8 +489,8 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return err } - announcements, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger) + announcements, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, + p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger) if err != nil { return err } @@ -727,7 +729,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG txs.Resize(uint(count)) if len(toRemove) > 0 { for _, mt := range toRemove { - p.pending.Remove(mt) + p.pending.Remove(mt, "best", p.logger) } } return true, count, nil @@ -848,13 +850,13 @@ func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache. } if !isLocal && uint64(p.all.count(txn.SenderID)) > p.cfg.AccountSlots { if txn.Traced { - log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) + p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) } return txpoolcfg.Spammer } if !isLocal && p.all.blobCount(txn.SenderID) > p.cfg.BlobSlots { if txn.Traced { - log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) + p.logger.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming (too many blobs) idHash=%x slots=%d, limit=%d", txn.IDHash, p.all.count(txn.SenderID), p.cfg.AccountSlots)) } return txpoolcfg.Spammer } @@ -1076,7 +1078,19 @@ func (p *TxPool) punishSpammer(spammer uint64) { count-- return count > 0 }) + for _, mt := range txsToDelete { + switch mt.currentSubPool { + case PendingSubPool: + p.pending.Remove(mt, "punishSpammer", p.logger) + case BaseFeeSubPool: + p.baseFee.Remove(mt, "punishSpammer", p.logger) + case QueuedSubPool: + p.queued.Remove(mt, "punishSpammer", p.logger) + default: + //already removed + } + p.discardLocked(mt, txpoolcfg.Spammer) // can't call it while iterating by all } } @@ -1122,8 +1136,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, return nil, err } - announcements, addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, - p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, true, p.logger) + announcements, addReasons, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, + p.pendingBaseFee.Load(), p.pendingBlobFee.Load(), p.blockGasLimit.Load(), true, p.logger) if err == nil { for i, reason := range addReasons { if reason != txpoolcfg.NotSet { @@ -1159,11 +1173,9 @@ func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) { defer p.lock.Unlock() return p._chainDB, p._stateCache } -func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, - newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64, - pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), collect bool, - logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) { + +func (p *TxPool) addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, + newTxs types.TxSlots, pendingBaseFee, pendingBlobFee, blockGasLimit uint64, collect bool, logger log.Logger) (types.Announcements, []txpoolcfg.DiscardReason, error) { if assert.Enable { for _, txn := range newTxs.Txs { if txn.SenderID == 0 { @@ -1171,6 +1183,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, } } } + // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the // transactions contained in it, are now viable for inclusion in other blocks, and therefore should @@ -1184,7 +1197,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, discardReasons := make([]txpoolcfg.DiscardReason, len(newTxs.Txs)) announcements := types.Announcements{} for i, txn := range newTxs.Txs { - if found, ok := byHash[string(txn.IDHash[:])]; ok { + if found, ok := p.byHash[string(txn.IDHash[:])]; ok { discardReasons[i] = txpoolcfg.DuplicateHash // In case if the transition is stuck, "poke" it to rebroadcast if collect && newTxs.IsLocal[i] && (found.currentSubPool == PendingSubPool || found.currentSubPool == BaseFeeSubPool) { @@ -1193,7 +1206,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, continue } mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum) - if reason := add(mt, &announcements); reason != txpoolcfg.NotSet { + if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet { discardReasons[i] = reason continue } @@ -1209,22 +1222,18 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, if err != nil { return announcements, discardReasons, err } - onSenderStateChange(senderID, nonce, balance, byNonce, - blockGasLimit, pending, baseFee, queued, discard, logger) + p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger) } - promote(pending, baseFee, queued, pendingBaseFee, pendingBlobFee, discard, &announcements, logger) - pending.EnforceBestInvariants() + p.promote(pendingBaseFee, pendingBlobFee, &announcements, logger) + p.pending.EnforceBestInvariants() return announcements, discardReasons, nil } // TODO: Looks like a copy of the above -func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, - senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, - pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx, *types.Announcements) txpoolcfg.DiscardReason, discard func(*metaTx, txpoolcfg.DiscardReason), - logger log.Logger) (types.Announcements, error) { +func (p *TxPool) addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, + senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee uint64, blockGasLimit uint64, logger log.Logger) (types.Announcements, error) { if assert.Enable { for _, txn := range newTxs.Txs { if txn.SenderID == 0 { @@ -1244,12 +1253,12 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges sendersWithChangedState := map[uint64]struct{}{} announcements := types.Announcements{} for i, txn := range newTxs.Txs { - if _, ok := byHash[string(txn.IDHash[:])]; ok { + if _, ok := p.byHash[string(txn.IDHash[:])]; ok { continue } mt := newMetaTx(txn, newTxs.IsLocal[i], blockNum) - if reason := add(mt, &announcements); reason != txpoolcfg.NotSet { - discard(mt, reason) + if reason := p.addLocked(mt, &announcements); reason != txpoolcfg.NotSet { + p.discardLocked(mt, reason) continue } sendersWithChangedState[mt.Tx.SenderID] = struct{}{} @@ -1277,8 +1286,7 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges if err != nil { return announcements, err } - onSenderStateChange(senderID, nonce, balance, byNonce, - blockGasLimit, pending, baseFee, queued, discard, logger) + p.onSenderStateChange(senderID, nonce, balance, blockGasLimit, logger) } return announcements, nil @@ -1345,11 +1353,11 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo switch found.currentSubPool { case PendingSubPool: - p.pending.Remove(found) + p.pending.Remove(found, "add", p.logger) case BaseFeeSubPool: - p.baseFee.Remove(found) + p.baseFee.Remove(found, "add", p.logger) case QueuedSubPool: - p.queued.Remove(found) + p.queued.Remove(found, "add", p.logger) default: //already removed } @@ -1365,7 +1373,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo hashStr := string(mt.Tx.IDHash[:]) p.byHash[hashStr] = mt - if replaced := p.all.replaceOrInsert(mt); replaced != nil { + if replaced := p.all.replaceOrInsert(mt, p.logger); replaced != nil { if assert.Enable { panic("must never happen") } @@ -1375,7 +1383,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo p.isLocalLRU.Add(hashStr, struct{}{}) } // All transactions are first added to the queued pool and then immediately promoted from there if required - p.queued.Add(mt, p.logger) + p.queued.Add(mt, "addLocked", p.logger) // Remove from mined cache as we are now "resurrecting" it to a sub-pool p.deleteMinedBlobTxn(hashStr) return txpoolcfg.NotSet @@ -1387,7 +1395,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) { hashStr := string(mt.Tx.IDHash[:]) delete(p.byHash, hashStr) p.deletedTxs = append(p.deletedTxs, mt) - p.all.delete(mt) + p.all.delete(mt, reason, p.logger) p.discardReasonsLRU.Add(hashStr, reason) } @@ -1451,7 +1459,7 @@ func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) { // modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is // included into a block), and finally, walk over the transaction records and update SubPool fields depending on // the actual presence of nonce gaps and what the balance is. -func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, txpoolcfg.DiscardReason), logger log.Logger) error { +func (p *TxPool) removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot) error { noncesToRemove := map[uint64]uint64{} for _, txn := range minedTxs { nonce, ok := noncesToRemove[txn.SenderID] @@ -1468,18 +1476,17 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P queuedRemoved := 0 for senderID, nonce := range noncesToRemove { - byNonce.ascend(senderID, func(mt *metaTx) bool { if mt.Tx.Nonce > nonce { if mt.Tx.Traced { - logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce) + p.logger.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", mt.Tx.Nonce, "sender.nonce", nonce) } return false } if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: removeMined idHash=%x senderId=%d, currentSubPool=%s", mt.Tx.IDHash, mt.Tx.SenderID, mt.currentSubPool)) + p.logger.Info("TX TRACING: removeMined", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool) } toDel = append(toDel, mt) @@ -1487,13 +1494,13 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P switch mt.currentSubPool { case PendingSubPool: pendingRemoved++ - pending.Remove(mt) + p.pending.Remove(mt, "remove-mined", p.logger) case BaseFeeSubPool: baseFeeRemoved++ - baseFee.Remove(mt) + p.baseFee.Remove(mt, "remove-mined", p.logger) case QueuedSubPool: queuedRemoved++ - queued.Remove(mt) + p.queued.Remove(mt, "remove-mined", p.logger) default: //already removed } @@ -1503,13 +1510,13 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P discarded += len(toDel) for _, mt := range toDel { - discard(mt, txpoolcfg.Mined) + p.discardLocked(mt, txpoolcfg.Mined) } toDel = toDel[:0] } if discarded > 0 { - logger.Debug("Discarding Transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved) + p.logger.Debug("Discarded transactions", "count", discarded, "pending", pendingRemoved, "baseFee", baseFeeRemoved, "queued", queuedRemoved) } return nil @@ -1519,17 +1526,14 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P // which sub pool they will need to go to. Since this depends on other transactions from the same sender by with lower // nonces, and also affect other transactions from the same sender with higher nonce, it loops through all transactions // for a given senderID -func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *BySenderAndNonce, - blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, txpoolcfg.DiscardReason), logger log.Logger) { +func (p *TxPool) onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, blockGasLimit uint64, logger log.Logger) { noGapsNonce := senderNonce cumulativeRequiredBalance := uint256.NewInt(0) minFeeCap := uint256.NewInt(0).SetAllOne() minTip := uint64(math.MaxUint64) var toDel []*metaTx // can't delete items while iterate them - byNonce.ascend(senderID, func(mt *metaTx) bool { - if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%s", mt.Tx.IDHash, senderID, senderNonce, mt.Tx.Nonce, mt.currentSubPool)) - } + + p.all.ascend(senderID, func(mt *metaTx) bool { deleteAndContinueReasonLog := "" if senderNonce > mt.Tx.Nonce { deleteAndContinueReasonLog = "low nonce" @@ -1538,16 +1542,16 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint } if deleteAndContinueReasonLog != "" { if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: removing due to %s for idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%s", deleteAndContinueReasonLog, mt.Tx.IDHash, senderID, senderNonce, mt.Tx.Nonce, mt.currentSubPool)) + logger.Info("TX TRACING: onSenderStateChange loop iteration remove", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderID", senderID, "senderNonce", senderNonce, "txn.nonce", mt.Tx.Nonce, "currentSubPool", mt.currentSubPool, "reason", deleteAndContinueReasonLog) } // del from sub-pool switch mt.currentSubPool { case PendingSubPool: - pending.Remove(mt) + p.pending.Remove(mt, deleteAndContinueReasonLog, p.logger) case BaseFeeSubPool: - baseFee.Remove(mt) + p.baseFee.Remove(mt, deleteAndContinueReasonLog, p.logger) case QueuedSubPool: - queued.Remove(mt) + p.queued.Remove(mt, deleteAndContinueReasonLog, p.logger) default: //already removed } @@ -1605,60 +1609,62 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint } if mt.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderId=%d subPool=%b", mt.Tx.IDHash, mt.Tx.SenderID, mt.subPool)) + logger.Info("TX TRACING: onSenderStateChange loop iteration update", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "senderId", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "subPool", mt.currentSubPool) } // Some fields of mt might have changed, need to fix the invariants in the subpool best and worst queues switch mt.currentSubPool { case PendingSubPool: - pending.Updated(mt) + p.pending.Updated(mt) case BaseFeeSubPool: - baseFee.Updated(mt) + p.baseFee.Updated(mt) case QueuedSubPool: - queued.Updated(mt) + p.queued.Updated(mt) } return true }) + for _, mt := range toDel { - discard(mt, txpoolcfg.NonceTooLow) + p.discardLocked(mt, txpoolcfg.NonceTooLow) } + + logger.Trace("[txpool] onSenderStateChange", "sender", senderID, "count", p.all.count(senderID), "pending", p.pending.Len(), "baseFee", p.baseFee.Len(), "queued", p.queued.Len()) } // promote reasserts invariants of the subpool and returns the list of transactions that ended up // being promoted to the pending or basefee pool, for re-broadcasting -func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, pendingBlobFee uint64, discard func(*metaTx, txpoolcfg.DiscardReason), announcements *types.Announcements, - logger log.Logger) { +func (p *TxPool) promote(pendingBaseFee uint64, pendingBlobFee uint64, announcements *types.Announcements, logger log.Logger) { // Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard - for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = pending.Worst() { + for worst := p.pending.Worst(); p.pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.LtUint64(pendingBaseFee) || (worst.Tx.Type == types.BlobTxType && worst.Tx.BlobFeeCap.LtUint64(pendingBlobFee))); worst = p.pending.Worst() { if worst.subPool >= BaseFeePoolBits { - tx := pending.PopWorst() + tx := p.pending.PopWorst() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) - baseFee.Add(tx, logger) + p.baseFee.Add(tx, "demote-pending", logger) } else { - queued.Add(pending.PopWorst(), logger) + p.queued.Add(p.pending.PopWorst(), "demote-pending", logger) } } // Promote best transactions from base fee pool to pending pool while they qualify - for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = baseFee.Best() { - tx := baseFee.PopBest() + for best := p.baseFee.Best(); p.baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.CmpUint64(pendingBaseFee) >= 0 && (best.Tx.Type != types.BlobTxType || best.Tx.BlobFeeCap.CmpUint64(pendingBlobFee) >= 0); best = p.baseFee.Best() { + tx := p.baseFee.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) - pending.Add(tx, logger) + p.pending.Add(tx, logger) } // Demote worst transactions that do not qualify for base fee pool anymore, to queued sub pool, or discard - for worst := baseFee.Worst(); baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = baseFee.Worst() { - queued.Add(baseFee.PopWorst(), logger) + for worst := p.baseFee.Worst(); p.baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = p.baseFee.Worst() { + p.queued.Add(p.baseFee.PopWorst(), "demote-base", logger) } // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify - for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() { + for best := p.queued.Best(); p.queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = p.queued.Best() { if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { - tx := queued.PopBest() + tx := p.queued.PopBest() announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:]) - pending.Add(tx, logger) + p.pending.Add(tx, logger) } else { - baseFee.Add(queued.PopBest(), logger) + p.baseFee.Add(p.queued.PopBest(), "promote-queued", logger) } } @@ -1666,18 +1672,18 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint // // Discard worst transactions from pending pool until it is within capacity limit - for pending.Len() > pending.limit { - discard(pending.PopWorst(), txpoolcfg.PendingPoolOverflow) + for p.pending.Len() > p.pending.limit { + p.discardLocked(p.pending.PopWorst(), txpoolcfg.PendingPoolOverflow) } // Discard worst transactions from pending sub pool until it is within capacity limits - for baseFee.Len() > baseFee.limit { - discard(baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow) + for p.baseFee.Len() > p.baseFee.limit { + p.discardLocked(p.baseFee.PopWorst(), txpoolcfg.BaseFeePoolOverflow) } // Discard worst transactions from the queued sub pool until it is within its capacity limits - for _ = queued.Worst(); queued.Len() > queued.limit; _ = queued.Worst() { - discard(queued.PopWorst(), txpoolcfg.QueuedPoolOverflow) + for _ = p.queued.Worst(); p.queued.Len() > p.queued.limit; _ = p.queued.Worst() { + p.discardLocked(p.queued.PopWorst(), txpoolcfg.QueuedPoolOverflow) } } @@ -2091,8 +2097,8 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if err != nil { return err } - if _, _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, - pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked, false, p.logger); err != nil { + if _, _, err := p.addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, + pendingBaseFee, pendingBlobFee, math.MaxUint64 /* blockGasLimit */, false, p.logger); err != nil { return err } p.pendingBaseFee.Store(pendingBaseFee) @@ -2288,6 +2294,11 @@ func (sc *sendersBatch) getID(addr common.Address) (uint64, bool) { } func (sc *sendersBatch) getOrCreateID(addr common.Address, logger log.Logger) (uint64, bool) { _, traced := sc.tracedSenders[addr] + + if !traced { + traced = TraceAll + } + id, ok := sc.senderIDs[addr] if !ok { sc.senderID++ @@ -2371,11 +2382,13 @@ func (b *BySenderAndNonce) nonce(senderID uint64) (nonce uint64, ok bool) { }) return nonce, ok } + func (b *BySenderAndNonce) ascendAll(f func(*metaTx) bool) { b.tree.Ascend(func(mt *metaTx) bool { return f(mt) }) } + func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) { s := b.search s.Tx.SenderID = senderID @@ -2387,6 +2400,7 @@ func (b *BySenderAndNonce) ascend(senderID uint64, f func(*metaTx) bool) { return f(mt) }) } + func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) { s := b.search s.Tx.SenderID = senderID @@ -2398,12 +2412,15 @@ func (b *BySenderAndNonce) descend(senderID uint64, f func(*metaTx) bool) { return f(mt) }) } + func (b *BySenderAndNonce) count(senderID uint64) int { return b.senderIDTxnCount[senderID] } + func (b *BySenderAndNonce) blobCount(senderID uint64) uint64 { return b.senderIDBlobCount[senderID] } + func (b *BySenderAndNonce) hasTxs(senderID uint64) bool { has := false b.ascend(senderID, func(*metaTx) bool { @@ -2412,6 +2429,7 @@ func (b *BySenderAndNonce) hasTxs(senderID uint64) bool { }) return has } + func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx { s := b.search s.Tx.SenderID = senderID @@ -2426,8 +2444,13 @@ func (b *BySenderAndNonce) get(senderID, txNonce uint64) *metaTx { func (b *BySenderAndNonce) has(mt *metaTx) bool { return b.tree.Has(mt) } -func (b *BySenderAndNonce) delete(mt *metaTx) { + +func (b *BySenderAndNonce) delete(mt *metaTx, reason txpoolcfg.DiscardReason, logger log.Logger) { if _, ok := b.tree.Delete(mt); ok { + if mt.Tx.Traced { + logger.Info("TX TRACING: Deleted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce, "reason", reason) + } + senderID := mt.Tx.SenderID count := b.senderIDTxnCount[senderID] if count > 1 { @@ -2447,11 +2470,21 @@ func (b *BySenderAndNonce) delete(mt *metaTx) { } } } -func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx) *metaTx { + +func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx, logger log.Logger) *metaTx { it, ok := b.tree.ReplaceOrInsert(mt) + if ok { + if mt.Tx.Traced { + logger.Info("TX TRACING: Replaced tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce) + } return it } + + if mt.Tx.Traced { + logger.Info("TX TRACING: Inserted tx by nonce", "idHash", fmt.Sprintf("%x", mt.Tx.IDHash), "sender", mt.Tx.SenderID, "nonce", mt.Tx.Nonce) + } + b.senderIDTxnCount[mt.Tx.SenderID]++ if mt.Tx.Type == types.BlobTxType && mt.Tx.Blobs != nil { b.senderIDBlobCount[mt.Tx.SenderID] += uint64(len(mt.Tx.Blobs)) @@ -2530,7 +2563,10 @@ func (p *PendingPool) Updated(mt *metaTx) { } func (p *PendingPool) Len() int { return len(p.best.ms) } -func (p *PendingPool) Remove(i *metaTx) { +func (p *PendingPool) Remove(i *metaTx, reason string, logger log.Logger) { + if i.Tx.Traced { + logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason) + } if i.worstIndex >= 0 { heap.Remove(p.worst, i.worstIndex) } @@ -2542,7 +2578,7 @@ func (p *PendingPool) Remove(i *metaTx) { func (p *PendingPool) Add(i *metaTx, logger log.Logger) { if i.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID)) + logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s, IdHash=%x, sender=%d, nonce=%d", p.t, i.Tx.IDHash, i.Tx.SenderID, i.Tx.Nonce)) } i.currentSubPool = p.t heap.Push(p.worst, i) @@ -2595,16 +2631,19 @@ func (p *SubPool) PopWorst() *metaTx { //nolint return i } func (p *SubPool) Len() int { return p.best.Len() } -func (p *SubPool) Add(i *metaTx, logger log.Logger) { +func (p *SubPool) Add(i *metaTx, reason string, logger log.Logger) { if i.Tx.Traced { - logger.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IDHash, i.Tx.SenderID)) + logger.Info(fmt.Sprintf("TX TRACING: added to subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason) } i.currentSubPool = p.t heap.Push(p.best, i) heap.Push(p.worst, i) } -func (p *SubPool) Remove(i *metaTx) { +func (p *SubPool) Remove(i *metaTx, reason string, logger log.Logger) { + if i.Tx.Traced { + logger.Info(fmt.Sprintf("TX TRACING: removed from subpool %s", p.t), "idHash", fmt.Sprintf("%x", i.Tx.IDHash), "sender", i.Tx.SenderID, "nonce", i.Tx.Nonce, "reason", reason) + } heap.Remove(p.best, i.bestIndex) heap.Remove(p.worst, i.worstIndex) i.currentSubPool = 0 diff --git a/eth/backend.go b/eth/backend.go index 105092b37..bc95f34f1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1145,7 +1145,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient select { case stateChanges := <-stateChangeCh: block := stateChanges.BlockHeight - s.logger.Debug("Start mining new block based on previous block", "block", block) + s.logger.Debug("Start mining based on previous block", "block", block) // TODO - can do mining clean up here as we have previous // block info in the state channel hasWork = true @@ -1154,11 +1154,11 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient // Skip mining based on new tx notif for bor consensus hasWork = s.chainConfig.Bor == nil if hasWork { - s.logger.Debug("Start mining new block based on txpool notif") + s.logger.Debug("Start mining based on txpool notif") } case <-mineEvery.C: if !(working || waiting.Load()) { - s.logger.Debug("Start mining new block based on miner.recommit", "duration", miner.MiningConfig.Recommit) + s.logger.Debug("Start mining based on miner.recommit", "duration", miner.MiningConfig.Recommit) } hasWork = !(working || waiting.Load()) case err := <-errc: diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index 65cff4086..5c7bef88a 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -192,6 +192,7 @@ func BorHeimdallForward( defer logTimer.Stop() logger.Info("["+s.LogPrefix()+"] Processing sync events...", "from", lastBlockNum+1, "to", headNumber) + for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ { select { default: