diff --git a/txpool/pool.go b/txpool/pool.go index c52fd479a..3e69dfef7 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -1324,50 +1324,53 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs log.Debug("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t)) } case h := <-newTxs: - t := time.Now() - notifyMiningAboutNewSlots() - localTxHashes = localTxHashes[:0] - localTxRlps = localTxRlps[:0] - remoteTxHashes = remoteTxHashes[:0] - remoteTxRlps = remoteTxRlps[:0] - if h.Len() > 0 { - if err := db.View(ctx, func(tx kv.Tx) error { - slotsRlp := make([][]byte, 0, h.Len()) - for i := 0; i < h.Len(); i++ { - hash := h.At(i) - slotRlp, err := p.GetRlp(tx, hash) - if err != nil { - return err - } - if len(slotRlp) > 0 { - // Empty rlp can happen if a transaction we want to broadcase has just been mined, for example - slotsRlp = append(slotsRlp, slotRlp) - if p.IsLocal(h.At(i)) { - localTxHashes = append(localTxHashes, hash...) - localTxRlps = append(localTxRlps, slotRlp) - } else { - remoteTxHashes = append(localTxHashes, hash...) - remoteTxRlps = append(remoteTxRlps, slotRlp) + go func() { + t := time.Now() + + notifyMiningAboutNewSlots() + localTxHashes = localTxHashes[:0] + localTxRlps = localTxRlps[:0] + remoteTxHashes = remoteTxHashes[:0] + remoteTxRlps = remoteTxRlps[:0] + if h.Len() > 0 { + if err := db.View(ctx, func(tx kv.Tx) error { + slotsRlp := make([][]byte, 0, h.Len()) + for i := 0; i < h.Len(); i++ { + hash := h.At(i) + slotRlp, err := p.GetRlp(tx, hash) + if err != nil { + return err + } + if len(slotRlp) > 0 { + // Empty rlp can happen if a transaction we want to broadcase has just been mined, for example + slotsRlp = append(slotsRlp, slotRlp) + if p.IsLocal(h.At(i)) { + localTxHashes = append(localTxHashes, hash...) + localTxRlps = append(localTxRlps, slotRlp) + } else { + remoteTxHashes = append(localTxHashes, hash...) + remoteTxRlps = append(remoteTxRlps, slotRlp) + } } } + newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp}) + return nil + }); err != nil { + log.Error("[txpool] send new slots by grpc", "err", err) } - newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp}) - return nil - }); err != nil { - log.Error("[txpool] send new slots by grpc", "err", err) } - } - // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers - txSentTo := send.BroadcastPooledTxs(localTxRlps) - hashSentTo := send.AnnouncePooledTxs(localTxHashes) - for i := 0; i < localTxHashes.Len(); i++ { - hash := localTxHashes.At(i) - log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load()) - } - send.BroadcastPooledTxs(remoteTxRlps) - send.AnnouncePooledTxs(remoteTxHashes) - propagateNewTxsTimer.UpdateDuration(t) + // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers + txSentTo := send.BroadcastPooledTxs(localTxRlps) + hashSentTo := send.AnnouncePooledTxs(localTxHashes) + for i := 0; i < localTxHashes.Len(); i++ { + hash := localTxHashes.At(i) + log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load()) + } + send.BroadcastPooledTxs(remoteTxRlps) + send.AnnouncePooledTxs(remoteTxHashes) + propagateNewTxsTimer.UpdateDuration(t) + }() case <-syncToNewPeersEvery.C: // new peer newPeers := p.recentlyConnectedPeers.GetAndClean() if len(newPeers) == 0 {