diff --git a/txpool/pool.go b/txpool/pool.go index 73a6b342a..2699adb66 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -73,9 +73,10 @@ type Config struct { BaseFeeSubPoolLimit int QueuedSubPoolLimit int - MinFeeCap uint64 - AccountSlots uint64 // Number of executable transaction slots guaranteed per account - PriceBump uint64 // Price bump percentage to replace an already existing transaction + MinFeeCap uint64 + AccountSlots uint64 // Number of executable transaction slots guaranteed per account + PriceBump uint64 // Price bump percentage to replace an already existing transaction + TracedSenders []string // List of senders for which tx pool should print out debugging info } var DefaultConfig = Config{ @@ -316,7 +317,10 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ch search: sortByNonce{&metaTx{Tx: &TxSlot{}}}, senderIDTxnCount: map[uint64]int{}, } - + tracedSenders := make(map[string]struct{}) + for _, sender := range cfg.TracedSenders { + tracedSenders[sender] = struct{}{} + } return &TxPool{ lock: &sync.RWMutex{}, byHash: map[string]*metaTx{}, @@ -329,7 +333,7 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ch queued: NewSubPool(QueuedSubPool, cfg.QueuedSubPoolLimit), newPendingTxs: newTxs, _stateCache: cache, - senders: newSendersCache(), + senders: newSendersCache(tracedSenders), _chainDB: coreDB, cfg: cfg, chainID: chainID, @@ -600,22 +604,40 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs TxSlots) { func (p *TxPool) validateTx(txn *TxSlot, isLocal bool, stateCache kvcache.CacheView) DiscardReason { // Drop non-local transactions under our own minimal accepted gas price or tip if !isLocal && txn.feeCap < p.cfg.MinFeeCap { + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx underpriced idHash=%x local=%t, feeCap=%d, cfg.MinFeeCap=%d", txn.IdHash, isLocal, txn.feeCap, p.cfg.MinFeeCap)) + } return UnderPriced } gas, reason := CalcIntrinsicGas(uint64(txn.dataLen), uint64(txn.dataNonZeroLen), nil, txn.creation, true, true) + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas idHash=%x gas=%d", txn.IdHash, gas)) + } if reason != Success { + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas calculated failed idHash=%x reason=%s", txn.IdHash, reason)) + } return reason } if gas > txn.gas { + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas > txn.gas idHash=%x gas=%d, txn.gas=%d", txn.IdHash, gas, txn.gas)) + } return IntrinsicGas } if uint64(p.all.count(txn.senderID)) > p.cfg.AccountSlots { + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IdHash, p.all.count(txn.senderID), p.cfg.AccountSlots)) + } return Spammer } // check nonce and balance senderNonce, senderBalance, _ := p.senders.info(stateCache, txn.senderID) if senderNonce > txn.nonce { + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx nonce too low idHash=%x nonce in state=%d, txn.nonce=%d", txn.IdHash, senderNonce, txn.nonce)) + } return NonceTooLow } // Transactor should have enough funds to cover the costs @@ -623,6 +645,9 @@ func (p *TxPool) validateTx(txn *TxSlot, isLocal bool, stateCache kvcache.CacheV total.Mul(total, uint256.NewInt(txn.tip)) total.Add(total, &txn.value) if senderBalance.Cmp(total) < 0 { + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: validateTx insufficient funds idHash=%x balance in state=%d, txn.gas*txn.tip=%d", txn.IdHash, senderBalance, total)) + } return InsufficientFunds } return Success @@ -758,7 +783,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di reasons = fillDiscardReasons(reasons, newTxs, p.discardReasonsLRU) for i, reason := range reasons { if reason == Success { - p.promoted = append(p.promoted, newTxs.txs[i].IdHash[:]...) + txn := newTxs.txs[i] + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: AddLocalTxs promotes idHash=%x, senderId=%d", txn.IdHash, txn.senderID)) + } + p.promoted = append(p.promoted, txn.IdHash[:]...) } } if p.promoted.Len() > 0 { @@ -815,6 +844,9 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, continue } discardReasons[i] = NotSet + if txn.traced { + log.Info(fmt.Sprintf("TX TRACING: schedule sendersWithChangedState idHash=%x senderId=%d", txn.IdHash, mt.Tx.senderID)) + } sendersWithChangedState[mt.Tx.senderID] = struct{}{} } @@ -1058,6 +1090,9 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint minFeeCap := uint64(math.MaxUint64) minTip := uint64(math.MaxUint64) byNonce.ascend(senderID, func(mt *metaTx) bool { + if mt.Tx.traced { + log.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%b", mt.Tx.IdHash, senderID, senderNonce, mt.Tx.nonce, mt.currentSubPool)) + } minFeeCap = min(minFeeCap, mt.Tx.feeCap) mt.minFeeCap = minFeeCap minTip = min(minTip, mt.Tx.tip) @@ -1122,6 +1157,10 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint mt.subPool |= EnoughFeeCapBlock } + if mt.Tx.traced { + log.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderId=%d subPool=%b", mt.Tx.IdHash, mt.Tx.senderID, mt.subPool)) + } + // 5. Local transaction. Set to 1 if transaction is local. // can't change @@ -1466,7 +1505,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { } txn.rlp = nil // means that we don't need store it in db anymore - txn.senderID = p.senders.getOrCreateID(addr) + txn.senderID, txn.traced = p.senders.getOrCreateID(addr) binary.BigEndian.Uint64(v) isLocalTx := p.isLocalLRU.Contains(string(k)) @@ -1727,26 +1766,31 @@ type sendersBatch struct { senderID uint64 senderIDs map[string]uint64 senderID2Addr map[uint64]string + tracedSenders map[string]struct{} } -func newSendersCache() *sendersBatch { - return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64]string{}} +func newSendersCache(tracedSenders map[string]struct{}) *sendersBatch { + return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64]string{}, tracedSenders: tracedSenders} } func (sc *sendersBatch) getID(addr []byte) (uint64, bool) { id, ok := sc.senderIDs[string(addr)] return id, ok } -func (sc *sendersBatch) getOrCreateID(addr []byte) uint64 { +func (sc *sendersBatch) getOrCreateID(addr []byte) (uint64, bool) { addrS := string(addr) + _, traced := sc.tracedSenders[addrS] id, ok := sc.senderIDs[addrS] if !ok { sc.senderID++ id = sc.senderID sc.senderIDs[addrS] = id sc.senderID2Addr[id] = addrS + if traced { + log.Info(fmt.Sprintf("TX TRACING: allocated senderID %d to sender %x", id, addr)) + } } - return id + return id, traced } func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint64, balance uint256.Int, err error) { addr, ok := sc.senderID2Addr[id] @@ -1769,7 +1813,7 @@ func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint func (sc *sendersBatch) registerNewSenders(newTxs TxSlots) (err error) { for i, txn := range newTxs.txs { - txn.senderID = sc.getOrCreateID(newTxs.senders.At(i)) + txn.senderID, txn.traced = sc.getOrCreateID(newTxs.senders.At(i)) } return nil } @@ -1781,11 +1825,11 @@ func (sc *sendersBatch) onNewBlock(stateChanges *remote.StateChangeBatch, unwind } for i, txn := range unwindTxs.txs { - txn.senderID = sc.getOrCreateID(unwindTxs.senders.At(i)) + txn.senderID, txn.traced = sc.getOrCreateID(unwindTxs.senders.At(i)) } for i, txn := range minedTxs.txs { - txn.senderID = sc.getOrCreateID(minedTxs.senders.At(i)) + txn.senderID, txn.traced = sc.getOrCreateID(minedTxs.senders.At(i)) } } return nil diff --git a/txpool/pool_test.go b/txpool/pool_test.go index e74e8e4cd..d26a760c8 100644 --- a/txpool/pool_test.go +++ b/txpool/pool_test.go @@ -19,6 +19,7 @@ package txpool import ( "container/heap" "context" + "fmt" "math/rand" "testing" @@ -305,3 +306,126 @@ func TestReplaceWithHigherFee(t *testing.T) { assert.Equal(uint64(3), nonce) } } + +func TestReverseNonces(t *testing.T) { + assert, require := assert.New(t), require.New(t) + ch := make(chan Hashes, 100) + db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t) + + cfg := DefaultConfig + sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) + pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1) + assert.NoError(err) + require.True(pool != nil) + ctx := context.Background() + var txID uint64 + _ = coreDB.View(ctx, func(tx kv.Tx) error { + txID = tx.ViewID() + return nil + }) + pendingBaseFee := uint64(1_000_000) + // start blocks from 0, set empty hash - then kvcache will also work on this + h1 := gointerfaces.ConvertHashToH256([32]byte{}) + change := &remote.StateChangeBatch{ + DatabaseViewID: txID, + PendingBlockBaseFee: pendingBaseFee, + ChangeBatch: []*remote.StateChange{ + {BlockHeight: 0, BlockHash: h1}, + }, + } + var addr [20]byte + addr[0] = 1 + v := make([]byte, EncodeSenderLengthForStorage(2, *uint256.NewInt(1 * common.Ether))) + EncodeSender(2, *uint256.NewInt(1 * common.Ether), v) + change.ChangeBatch[0].Changes = append(change.ChangeBatch[0].Changes, &remote.AccountChange{ + Action: remote.Action_UPSERT, + Address: gointerfaces.ConvertAddressToH160(addr), + Data: v, + }) + tx, err := db.BeginRw(ctx) + require.NoError(err) + defer tx.Rollback() + err = pool.OnNewBlock(ctx, change, TxSlots{}, TxSlots{}, tx) + assert.NoError(err) + // 1. Send high fee transaction with nonce gap + { + var txSlots TxSlots + txSlot := &TxSlot{ + tip: 500_000, + feeCap: 3_000_000, + gas: 100000, + nonce: 3, + } + txSlot.IdHash[0] = 1 + txSlots.Append(txSlot, addr[:], true) + + reasons, err := pool.AddLocalTxs(ctx, txSlots) + assert.NoError(err) + for _, reason := range reasons { + assert.Equal(Success, reason, reason.String()) + } + } + fmt.Printf("AFTER TX 1\n") + select { + case hashes := <-ch: + for i := 0; i < hashes.Len(); i++ { + fmt.Printf("propagated hash %x\n", hashes.At(i)) + } + default: + + } + // 2. Send low fee (below base fee) transaction without nonce gap + { + var txSlots TxSlots + txSlot := &TxSlot{ + tip: 500_000, + feeCap: 500_000, + gas: 100000, + nonce: 2, + } + txSlot.IdHash[0] = 2 + txSlots.Append(txSlot, addr[:], true) + + reasons, err := pool.AddLocalTxs(ctx, txSlots) + assert.NoError(err) + for _, reason := range reasons { + assert.Equal(Success, reason, reason.String()) + } + } + fmt.Printf("AFTER TX 2\n") + select { + case hashes := <-ch: + for i := 0; i < hashes.Len(); i++ { + fmt.Printf("propagated hash %x\n", hashes.At(i)) + } + default: + + } + + { + var txSlots TxSlots + txSlot := &TxSlot{ + tip: 600_000, + feeCap: 3_000_000, + gas: 100000, + nonce: 2, + } + txSlot.IdHash[0] = 3 + txSlots.Append(txSlot, addr[:], true) + + reasons, err := pool.AddLocalTxs(ctx, txSlots) + assert.NoError(err) + for _, reason := range reasons { + assert.Equal(Success, reason, reason.String()) + } + } + fmt.Printf("AFTER TX 3\n") + select { + case hashes := <-ch: + for i := 0; i < hashes.Len(); i++ { + fmt.Printf("propagated hash %x\n", hashes.At(i)) + } + default: + + } +} diff --git a/txpool/types.go b/txpool/types.go index 6223b21d3..ec20130c0 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -84,6 +84,7 @@ type TxSlot struct { value uint256.Int // Value transferred by the transaction IdHash [32]byte // Transaction hash for the purposes of using it as a transaction Id senderID uint64 // SenderID - require external mapping to it's address + traced bool // Whether transaction needs to be traced throughout transcation pool code and generate debug printing creation bool // Set to true if "To" field of the transation is not set dataLen int // Length of transaction's data (for calculation of intrinsic gas) dataNonZeroLen int