From 7f6eb71c4c7740967df266fb9cad2d3fedd83921 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Thu, 16 Dec 2021 21:16:04 +0000 Subject: [PATCH] Split up txpool Broadcast and Announce (#209) * Split up Broadcast and Announce * Split up Broadcast and Announce Co-authored-by: Alexey Sharp --- txpool/fetch_test.go | 9 ++++++--- txpool/pool.go | 6 ++++-- txpool/send.go | 15 ++++++++++----- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/txpool/fetch_test.go b/txpool/fetch_test.go index 3eb58e3d5..80add8dab 100644 --- a/txpool/fetch_test.go +++ b/txpool/fetch_test.go @@ -71,7 +71,8 @@ func TestSendTxPropagate(t *testing.T) { t.Run("few remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) - send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42)) + send.BroadcastPooledTxs(testRlps(2)) + send.AnnouncePooledTxs(toHashes(1, 42)) calls1 := m.SendMessageToRandomPeersCalls() require.Equal(t, 1, len(calls1)) @@ -92,7 +93,8 @@ func TestSendTxPropagate(t *testing.T) { b := []byte(fmt.Sprintf("%x", i)) copy(list[i:i+32], b) } - send.BroadcastPooledTxs(testRlps(len(list)/32), list) + send.BroadcastPooledTxs(testRlps(len(list) / 32)) + send.AnnouncePooledTxs(list) calls1 := m.SendMessageToRandomPeersCalls() require.Equal(t, 1, len(calls1)) calls2 := m.SendMessageToAllCalls() @@ -112,7 +114,8 @@ func TestSendTxPropagate(t *testing.T) { return &sentry.SentPeers{Peers: make([]*types.H256, 5)}, nil } send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) - send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42)) + send.BroadcastPooledTxs(testRlps(2)) + send.AnnouncePooledTxs(toHashes(1, 42)) calls := m.SendMessageToAllCalls() require.Equal(t, 1, len(calls)) diff --git a/txpool/pool.go b/txpool/pool.go index 0bebd7d22..151eb9255 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -1369,12 +1369,14 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs } // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers - hashSentTo, txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxHashes) + txSentTo := send.BroadcastPooledTxs(localTxRlps) + hashSentTo := send.AnnouncePooledTxs(localTxHashes) for i := 0; i < localTxHashes.Len(); i++ { hash := localTxHashes.At(i) log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load()) } - send.BroadcastPooledTxs(remoteTxRlps, remoteTxHashes) + send.BroadcastPooledTxs(remoteTxRlps) + send.AnnouncePooledTxs(remoteTxHashes) propagateNewTxsTimer.UpdateDuration(t) case <-syncToNewPeersEvery.C: // new peer newPeers := p.recentlyConnectedPeers.GetAndClean() diff --git a/txpool/send.go b/txpool/send.go index 485b9db91..4c7875648 100644 --- a/txpool/send.go +++ b/txpool/send.go @@ -65,9 +65,9 @@ func (f *Send) notifyTests() { } } -func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txSentTo []int) { +func (f *Send) BroadcastPooledTxs(rlps [][]byte) (txSentTo []int) { defer f.notifyTests() - if len(hashes) == 0 { + if len(rlps) == 0 { return } txSentTo = make([]int, len(rlps)) @@ -94,7 +94,7 @@ func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txS } peers, err := sentryClient.SendMessageToRandomPeers(f.ctx, txs66) if err != nil { - log.Warn("[txpool.send] BroadcastLocalTxs", "err", err) + log.Debug("[txpool.send] BroadcastPooledTxs", "err", err) } if peers != nil { for j := prev; j <= i; j++ { @@ -107,8 +107,13 @@ func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txS size = 0 } } + return +} + +func (f *Send) AnnouncePooledTxs(hashes Hashes) (hashSentTo []int) { + defer f.notifyTests() hashSentTo = make([]int, len(hashes)/32) - prev = 0 + prev := 0 for len(hashes) > 0 { var pending Hashes if len(hashes) > p2pTxPacketLimit { @@ -135,7 +140,7 @@ func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txS } peers, err := sentryClient.SendMessageToAll(f.ctx, hashes66, &grpc.EmptyCallOption{}) if err != nil { - log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err) + log.Debug("[txpool.send] AnnouncePooledTxs", "err", err) } if peers != nil { for j, l := prev, pending.Len(); j < prev+l; j++ {