Introduce transaction tracing in tx pool (#205)

* Introduce transaction tracing in tx pool

* Add tracing prints

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-12-14 09:40:07 +00:00 committed by GitHub
parent b06f3cec6b
commit c7cd5d65d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 183 additions and 14 deletions

View File

@ -73,9 +73,10 @@ type Config struct {
BaseFeeSubPoolLimit int
QueuedSubPoolLimit int
MinFeeCap uint64
AccountSlots uint64 // Number of executable transaction slots guaranteed per account
PriceBump uint64 // Price bump percentage to replace an already existing transaction
MinFeeCap uint64
AccountSlots uint64 // Number of executable transaction slots guaranteed per account
PriceBump uint64 // Price bump percentage to replace an already existing transaction
TracedSenders []string // List of senders for which tx pool should print out debugging info
}
var DefaultConfig = Config{
@ -316,7 +317,10 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ch
search: sortByNonce{&metaTx{Tx: &TxSlot{}}},
senderIDTxnCount: map[uint64]int{},
}
tracedSenders := make(map[string]struct{})
for _, sender := range cfg.TracedSenders {
tracedSenders[sender] = struct{}{}
}
return &TxPool{
lock: &sync.RWMutex{},
byHash: map[string]*metaTx{},
@ -329,7 +333,7 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ch
queued: NewSubPool(QueuedSubPool, cfg.QueuedSubPoolLimit),
newPendingTxs: newTxs,
_stateCache: cache,
senders: newSendersCache(),
senders: newSendersCache(tracedSenders),
_chainDB: coreDB,
cfg: cfg,
chainID: chainID,
@ -600,22 +604,40 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs TxSlots) {
func (p *TxPool) validateTx(txn *TxSlot, isLocal bool, stateCache kvcache.CacheView) DiscardReason {
// Drop non-local transactions under our own minimal accepted gas price or tip
if !isLocal && txn.feeCap < p.cfg.MinFeeCap {
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx underpriced idHash=%x local=%t, feeCap=%d, cfg.MinFeeCap=%d", txn.IdHash, isLocal, txn.feeCap, p.cfg.MinFeeCap))
}
return UnderPriced
}
gas, reason := CalcIntrinsicGas(uint64(txn.dataLen), uint64(txn.dataNonZeroLen), nil, txn.creation, true, true)
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas idHash=%x gas=%d", txn.IdHash, gas))
}
if reason != Success {
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas calculated failed idHash=%x reason=%s", txn.IdHash, reason))
}
return reason
}
if gas > txn.gas {
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx intrinsic gas > txn.gas idHash=%x gas=%d, txn.gas=%d", txn.IdHash, gas, txn.gas))
}
return IntrinsicGas
}
if uint64(p.all.count(txn.senderID)) > p.cfg.AccountSlots {
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx marked as spamming idHash=%x slots=%d, limit=%d", txn.IdHash, p.all.count(txn.senderID), p.cfg.AccountSlots))
}
return Spammer
}
// check nonce and balance
senderNonce, senderBalance, _ := p.senders.info(stateCache, txn.senderID)
if senderNonce > txn.nonce {
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx nonce too low idHash=%x nonce in state=%d, txn.nonce=%d", txn.IdHash, senderNonce, txn.nonce))
}
return NonceTooLow
}
// Transactor should have enough funds to cover the costs
@ -623,6 +645,9 @@ func (p *TxPool) validateTx(txn *TxSlot, isLocal bool, stateCache kvcache.CacheV
total.Mul(total, uint256.NewInt(txn.tip))
total.Add(total, &txn.value)
if senderBalance.Cmp(total) < 0 {
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: validateTx insufficient funds idHash=%x balance in state=%d, txn.gas*txn.tip=%d", txn.IdHash, senderBalance, total))
}
return InsufficientFunds
}
return Success
@ -758,7 +783,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di
reasons = fillDiscardReasons(reasons, newTxs, p.discardReasonsLRU)
for i, reason := range reasons {
if reason == Success {
p.promoted = append(p.promoted, newTxs.txs[i].IdHash[:]...)
txn := newTxs.txs[i]
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: AddLocalTxs promotes idHash=%x, senderId=%d", txn.IdHash, txn.senderID))
}
p.promoted = append(p.promoted, txn.IdHash[:]...)
}
}
if p.promoted.Len() > 0 {
@ -815,6 +844,9 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
continue
}
discardReasons[i] = NotSet
if txn.traced {
log.Info(fmt.Sprintf("TX TRACING: schedule sendersWithChangedState idHash=%x senderId=%d", txn.IdHash, mt.Tx.senderID))
}
sendersWithChangedState[mt.Tx.senderID] = struct{}{}
}
@ -1058,6 +1090,9 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint
minFeeCap := uint64(math.MaxUint64)
minTip := uint64(math.MaxUint64)
byNonce.ascend(senderID, func(mt *metaTx) bool {
if mt.Tx.traced {
log.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderID=%d, senderNonce=%d, txn.nonce=%d, currentSubPool=%b", mt.Tx.IdHash, senderID, senderNonce, mt.Tx.nonce, mt.currentSubPool))
}
minFeeCap = min(minFeeCap, mt.Tx.feeCap)
mt.minFeeCap = minFeeCap
minTip = min(minTip, mt.Tx.tip)
@ -1122,6 +1157,10 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint
mt.subPool |= EnoughFeeCapBlock
}
if mt.Tx.traced {
log.Info(fmt.Sprintf("TX TRACING: onSenderStateChange loop iteration idHash=%x senderId=%d subPool=%b", mt.Tx.IdHash, mt.Tx.senderID, mt.subPool))
}
// 5. Local transaction. Set to 1 if transaction is local.
// can't change
@ -1466,7 +1505,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
}
txn.rlp = nil // means that we don't need store it in db anymore
txn.senderID = p.senders.getOrCreateID(addr)
txn.senderID, txn.traced = p.senders.getOrCreateID(addr)
binary.BigEndian.Uint64(v)
isLocalTx := p.isLocalLRU.Contains(string(k))
@ -1727,26 +1766,31 @@ type sendersBatch struct {
senderID uint64
senderIDs map[string]uint64
senderID2Addr map[uint64]string
tracedSenders map[string]struct{}
}
func newSendersCache() *sendersBatch {
return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64]string{}}
func newSendersCache(tracedSenders map[string]struct{}) *sendersBatch {
return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64]string{}, tracedSenders: tracedSenders}
}
func (sc *sendersBatch) getID(addr []byte) (uint64, bool) {
id, ok := sc.senderIDs[string(addr)]
return id, ok
}
func (sc *sendersBatch) getOrCreateID(addr []byte) uint64 {
func (sc *sendersBatch) getOrCreateID(addr []byte) (uint64, bool) {
addrS := string(addr)
_, traced := sc.tracedSenders[addrS]
id, ok := sc.senderIDs[addrS]
if !ok {
sc.senderID++
id = sc.senderID
sc.senderIDs[addrS] = id
sc.senderID2Addr[id] = addrS
if traced {
log.Info(fmt.Sprintf("TX TRACING: allocated senderID %d to sender %x", id, addr))
}
}
return id
return id, traced
}
func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint64, balance uint256.Int, err error) {
addr, ok := sc.senderID2Addr[id]
@ -1769,7 +1813,7 @@ func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint
func (sc *sendersBatch) registerNewSenders(newTxs TxSlots) (err error) {
for i, txn := range newTxs.txs {
txn.senderID = sc.getOrCreateID(newTxs.senders.At(i))
txn.senderID, txn.traced = sc.getOrCreateID(newTxs.senders.At(i))
}
return nil
}
@ -1781,11 +1825,11 @@ func (sc *sendersBatch) onNewBlock(stateChanges *remote.StateChangeBatch, unwind
}
for i, txn := range unwindTxs.txs {
txn.senderID = sc.getOrCreateID(unwindTxs.senders.At(i))
txn.senderID, txn.traced = sc.getOrCreateID(unwindTxs.senders.At(i))
}
for i, txn := range minedTxs.txs {
txn.senderID = sc.getOrCreateID(minedTxs.senders.At(i))
txn.senderID, txn.traced = sc.getOrCreateID(minedTxs.senders.At(i))
}
}
return nil

