erigon-pulse/core/tx_pool.go
Péter Szilágyi 34ec9913f6
core: test locals support in txpool queue limits, fix
The commit reworks the transaction pool queue limitation tests
to cater for testing local accounts, also testing the nolocal flag.

In addition, it also fixes a panic if local transactions exceeded
the global queue allowance (no accounts left to drop from) and also
fixes queue eviction to operate on all accounts, not just the one
being updated.
2017-07-06 11:51:59 +03:00

993 lines
33 KiB
Go

// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"errors"
"fmt"
"math/big"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")
// ErrNonceTooLow is returned if the nonce of a transaction is lower than the
// one present in the local chain.
ErrNonceTooLow = errors.New("nonce too low")
// ErrUnderpriced is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")
// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrInsufficientFunds is returned if the total cost of executing a transaction
// is higher than the balance of the user's account.
ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
// ErrIntrinsicGas is returned if the transaction is specified to use less gas
// than required to start the invocation.
ErrIntrinsicGas = errors.New("intrinsic gas too low")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")
// ErrNegativeValue is a sanity error to ensure noone is able to specify a
// transaction with a negative value.
ErrNegativeValue = errors.New("negative value")
// ErrOversizedData is returned if the input data of a transaction is greater
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
)
var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
)
var (
// Metrics for the pending pool
pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
pendingRateLimitCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds
// Metrics for the queued pool
queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
queuedRateLimitCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds
// General tx metrics
invalidTxCounter = metrics.NewCounter("txpool/invalid")
underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
)
type stateFn func() (*state.StateDB, error)
// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
NoLocals bool // Whether local transaction handling should be disabled
PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
AccountSlots uint64 // Minimum number of executable transaction slots guaranteed per account
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}
// DefaultTxPoolConfig contains the default configurations for the transaction
// pool.
var DefaultTxPoolConfig = TxPoolConfig{
PriceLimit: 1,
PriceBump: 10,
AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,
Lifetime: 3 * time.Hour,
}
// sanitize checks the provided user configurations and changes anything that's
// unreasonable or unworkable.
func (config *TxPoolConfig) sanitize() TxPoolConfig {
conf := *config
if conf.PriceLimit < 1 {
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
}
if conf.PriceBump < 1 {
log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
conf.PriceBump = DefaultTxPoolConfig.PriceBump
}
return conf
}
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
currentState stateFn // The state function which will allow us to do some pre checks
pendingState *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
gasPrice *big.Int
eventMux *event.TypeMux
events *event.TypeMuxSubscription
locals *accountSet
signer types.Signer
mu sync.RWMutex
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all map[common.Hash]*types.Transaction // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
quit chan struct{}
homestead bool
}
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// trnsactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
signer: types.NewEIP155Signer(chainconfig.ChainId),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: make(map[common.Hash]*types.Transaction),
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
pendingState: nil,
events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}),
quit: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all)
pool.resetState()
// Start the various events loops and return
pool.wg.Add(2)
go pool.eventLoop()
go pool.expirationLoop()
return pool
}
func (pool *TxPool) eventLoop() {
defer pool.wg.Done()
// Start a ticker and keep track of interesting pool stats to report
var prevPending, prevQueued, prevStales int
report := time.NewTicker(statsReportInterval)
defer report.Stop()
// Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
for {
select {
// Handle any events fired by the system
case ev, ok := <-pool.events.Chan():
if !ok {
return
}
switch ev := ev.Data.(type) {
case ChainHeadEvent:
pool.mu.Lock()
if ev.Block != nil {
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
}
pool.resetState()
pool.mu.Unlock()
case RemovedTransactionEvent:
pool.addTxs(ev.Txs, false)
}
// Handle stats reporting ticks
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
prevPending, prevQueued, prevStales = pending, queued, stales
}
}
}
}
func (pool *TxPool) resetState() {
currentState, err := pool.currentState()
if err != nil {
log.Error("Failed reset txpool state", "err", err)
return
}
pool.pendingState = state.ManageState(currentState)
// validate the pool of pending transactions, this will remove
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
pool.demoteUnexecutables(currentState)
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
pool.promoteExecutables(currentState, nil)
}
// Stop terminates the transaction pool.
func (pool *TxPool) Stop() {
pool.events.Unsubscribe()
close(pool.quit)
pool.wg.Wait()
log.Info("Transaction pool stopped")
}
// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
defer pool.mu.RUnlock()
return new(big.Int).Set(pool.gasPrice)
}
// SetGasPrice updates the minimum price required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *TxPool) SetGasPrice(price *big.Int) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.gasPrice = price
for _, tx := range pool.priced.Cap(price, pool.locals) {
pool.removeTx(tx.Hash())
}
log.Info("Transaction pool price threshold updated", "price", price)
}
// State returns the virtual managed state of the transaction pool.
func (pool *TxPool) State() *state.ManagedState {
pool.mu.RLock()
defer pool.mu.RUnlock()
return pool.pendingState
}
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (pool *TxPool) Stats() (int, int) {
pool.mu.RLock()
defer pool.mu.RUnlock()
return pool.stats()
}
// stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (pool *TxPool) stats() (int, int) {
pending := 0
for _, list := range pool.pending {
pending += list.Len()
}
queued := 0
for _, list := range pool.queue {
queued += list.Len()
}
return pending, queued
}
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
pool.mu.RLock()
defer pool.mu.RUnlock()
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
queued := make(map[common.Address]types.Transactions)
for addr, list := range pool.queue {
queued[addr] = list.Flatten()
}
return pending, queued
}
// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
pool.mu.Lock()
defer pool.mu.Unlock()
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
return pending, nil
}
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
if tx.Size() > 32*1024 {
return ErrOversizedData
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.gasLimit().Cmp(tx.Gas()) < 0 {
return ErrGasLimit
}
// Make sure the transaction is signed properly
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
currentState, err := pool.currentState()
if err != nil {
return err
}
if currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if tx.Gas().Cmp(intrGas) < 0 {
return ErrIntrinsicGas
}
return nil
}
// add validates a transaction and inserts it into the non-executable queue for
// later pending promotion and execution. If the transaction is a replacement for
// an already pending or queued one, it overwrites the previous and returns this
// so outer code doesn't uselessly call promote.
//
// If a newly added transaction is marked as local, its sending account will be
// whitelisted, preventing any associated transaction from being dropped out of
// the pool due to pricing constraints.
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all[hash] != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
return false, err
}
// If the transaction pool is full, discard underpriced transactions
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash())
}
}
// If the transaction is replacing an already pending one, do directly
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
pool.all[tx.Hash()] = tx
pool.priced.Put(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue and potentially mark local
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
if local {
pool.locals.add(from)
}
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
pool.all[hash] = tx
pool.priced.Put(tx)
return old != nil, nil
}
// promoteTx adds a transaction to the pending (processable) list of transactions.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
pool.priced.Removed()
pendingDiscardCounter.Inc(1)
return
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[hash] == nil {
pool.all[hash] = tx
pool.priced.Put(tx)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
go pool.eventMux.Post(TxPreEvent{tx})
}
// AddLocal enqueues a single transaction into the pool if it is valid, marking
// the sender as a local one in the mean time, ensuring it goes around the local
// pricing constraints.
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}
// AddRemote enqueues a single transaction into the pool if it is valid. If the
// sender is not among the locally tracked ones, full pricing constraints will
// apply.
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
return pool.addTx(tx, false)
}
// AddLocals enqueues a batch of transactions into the pool if they are valid,
// marking the senders as a local ones in the mean time, ensuring they go around
// the local pricing constraints.
func (pool *TxPool) AddLocals(txs []*types.Transaction) error {
return pool.addTxs(txs, !pool.config.NoLocals)
}
// AddRemotes enqueues a batch of transactions into the pool if they are valid.
// If the senders are not among the locally tracked ones, full pricing constraints
// will apply.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) error {
return pool.addTxs(txs, false)
}
// addTx enqueues a single transaction into the pool if it is valid.
func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
pool.mu.Lock()
defer pool.mu.Unlock()
// Try to inject the transaction and update any state
replace, err := pool.add(tx, local)
if err != nil {
return err
}
// If we added a new transaction, run promotion checks and return
if !replace {
state, err := pool.currentState()
if err != nil {
return err
}
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables(state, []common.Address{from})
}
return nil
}
// addTxs attempts to queue a batch of transactions if they are valid.
func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
pool.mu.Lock()
defer pool.mu.Unlock()
// Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{})
for _, tx := range txs {
if replace, err := pool.add(tx, local); err == nil {
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
}
}
}
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
state, err := pool.currentState()
if err != nil {
return err
}
addrs := make([]common.Address, 0, len(dirty))
for addr, _ := range dirty {
addrs = append(addrs, addr)
}
pool.promoteExecutables(state, addrs)
}
return nil
}
// Get returns a transaction if it is contained in the pool
// and nil otherwise.
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
pool.mu.RLock()
defer pool.mu.RUnlock()
return pool.all[hash]
}
// Remove removes the transaction with the given hash from the pool.
func (pool *TxPool) Remove(hash common.Hash) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.removeTx(hash)
}
// RemoveBatch removes all given transactions from the pool.
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
pool.mu.Lock()
defer pool.mu.Unlock()
for _, tx := range txs {
pool.removeTx(tx.Hash())
}
}
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) {
// Fetch the transaction we wish to delete
tx, ok := pool.all[hash]
if !ok {
return
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// Remove it from the list of known transactions
delete(pool.all, hash)
pool.priced.Removed()
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
// If no more transactions are left, remove the list
if pending.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
} else {
// Otherwise postpone any invalidated transactions
for _, tx := range invalids {
pool.enqueueTx(tx.Hash(), tx)
}
}
// Update the account nonce if needed
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
return
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
future.Remove(tx)
if future.Empty() {
delete(pool.queue, addr)
}
}
}
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
gaslimit := pool.gasLimit()
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr, _ := range pool.queue {
accounts = append(accounts, addr)
}
}
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(state.GetBalance(addr), gaslimit)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Promoting queued transaction", "hash", hash)
pool.promoteTx(addr, hash, tx)
}
// Drop all transactions over the allowed limit
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
}
}
// If the pending limit is overflown, start equalizing allowances
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
threshold := pool.pending[offender.(common.Address)].Len()
// Iteratively reduce all offenders until below limit or threshold reached
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
// If still above threshold, reduce to limit or min allowance
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
sort.Sort(addresses)
// Drop transactions until the total is below the limit or only locals remain
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash())
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash())
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
gaslimit := pool.gasLimit()
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := state.GetNonce(addr)
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
log.Trace("Removed old pending transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(state.GetBalance(addr), gaslimit)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
pendingNofundsCounter.Inc(1)
}
for _, tx := range invalids {
hash := tx.Hash()
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
}
}
// expirationLoop is a loop that periodically iterates over all accounts with
// queued transactions and drop all that have been inactive for a prolonged amount
// of time.
func (pool *TxPool) expirationLoop() {
defer pool.wg.Done()
evict := time.NewTicker(evictionInterval)
defer evict.Stop()
for {
select {
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
for _, tx := range pool.queue[addr].Flatten() {
pool.removeTx(tx.Hash())
}
}
}
pool.mu.Unlock()
case <-pool.quit:
return
}
}
}
// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
heartbeat time.Time
}
type addresssByHeartbeat []addressByHeartbeat
func (a addresssByHeartbeat) Len() int { return len(a) }
func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// accountSet is simply a set of addresses to check for existance, and a signer
// capable of deriving addresses from transactions.
type accountSet struct {
accounts map[common.Address]struct{}
signer types.Signer
}
// newAccountSet creates a new address set with an associated signer for sender
// derivations.
func newAccountSet(signer types.Signer) *accountSet {
return &accountSet{
accounts: make(map[common.Address]struct{}),
signer: signer,
}
}
// contains checks if a given address is contained within the set.
func (as *accountSet) contains(addr common.Address) bool {
_, exist := as.accounts[addr]
return exist
}
// containsTx checks if the sender of a given tx is within the set. If the sender
// cannot be derived, this method returns false.
func (as *accountSet) containsTx(tx *types.Transaction) bool {
if addr, err := types.Sender(as.signer, tx); err == nil {
return as.contains(addr)
}
return false
}
// add inserts a new address into the set to track.
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
}