Split up txpool Broadcast and Announce (#209)

* Split up Broadcast and Announce

* Split up Broadcast and Announce

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-12-16 21:16:04 +00:00 committed by GitHub
parent 7f82ddaa75
commit 7f6eb71c4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 10 deletions

View File

@ -71,7 +71,8 @@ func TestSendTxPropagate(t *testing.T) {
t.Run("few remote byHash", func(t *testing.T) { t.Run("few remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx) m := NewMockSentry(ctx)
send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42)) send.BroadcastPooledTxs(testRlps(2))
send.AnnouncePooledTxs(toHashes(1, 42))
calls1 := m.SendMessageToRandomPeersCalls() calls1 := m.SendMessageToRandomPeersCalls()
require.Equal(t, 1, len(calls1)) require.Equal(t, 1, len(calls1))
@ -92,7 +93,8 @@ func TestSendTxPropagate(t *testing.T) {
b := []byte(fmt.Sprintf("%x", i)) b := []byte(fmt.Sprintf("%x", i))
copy(list[i:i+32], b) copy(list[i:i+32], b)
} }
send.BroadcastPooledTxs(testRlps(len(list)/32), list) send.BroadcastPooledTxs(testRlps(len(list) / 32))
send.AnnouncePooledTxs(list)
calls1 := m.SendMessageToRandomPeersCalls() calls1 := m.SendMessageToRandomPeersCalls()
require.Equal(t, 1, len(calls1)) require.Equal(t, 1, len(calls1))
calls2 := m.SendMessageToAllCalls() calls2 := m.SendMessageToAllCalls()
@ -112,7 +114,8 @@ func TestSendTxPropagate(t *testing.T) {
return &sentry.SentPeers{Peers: make([]*types.H256, 5)}, nil return &sentry.SentPeers{Peers: make([]*types.H256, 5)}, nil
} }
send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42)) send.BroadcastPooledTxs(testRlps(2))
send.AnnouncePooledTxs(toHashes(1, 42))
calls := m.SendMessageToAllCalls() calls := m.SendMessageToAllCalls()
require.Equal(t, 1, len(calls)) require.Equal(t, 1, len(calls))

View File

@ -1369,12 +1369,14 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
} }
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
hashSentTo, txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxHashes) txSentTo := send.BroadcastPooledTxs(localTxRlps)
hashSentTo := send.AnnouncePooledTxs(localTxHashes)
for i := 0; i < localTxHashes.Len(); i++ { for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i) hash := localTxHashes.At(i)
log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load()) log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load())
} }
send.BroadcastPooledTxs(remoteTxRlps, remoteTxHashes) send.BroadcastPooledTxs(remoteTxRlps)
send.AnnouncePooledTxs(remoteTxHashes)
propagateNewTxsTimer.UpdateDuration(t) propagateNewTxsTimer.UpdateDuration(t)
case <-syncToNewPeersEvery.C: // new peer case <-syncToNewPeersEvery.C: // new peer
newPeers := p.recentlyConnectedPeers.GetAndClean() newPeers := p.recentlyConnectedPeers.GetAndClean()

View File

@ -65,9 +65,9 @@ func (f *Send) notifyTests() {
} }
} }
func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txSentTo []int) { func (f *Send) BroadcastPooledTxs(rlps [][]byte) (txSentTo []int) {
defer f.notifyTests() defer f.notifyTests()
if len(hashes) == 0 { if len(rlps) == 0 {
return return
} }
txSentTo = make([]int, len(rlps)) txSentTo = make([]int, len(rlps))
@ -94,7 +94,7 @@ func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txS
} }
peers, err := sentryClient.SendMessageToRandomPeers(f.ctx, txs66) peers, err := sentryClient.SendMessageToRandomPeers(f.ctx, txs66)
if err != nil { if err != nil {
log.Warn("[txpool.send] BroadcastLocalTxs", "err", err) log.Debug("[txpool.send] BroadcastPooledTxs", "err", err)
} }
if peers != nil { if peers != nil {
for j := prev; j <= i; j++ { for j := prev; j <= i; j++ {
@ -107,8 +107,13 @@ func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txS
size = 0 size = 0
} }
} }
return
}
func (f *Send) AnnouncePooledTxs(hashes Hashes) (hashSentTo []int) {
defer f.notifyTests()
hashSentTo = make([]int, len(hashes)/32) hashSentTo = make([]int, len(hashes)/32)
prev = 0 prev := 0
for len(hashes) > 0 { for len(hashes) > 0 {
var pending Hashes var pending Hashes
if len(hashes) > p2pTxPacketLimit { if len(hashes) > p2pTxPacketLimit {
@ -135,7 +140,7 @@ func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txS
} }
peers, err := sentryClient.SendMessageToAll(f.ctx, hashes66, &grpc.EmptyCallOption{}) peers, err := sentryClient.SendMessageToAll(f.ctx, hashes66, &grpc.EmptyCallOption{})
if err != nil { if err != nil {
log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err) log.Debug("[txpool.send] AnnouncePooledTxs", "err", err)
} }
if peers != nil { if peers != nil {
for j, l := prev, pending.Len(); j < prev+l; j++ { for j, l := prev, pending.Len(); j < prev+l; j++ {