View File

@ -19,6 +19,7 @@ package txpool
import (
"container/heap"
"context"
"fmt"
"math/rand"
"testing"
@ -305,3 +306,126 @@ func TestReplaceWithHigherFee(t *testing.T) {
assert.Equal(uint64(3), nonce)
}
}
func TestReverseNonces(t *testing.T) {
assert, require := assert.New(t), require.New(t)
ch := make(chan Hashes, 100)
db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t)
cfg := DefaultConfig
sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1)
assert.NoError(err)
require.True(pool != nil)
ctx := context.Background()
var txID uint64
_ = coreDB.View(ctx, func(tx kv.Tx) error {
txID = tx.ViewID()
return nil
})
pendingBaseFee := uint64(1_000_000)
// start blocks from 0, set empty hash - then kvcache will also work on this
h1 := gointerfaces.ConvertHashToH256([32]byte{})
change := &remote.StateChangeBatch{
DatabaseViewID: txID,
PendingBlockBaseFee: pendingBaseFee,
ChangeBatch: []*remote.StateChange{
{BlockHeight: 0, BlockHash: h1},
},
}
var addr [20]byte
addr[0] = 1
v := make([]byte, EncodeSenderLengthForStorage(2, *uint256.NewInt(1 * common.Ether)))
EncodeSender(2, *uint256.NewInt(1 * common.Ether), v)
change.ChangeBatch[0].Changes = append(change.ChangeBatch[0].Changes, &remote.AccountChange{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(addr),
Data: v,
})
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = pool.OnNewBlock(ctx, change, TxSlots{}, TxSlots{}, tx)
assert.NoError(err)
// 1. Send high fee transaction with nonce gap
{
var txSlots TxSlots
txSlot := &TxSlot{
tip: 500_000,
feeCap: 3_000_000,
gas: 100000,
nonce: 3,
}
txSlot.IdHash[0] = 1
txSlots.Append(txSlot, addr[:], true)
reasons, err := pool.AddLocalTxs(ctx, txSlots)
assert.NoError(err)
for _, reason := range reasons {
assert.Equal(Success, reason, reason.String())
}
}
fmt.Printf("AFTER TX 1\n")
select {
case hashes := <-ch:
for i := 0; i < hashes.Len(); i++ {
fmt.Printf("propagated hash %x\n", hashes.At(i))
}
default:
}
// 2. Send low fee (below base fee) transaction without nonce gap
{
var txSlots TxSlots
txSlot := &TxSlot{
tip: 500_000,
feeCap: 500_000,
gas: 100000,
nonce: 2,
}
txSlot.IdHash[0] = 2
txSlots.Append(txSlot, addr[:], true)
reasons, err := pool.AddLocalTxs(ctx, txSlots)
assert.NoError(err)
for _, reason := range reasons {
assert.Equal(Success, reason, reason.String())
}
}
fmt.Printf("AFTER TX 2\n")
select {
case hashes := <-ch:
for i := 0; i < hashes.Len(); i++ {
fmt.Printf("propagated hash %x\n", hashes.At(i))
}
default:
}
{
var txSlots TxSlots
txSlot := &TxSlot{
tip: 600_000,
feeCap: 3_000_000,
gas: 100000,
nonce: 2,
}
txSlot.IdHash[0] = 3
txSlots.Append(txSlot, addr[:], true)
reasons, err := pool.AddLocalTxs(ctx, txSlots)
assert.NoError(err)
for _, reason := range reasons {
assert.Equal(Success, reason, reason.String())
}
}
fmt.Printf("AFTER TX 3\n")
select {
case hashes := <-ch:
for i := 0; i < hashes.Len(); i++ {
fmt.Printf("propagated hash %x\n", hashes.At(i))
}
default:
}
}

View File

@ -84,6 +84,7 @@ type TxSlot struct {
value uint256.Int // Value transferred by the transaction
IdHash [32]byte // Transaction hash for the purposes of using it as a transaction Id
senderID uint64 // SenderID - require external mapping to it's address
traced bool // Whether transaction needs to be traced throughout transcation pool code and generate debug printing
creation bool // Set to true if "To" field of the transation is not set
dataLen int // Length of transaction's data (for calculation of intrinsic gas)
dataNonZeroLen int