non blocking broadcast of new txs (#292)

This commit is contained in:
Alex Sharov 2022-02-01 19:16:43 +07:00 committed by GitHub
parent 1cc07746a4
commit 74d49a9de1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1324,50 +1324,53 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
log.Debug("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t))
}
case h := <-newTxs:
t := time.Now()
notifyMiningAboutNewSlots()
localTxHashes = localTxHashes[:0]
localTxRlps = localTxRlps[:0]
remoteTxHashes = remoteTxHashes[:0]
remoteTxRlps = remoteTxRlps[:0]
if h.Len() > 0 {
if err := db.View(ctx, func(tx kv.Tx) error {
slotsRlp := make([][]byte, 0, h.Len())
for i := 0; i < h.Len(); i++ {
hash := h.At(i)
slotRlp, err := p.GetRlp(tx, hash)
if err != nil {
return err
}
if len(slotRlp) > 0 {
// Empty rlp can happen if a transaction we want to broadcase has just been mined, for example
slotsRlp = append(slotsRlp, slotRlp)
if p.IsLocal(h.At(i)) {
localTxHashes = append(localTxHashes, hash...)
localTxRlps = append(localTxRlps, slotRlp)
} else {
remoteTxHashes = append(localTxHashes, hash...)
remoteTxRlps = append(remoteTxRlps, slotRlp)
go func() {
t := time.Now()
notifyMiningAboutNewSlots()
localTxHashes = localTxHashes[:0]
localTxRlps = localTxRlps[:0]
remoteTxHashes = remoteTxHashes[:0]
remoteTxRlps = remoteTxRlps[:0]
if h.Len() > 0 {
if err := db.View(ctx, func(tx kv.Tx) error {
slotsRlp := make([][]byte, 0, h.Len())
for i := 0; i < h.Len(); i++ {
hash := h.At(i)
slotRlp, err := p.GetRlp(tx, hash)
if err != nil {
return err
}
if len(slotRlp) > 0 {
// Empty rlp can happen if a transaction we want to broadcase has just been mined, for example
slotsRlp = append(slotsRlp, slotRlp)
if p.IsLocal(h.At(i)) {
localTxHashes = append(localTxHashes, hash...)
localTxRlps = append(localTxRlps, slotRlp)
} else {
remoteTxHashes = append(localTxHashes, hash...)
remoteTxRlps = append(remoteTxRlps, slotRlp)
}
}
}
newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp})
return nil
}); err != nil {
log.Error("[txpool] send new slots by grpc", "err", err)
}
newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp})
return nil
}); err != nil {
log.Error("[txpool] send new slots by grpc", "err", err)
}
}
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
txSentTo := send.BroadcastPooledTxs(localTxRlps)
hashSentTo := send.AnnouncePooledTxs(localTxHashes)
for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i)
log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load())
}
send.BroadcastPooledTxs(remoteTxRlps)
send.AnnouncePooledTxs(remoteTxHashes)
propagateNewTxsTimer.UpdateDuration(t)
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
txSentTo := send.BroadcastPooledTxs(localTxRlps)
hashSentTo := send.AnnouncePooledTxs(localTxHashes)
for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i)
log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load())
}
send.BroadcastPooledTxs(remoteTxRlps)
send.AnnouncePooledTxs(remoteTxHashes)
propagateNewTxsTimer.UpdateDuration(t)
}()
case <-syncToNewPeersEvery.C: // new peer
newPeers := p.recentlyConnectedPeers.GetAndClean()
if len(newPeers) == 0 {