From 0404744146708d7df88c8bc438c62b99c3f51994 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Fri, 26 Nov 2021 13:55:58 +0000 Subject: [PATCH] Introduce PriceBump, change tx replacement logic, add test (#192) * Introduce PriceBump, change tx replacement logic, add test * Fix TestNonceFromAddress Co-authored-by: Alexey Sharp --- txpool/pool.go | 48 ++++++++++++++------ txpool/pool_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 139 insertions(+), 17 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 8aaf6051a..8a678430d 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -75,6 +75,7 @@ type Config struct { MinFeeCap uint64 AccountSlots uint64 // Number of executable transaction slots guaranteed per account + PriceBump uint64 // Price bump percentage to replace an already existing transaction } var DefaultConfig = Config{ @@ -89,6 +90,7 @@ var DefaultConfig = Config{ MinFeeCap: 1, AccountSlots: 16, //TODO: to choose right value (16 to be compat with Geth) + PriceBump: 10, // Price bump percentage to replace an already existing transaction } // Pool is interface for the transaction pool @@ -151,6 +153,8 @@ const ( RLPTooLong DiscardReason = 17 NonceTooLow DiscardReason = 18 InsufficientFunds DiscardReason = 19 + NotReplaced DiscardReason = 20 // There was an existing transaction with the same sender and nonce, not enough price bump to replace + DuplicateHash DiscardReason = 21 // There was an existing transaction with the same hash ) func (r DiscardReason) String() string { @@ -193,6 +197,10 @@ func (r DiscardReason) String() string { return "nonce too low" case InsufficientFunds: return "insufficient funds" + case NotReplaced: + return "could not replace existing tx" + case DuplicateHash: + return "existing tx with same hash" default: panic(fmt.Sprintf("discard reason: %d", r)) } @@ -454,7 +462,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { } p.pending.captureAddedHashes(&p.promoted) - if err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { + if _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } p.pending.added = nil @@ -736,7 +744,13 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di } p.pending.captureAddedHashes(&p.promoted) - if err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { + if addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err == nil { + for i, reason := range addReasons { + if reason != NotSet { + reasons[i] = reason + } + } + } else { return nil, err } p.pending.added = nil @@ -770,7 +784,7 @@ func (p *TxPool) cache() kvcache.Cache { func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs TxSlots, pendingBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error { + byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx) DiscardReason, discard func(*metaTx, DiscardReason)) ([]DiscardReason, error) { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if ASSERT { for _, txn := range newTxs.txs { @@ -789,21 +803,25 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). sendersWithChangedState := map[uint64]struct{}{} + discardReasons := make([]DiscardReason, len(newTxs.txs)) for i, txn := range newTxs.txs { if _, ok := byHash[string(txn.IdHash[:])]; ok { + discardReasons[i] = DuplicateHash continue } mt := newMetaTx(txn, newTxs.isLocal[i], blockNum) - if !add(mt) { + if reason := add(mt); reason != NotSet { + discardReasons[i] = reason continue } + discardReasons[i] = NotSet sendersWithChangedState[mt.Tx.senderID] = struct{}{} } for senderID := range sendersWithChangedState { nonce, balance, err := senders.info(cacheView, senderID) if err != nil { - return err + return discardReasons, err } onSenderStateChange(senderID, nonce, balance, byNonce, protocolBaseFee, pendingBaseFee, pending, baseFee, queued, false) } @@ -815,12 +833,12 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, //pending.EnforceWorstInvariants() pending.EnforceBestInvariants() - return nil + return discardReasons, nil } func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges *remote.StateChangeBatch, senders *sendersBatch, newTxs TxSlots, pendingBaseFee uint64, baseFeeChanged bool, pending *PendingPool, baseFee, queued *SubPool, - byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error { + byNonce *BySenderAndNonce, byHash map[string]*metaTx, add func(*metaTx) DiscardReason, discard func(*metaTx, DiscardReason)) error { protocolBaseFee := calcProtocolBaseFee(pendingBaseFee) if ASSERT { for _, txn := range newTxs.txs { @@ -844,7 +862,8 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges continue } mt := newMetaTx(txn, newTxs.isLocal[i], blockNum) - if !add(mt) { + if reason := add(mt); reason != NotSet { + discard(mt, reason) continue } sendersWithChangedState[mt.Tx.senderID] = struct{}{} @@ -897,12 +916,15 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, bool) { return p.pendingBaseFee.Load(), changed } -func (p *TxPool) addLocked(mt *metaTx) bool { +func (p *TxPool) addLocked(mt *metaTx) DiscardReason { // Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip found := p.all.get(mt.Tx.senderID, mt.Tx.nonce) if found != nil { - if mt.Tx.tip <= found.Tx.tip { - return false + tipThreshold := found.Tx.tip * (100 + p.cfg.PriceBump) / 100 + feecapThreshold := found.Tx.feeCap * (100 + p.cfg.PriceBump) / 100 + if mt.Tx.tip <= tipThreshold || mt.Tx.feeCap <= feecapThreshold { + // Both tip and feecap need to be larger than previously to replace the transaction + return NotReplaced } switch found.currentSubPool { @@ -931,7 +953,7 @@ func (p *TxPool) addLocked(mt *metaTx) bool { p.isLocalLRU.Add(string(mt.Tx.IdHash[:]), struct{}{}) } p.queued.Add(mt) - return true + return NotSet } // dropping transaction from all sub-structures and from db @@ -1481,7 +1503,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if err != nil { return err } - if err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, pendingBaseFee, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { + if _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, txs, pendingBaseFee, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } p.pendingBaseFee.Store(pendingBaseFee) diff --git a/txpool/pool_test.go b/txpool/pool_test.go index 681bdbfb8..e74e8e4cd 100644 --- a/txpool/pool_test.go +++ b/txpool/pool_test.go @@ -157,7 +157,7 @@ func TestNonceFromAddress(t *testing.T) { gas: 100000, nonce: 6, } - txSlot3.IdHash[0] = 2 + txSlot3.IdHash[0] = 3 txSlots.Append(txSlot2, addr[:], true) txSlots.Append(txSlot3, addr[:], true) reasons, err := pool.AddLocalTxs(ctx, txSlots) @@ -167,7 +167,7 @@ func TestNonceFromAddress(t *testing.T) { } nonce, ok := pool.NonceFromAddress(addr) assert.True(ok) - assert.Equal(uint64(4), nonce) + assert.Equal(uint64(6), nonce) } // test too expencive tx { @@ -178,7 +178,7 @@ func TestNonceFromAddress(t *testing.T) { gas: 100000, nonce: 3, } - txSlot1.IdHash[0] = 1 + txSlot1.IdHash[0] = 4 txSlots.Append(txSlot1, addr[:], true) reasons, err := pool.AddLocalTxs(ctx, txSlots) assert.NoError(err) @@ -196,7 +196,7 @@ func TestNonceFromAddress(t *testing.T) { gas: 100000, nonce: 1, } - txSlot1.IdHash[0] = 1 + txSlot1.IdHash[0] = 5 txSlots.Append(txSlot1, addr[:], true) reasons, err := pool.AddLocalTxs(ctx, txSlots) assert.NoError(err) @@ -205,3 +205,103 @@ func TestNonceFromAddress(t *testing.T) { } } } + +func TestReplaceWithHigherFee(t *testing.T) { + assert, require := assert.New(t), require.New(t) + ch := make(chan Hashes, 100) + db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t) + + cfg := DefaultConfig + sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) + pool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1) + assert.NoError(err) + require.True(pool != nil) + ctx := context.Background() + var txID uint64 + _ = coreDB.View(ctx, func(tx kv.Tx) error { + txID = tx.ViewID() + return nil + }) + pendingBaseFee := uint64(200000) + // start blocks from 0, set empty hash - then kvcache will also work on this + h1 := gointerfaces.ConvertHashToH256([32]byte{}) + change := &remote.StateChangeBatch{ + DatabaseViewID: txID, + PendingBlockBaseFee: pendingBaseFee, + ChangeBatch: []*remote.StateChange{ + {BlockHeight: 0, BlockHash: h1}, + }, + } + var addr [20]byte + addr[0] = 1 + v := make([]byte, EncodeSenderLengthForStorage(2, *uint256.NewInt(1 * common.Ether))) + EncodeSender(2, *uint256.NewInt(1 * common.Ether), v) + change.ChangeBatch[0].Changes = append(change.ChangeBatch[0].Changes, &remote.AccountChange{ + Action: remote.Action_UPSERT, + Address: gointerfaces.ConvertAddressToH160(addr), + Data: v, + }) + tx, err := db.BeginRw(ctx) + require.NoError(err) + defer tx.Rollback() + err = pool.OnNewBlock(ctx, change, TxSlots{}, TxSlots{}, tx) + assert.NoError(err) + + { + var txSlots TxSlots + txSlot := &TxSlot{ + tip: 300000, + feeCap: 300000, + gas: 100000, + nonce: 3, + } + txSlot.IdHash[0] = 1 + txSlots.Append(txSlot, addr[:], true) + + reasons, err := pool.AddLocalTxs(ctx, txSlots) + assert.NoError(err) + for _, reason := range reasons { + assert.Equal(Success, reason, reason.String()) + } + } + // Bumped only feeCap, transaction not accepted + { + txSlots := TxSlots{} + txSlot := &TxSlot{ + tip: 300000, + feeCap: 3000000, + gas: 100000, + nonce: 3, + } + txSlot.IdHash[0] = 2 + txSlots.Append(txSlot, addr[:], true) + reasons, err := pool.AddLocalTxs(ctx, txSlots) + assert.NoError(err) + for _, reason := range reasons { + assert.Equal(NotReplaced, reason, reason.String()) + } + nonce, ok := pool.NonceFromAddress(addr) + assert.True(ok) + assert.Equal(uint64(3), nonce) + } + // Bumped only tip and feeCap by 10%, tx accepted + { + txSlots := TxSlots{} + txSlot := &TxSlot{ + tip: 330001, + feeCap: 330001, + gas: 100000, + nonce: 3, + } + txSlot.IdHash[0] = 3 + txSlots.Append(txSlot, addr[:], true) + reasons, err := pool.AddLocalTxs(ctx, txSlots) + assert.NoError(err) + for _, reason := range reasons { + assert.Equal(Success, reason, reason.String()) + } + nonce, ok := pool.NonceFromAddress(addr) + assert.True(ok) + assert.Equal(uint64(3), nonce) + } +}