diff --git a/txpool/fetch_test.go b/txpool/fetch_test.go index f07480ac1..86b54b0b2 100644 --- a/txpool/fetch_test.go +++ b/txpool/fetch_test.go @@ -72,7 +72,7 @@ func TestSendTxPropagate(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() - t.Run("few remote txs", func(t *testing.T) { + t.Run("few remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger) send.BroadcastRemotePooledTxs(toHashes([32]byte{1}, [32]byte{42})) @@ -83,7 +83,7 @@ func TestSendTxPropagate(t *testing.T) { assert.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, first.Id) assert.Equal(t, 68, len(first.Data)) }) - t.Run("much remote txs", func(t *testing.T) { + t.Run("much remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger) list := make(Hashes, p2pTxPacketLimit*3) @@ -100,7 +100,7 @@ func TestSendTxPropagate(t *testing.T) { require.True(t, len(call.Data) > 0) } }) - t.Run("few local txs", func(t *testing.T) { + t.Run("few local byHash", func(t *testing.T) { m := NewMockSentry(ctx) m.SendMessageToAllFunc = func(contextMoqParam context.Context, outboundMessageData *sentry.OutboundMessageData) (*sentry.SentPeers, error) { return &sentry.SentPeers{Peers: make([]*types.H512, 5)}, nil diff --git a/txpool/pool.go b/txpool/pool.go index da4745d87..c3d5f4dd7 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -109,13 +109,17 @@ func (i *nonce2TxItem) Less(than btree.Item) bool { return i.MetaTx.Tx.nonce < than.(*nonce2TxItem).MetaTx.Tx.nonce } -// PoolEnv - holds all pool-related data structures, but has no methods (or simple mutex-based methods) -type PoolEnv struct { +// TxPool - holds all pool-related data structures and lock-based tiny methods +// most of logic implemented by pure tests-friendly functions +type TxPool struct { + lock *sync.RWMutex logger log.Logger protocolBaseFee atomic.Uint64 + blockBaseFee atomic.Uint64 senderInfo map[uint64]SenderInfo + byHash map[string]*MetaTx // tx_hash => tx pending, baseFee, queued *SubPool // fields for transaction propagation @@ -123,28 +127,78 @@ type PoolEnv struct { lastTxPropagationTimestamp time.Time } -func loopStep(env *PoolEnv) { - if false { - unwindedTxs := []*TxSlot{} - unwind(env.senderInfo, unwindedTxs, env.pending) - } - if false { - //TODO: get arrived blocks - minedTxs := []*TxSlot{} - blockBasedFee := env.protocolBaseFee.Load() + 123 - onNewBlocks(env.senderInfo, minedTxs, env.protocolBaseFee.Load(), blockBasedFee, env.pending, env.baseFee, env.queued) - } +func (p *TxPool) GetRlp(hash []byte) []byte { + p.lock.RLock() + defer p.lock.RUnlock() + txn, ok := p.byHash[string(hash)] + if !ok { + return nil + } + return txn.Tx.rlp +} +func (p *TxPool) AppendLocalHashes(buf []byte) { + p.lock.RLock() + defer p.lock.RUnlock() + i := 0 + for hash, txn := range p.byHash { + if txn.SubPool&IsLocal == 0 { + continue + } + copy(buf[i*32:(i+1)*32], hash) + i++ + } +} +func (p *TxPool) AppendRemoteHashes(buf []byte) { + p.lock.RLock() + defer p.lock.RUnlock() + + i := 0 + for hash, txn := range p.byHash { + if txn.SubPool&IsLocal != 0 { + continue + } + copy(buf[i*32:(i+1)*32], hash) + i++ + } +} +func (p *TxPool) IdHashKnown(hash []byte) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + _, ok := p.byHash[string(hash)] + return ok +} +func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } +func (p *TxPool) OnNewBlock(unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64) { + p.lock.Lock() + defer p.lock.Unlock() + p.protocolBaseFee.Store(protocolBaseFee) + p.blockBaseFee.Store(blockBaseFee) + + onNewBlock(p.senderInfo, unwindTxs, minedTxs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash) } -// onNewBlocks - apply new highest block (or batch of blocks) +func onNewBlock(senderInfo map[uint64]SenderInfo, unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx) { + if unwindTxs != nil { + unwind(senderInfo, unwindTxs, pending, func(i *MetaTx) { + delete(byHash, string(i.Tx.idHash[:])) + }) + } + forward(senderInfo, minedTxs, protocolBaseFee, blockBaseFee, pending, baseFee, queued, func(i *MetaTx) { + delete(byHash, string(i.Tx.idHash[:])) + senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) + }) +} + +// forward - apply new highest block (or batch of blocks) // // 1. New best block arrives, which potentially changes the balance and the nonce of some senders. // We use senderIds data structure to find relevant senderId values, and then use senders data structure to // modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is // included into a block), and finally, walk over the transaction records and update SubPool fields depending on // the actual presence of nonce gaps and what the balance is. -func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool) { +func forward(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) { // TODO: change sender.nonce for _, tx := range minedTxs { @@ -166,10 +220,13 @@ func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolB switch it.MetaTx.currentSubPool { case PendingSubPool: pending.UnsafeRemove(it.MetaTx) + discard(it.MetaTx) case BaseFeeSubPool: baseFee.UnsafeRemove(it.MetaTx) + discard(it.MetaTx) case QueuedSubPool: queued.UnsafeRemove(it.MetaTx) + discard(it.MetaTx) default: //already removed } @@ -184,7 +241,7 @@ func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolB baseFee.EnforceInvariants() queued.EnforceInvariants() - promote(pending, baseFee, queued) + promote(pending, baseFee, queued, discard) } // unwind @@ -197,10 +254,10 @@ func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolB // they effective lose their priority over the "remote" transactions. In order to prevent that, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). -func unwind(senderInfo map[uint64]SenderInfo, unwindedTxs []*TxSlot, pending *SubPool) { +func unwind(senderInfo map[uint64]SenderInfo, unwindTxs []*TxSlot, pending *SubPool, onAdd func(tx *MetaTx)) { // TODO: change sender.nonce - for _, tx := range unwindedTxs { + for _, tx := range unwindTxs { sender, ok := senderInfo[tx.senderID] if !ok { panic("not implemented yet") @@ -215,6 +272,7 @@ func unwind(senderInfo map[uint64]SenderInfo, unwindedTxs []*TxSlot, pending *Su } } sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt}) + onAdd(mt) pending.UnsafeAdd(mt, PendingSubPool) } pending.EnforceInvariants() @@ -275,7 +333,7 @@ func onSenderChange(sender SenderInfo, protocolBaseFee, blockBaseFee uint64) { }) } -func promote(pending, baseFee, queued *SubPool) { +func promote(pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) { //1. If top element in the worst green queue has SubPool != 0b1111 (binary), it needs to be removed from the green pool. // If SubPool < 0b1000 (not satisfying minimum fee), discard. // If SubPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool. @@ -291,7 +349,7 @@ func promote(pending, baseFee, queued *SubPool) { queued.Add(pending.PopWorst(), QueuedSubPool) continue } - pending.PopWorst() + discard(pending.PopWorst()) } //2. If top element in the worst green queue has SubPool == 0b1111, but there is not enough room in the pool, discard. @@ -320,7 +378,7 @@ func promote(pending, baseFee, queued *SubPool) { queued.Add(baseFee.PopWorst(), QueuedSubPool) continue } - baseFee.PopWorst() + discard(baseFee.PopWorst()) } //5. If the top element in the worst yellow queue has SubPool == 0x1110, but there is not enough room in the pool, discard. @@ -328,7 +386,7 @@ func promote(pending, baseFee, queued *SubPool) { if worst.SubPool >= 0b11110 { break } - baseFee.PopWorst() + discard(baseFee.PopWorst()) } //6. If the top element in the best red queue has SubPool == 0x1110, promote to the yellow pool. If SubPool == 0x1111, promote to the green pool. @@ -350,12 +408,12 @@ func promote(pending, baseFee, queued *SubPool) { break } - queued.PopWorst() + discard(queued.PopWorst()) } //8. If the top element in the worst red queue has SubPool >= 0b100, but there is not enough room in the pool, discard. for _ = queued.Worst(); queued.Len() > QueuedSubPoolLimit; _ = queued.Worst() { - queued.PopWorst() + discard(queued.PopWorst()) } } @@ -508,10 +566,10 @@ func NewPool() *PoolImpl { } // Loop - does: -// send pending txs to p2p: -// - new txs -// - all pooled txs to recently connected peers -// - all local pooled txs to random peers periodically +// send pending byHash to p2p: +// - new byHash +// - all pooled byHash to recently connected peers +// - all local pooled byHash to random peers periodically // promote/demote transactions // reorgs func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) { @@ -531,11 +589,11 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) { select { case <-ctx.Done(): return - case <-propagateAllNewTxsEvery.C: // new txs + case <-propagateAllNewTxsEvery.C: // new byHash last := p.lastTxPropagationTimestamp p.lastTxPropagationTimestamp = time.Now() - // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers + // first broadcast all local byHash to all peers, then non-local to random sqrt(peersAmount) peers localTxHashes = localTxHashes[:0] p.FillLocalHashesSince(last, localTxHashes) initialAmount := len(localTxHashes) @@ -543,7 +601,7 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) { if initialAmount == 1 { p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes) } else { - p.logger.Info("local txs propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount) + p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount) } remoteTxHashes = remoteTxHashes[:0] @@ -556,7 +614,7 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) { } p.FillRemoteHashes(remoteTxHashes[:0]) send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes) - case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local txs to random peers + case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local byHash to random peers p.FillLocalHashes(localTxHashes[:0]) send.BroadcastLocalPooledTxs(localTxHashes) } diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index e7777b117..e513a2364 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -173,8 +173,9 @@ func FuzzOnNewBlocks3(f *testing.F) { if !ok { t.Skip() } + byHash := map[string]*MetaTx{} pending, baseFee, queued := NewSubPool(), NewSubPool(), NewSubPool() - onNewBlocks(senders, txs, protocolBaseFee, blockBaseFee, pending, baseFee, queued) + onNewBlock(senders, txs, nil, protocolBaseFee, blockBaseFee, pending, baseFee, queued, byHash) best, worst := pending.Best(), pending.Worst() assert.LessOrEqual(pending.Len(), PendingSubPoolLimit) diff --git a/txpool/types.go b/txpool/types.go index 915bc6a51..4c1bcfb9e 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -78,6 +78,8 @@ type TxSlot struct { //bestIdx int // Index of the transaction in the best priority queue (of whatever pool it currently belongs to) //worstIdx int // Index of the transaction in the worst priority queue (of whatever pook it currently belongs to) //local bool // Whether transaction has been injected locally (and hence needs priority when mining or proposing a block) + + rlp []byte } const ( @@ -94,7 +96,7 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl if len(payload) == 0 { return nil, sender, 0, fmt.Errorf("%s: empty rlp", ParseTransactionErrorPrefix) } - slot = &TxSlot{} + slot = &TxSlot{rlp: payload} // Compute transaction hash ctx.keccak1.Reset() ctx.keccak2.Reset()