diff --git a/txpool/fetch.go b/txpool/fetch.go index 0af5b2d96..40aa7424a 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -37,7 +37,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -// Fetch connects to sentry and implements eth/65 or eth/66 protocol regarding the transaction +// Fetch connects to sentry and implements eth/66 protocol regarding the transaction // messages. It tries to "prime" the sentry with StatusData message containing given // genesis hash and list of forks, but with zero max block and total difficulty // Sentry should have a logic not to overwrite statusData with messages from tx pool @@ -166,10 +166,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl streamCtx, cancel := context.WithCancel(ctx) defer cancel() stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{ - sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, - sentry.MessageId_GET_POOLED_TRANSACTIONS_65, - sentry.MessageId_TRANSACTIONS_65, - sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_66, @@ -230,7 +226,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes defer tx.Rollback() switch req.Id { - case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65: + case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66: hashCount, pos, err := ParseHashesCount(req.Data, 0) if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) @@ -259,9 +255,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes return err } messageId = sentry.MessageId_GET_POOLED_TRANSACTIONS_66 - case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65: - encodedRequest = EncodeHashes(unknownHashes, nil) - messageId = sentry.MessageId_GET_POOLED_TRANSACTIONS_65 default: return fmt.Errorf("unexpected message: %s", req.Id.String()) } @@ -272,7 +265,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes return err } } - case sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_65: + case sentry.MessageId_GET_POOLED_TRANSACTIONS_66: //TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't var encodedRequest []byte var messageId sentry.MessageId @@ -297,24 +290,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes } encodedRequest = EncodePooledTransactions66(txs, requestID, nil) - case sentry.MessageId_GET_POOLED_TRANSACTIONS_65: - messageId = sentry.MessageId_POOLED_TRANSACTIONS_65 - hashes, _, err := ParseGetPooledTransactions65(req.Data, 0, nil) - if err != nil { - return err - } - var txs [][]byte - for i := 0; i < len(hashes); i += 32 { - txn, err := f.pool.GetRlp(tx, hashes[i:i+32]) - if err != nil { - return err - } - if txn == nil { - continue - } - txs = append(txs, txn) - } - encodedRequest = EncodePooledTransactions65(txs, nil) default: return fmt.Errorf("unexpected message: %s", req.Id.String()) } @@ -325,7 +300,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes }, &grpc.EmptyCallOption{}); err != nil { return err } - case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66: + case sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_66: txs := TxSlots{} if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error { parseContext.ValidateHash(func(hash []byte) error { @@ -344,9 +319,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes } switch req.Id { - case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66: + case sentry.MessageId_TRANSACTIONS_66: if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error { - if _, err := ParsePooledTransactions65(req.Data, 0, parseContext, &txs); err != nil { + if _, err := ParseTransactions(req.Data, 0, parseContext, &txs); err != nil { return err } return nil diff --git a/txpool/fetch_test.go b/txpool/fetch_test.go index 04d5610f6..3eb58e3d5 100644 --- a/txpool/fetch_test.go +++ b/txpool/fetch_test.go @@ -71,13 +71,18 @@ func TestSendTxPropagate(t *testing.T) { t.Run("few remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) - send.BroadcastRemotePooledTxs(toHashes(1, 42)) + send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42)) - calls := m.SendMessageToRandomPeersCalls() - require.Equal(t, 1, len(calls)) - first := calls[0].SendMessageToRandomPeersRequest.Data - assert.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, first.Id) - assert.Equal(t, 68, len(first.Data)) + calls1 := m.SendMessageToRandomPeersCalls() + require.Equal(t, 1, len(calls1)) + calls2 := m.SendMessageToAllCalls() + require.Equal(t, 1, len(calls2)) + first := calls1[0].SendMessageToRandomPeersRequest.Data + assert.Equal(t, sentry.MessageId_TRANSACTIONS_66, first.Id) + assert.Equal(t, 5, len(first.Data)) + second := calls2[0].OutboundMessageData + assert.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, second.Id) + assert.Equal(t, 68, len(second.Data)) }) t.Run("much remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) @@ -87,13 +92,18 @@ func TestSendTxPropagate(t *testing.T) { b := []byte(fmt.Sprintf("%x", i)) copy(list[i:i+32], b) } - send.BroadcastRemotePooledTxs(list) - calls := m.SendMessageToRandomPeersCalls() - require.Equal(t, 3, len(calls)) + send.BroadcastPooledTxs(testRlps(len(list)/32), list) + calls1 := m.SendMessageToRandomPeersCalls() + require.Equal(t, 1, len(calls1)) + calls2 := m.SendMessageToAllCalls() + require.Equal(t, 3, len(calls2)) + call1 := calls1[0].SendMessageToRandomPeersRequest.Data + require.Equal(t, sentry.MessageId_TRANSACTIONS_66, call1.Id) + require.True(t, len(call1.Data) > 0) for i := 0; i < 3; i++ { - call := calls[i].SendMessageToRandomPeersRequest.Data - require.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, call.Id) - require.True(t, len(call.Data) > 0) + call2 := calls2[i].OutboundMessageData + require.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, call2.Id) + require.True(t, len(call2.Data) > 0) } }) t.Run("few local byHash", func(t *testing.T) { @@ -102,7 +112,7 @@ func TestSendTxPropagate(t *testing.T) { return &sentry.SentPeers{Peers: make([]*types.H256, 5)}, nil } send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) - send.BroadcastLocalPooledTxs(toHashes(1, 42)) + send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42)) calls := m.SendMessageToAllCalls() require.Equal(t, 1, len(calls)) diff --git a/txpool/packets.go b/txpool/packets.go index 2f824f85b..cbc13c30c 100644 --- a/txpool/packets.go +++ b/txpool/packets.go @@ -156,7 +156,7 @@ func EncodePooledTransactions66(txsRlp [][]byte, requestId uint64, encodeBuf []b _ = pos return encodeBuf } -func EncodePooledTransactions65(txsRlp [][]byte, encodeBuf []byte) []byte { +func EncodeTransactions(txsRlp [][]byte, encodeBuf []byte) []byte { pos := 0 dataLen := 0 for i := range txsRlp { @@ -184,7 +184,7 @@ func EncodePooledTransactions65(txsRlp [][]byte, encodeBuf []byte) []byte { return encodeBuf } -func ParsePooledTransactions65(payload []byte, pos int, ctx *TxParseContext, txSlots *TxSlots) (newPos int, err error) { +func ParseTransactions(payload []byte, pos int, ctx *TxParseContext, txSlots *TxSlots) (newPos int, err error) { pos, _, err = rlp.List(payload, pos) if err != nil { return 0, err diff --git a/txpool/pool.go b/txpool/pool.go index 2699adb66..0bebd7d22 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -235,6 +235,18 @@ const PendingSubPool SubPoolType = 1 const BaseFeeSubPool SubPoolType = 2 const QueuedSubPool SubPoolType = 3 +func (sp SubPoolType) String() string { + switch sp { + case PendingSubPool: + return "Pending" + case BaseFeeSubPool: + return "BaseFee" + case QueuedSubPool: + return "Queued" + } + return fmt.Sprintf("Unknown:%d", sp) +} + // sender - immutable structure which stores only nonce and balance of account type sender struct { balance uint256.Int @@ -408,11 +420,13 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight) - p.pending.captureAddedHashes(&p.promoted) + p.pending.resetAddedHashes() + p.baseFee.resetAddedHashes() if err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, pendingBaseFee, baseFeeChanged, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } - p.pending.added = nil + p.promoted = p.pending.appendAddedHashes(p.promoted[:0]) + p.promoted = p.baseFee.appendAddedHashes(p.promoted) if p.started.CAS(false, true) { log.Info("[txpool] Started") @@ -420,7 +434,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang if p.promoted.Len() > 0 { select { - case p.newPendingTxs <- common.Copy(p.promoted): + case p.newPendingTxs <- p.promoted.DedupCopy(): default: } } @@ -465,17 +479,19 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return err } - p.pending.captureAddedHashes(&p.promoted) + p.pending.resetAddedHashes() + p.baseFee.resetAddedHashes() if _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } - p.pending.added = nil + p.promoted = p.pending.appendAddedHashes(p.promoted[:0]) + p.promoted = p.baseFee.appendAddedHashes(p.promoted) if p.promoted.Len() > 0 { select { case <-ctx.Done(): return nil - case p.newPendingTxs <- common.Copy(p.promoted): + case p.newPendingTxs <- p.promoted.DedupCopy(): default: } } @@ -768,7 +784,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di return nil, err } - p.pending.captureAddedHashes(&p.promoted) + p.pending.resetAddedHashes() + p.baseFee.resetAddedHashes() if addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err == nil { for i, reason := range addReasons { if reason != NotSet { @@ -778,7 +795,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di } else { return nil, err } - p.pending.added = nil + p.promoted = p.pending.appendAddedHashes(p.promoted[:0]) + p.promoted = p.baseFee.appendAddedHashes(p.promoted) reasons = fillDiscardReasons(reasons, newTxs, p.discardReasonsLRU) for i, reason := range reasons { @@ -792,7 +810,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di } if p.promoted.Len() > 0 { select { - case p.newPendingTxs <- common.Copy(p.promoted): + case p.newPendingTxs <- p.promoted.DedupCopy(): default: } } @@ -858,11 +876,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, onSenderStateChange(senderID, nonce, balance, byNonce, protocolBaseFee, pendingBaseFee, pending, baseFee, queued, false) } - //pending.EnforceWorstInvariants() - //baseFee.EnforceInvariants() - //queued.EnforceInvariants() promote(pending, baseFee, queued, discard) - //pending.EnforceWorstInvariants() pending.EnforceBestInvariants() return discardReasons, nil @@ -1077,7 +1091,7 @@ func onBaseFeeChange(byNonce *BySenderAndNonce, pendingBaseFee uint64) { // 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than // baseFee of the currently pending block. Set to 0 otherwise. mt.subPool &^= EnoughFeeCapBlock - if mt.Tx.feeCap >= pendingBaseFee { + if mt.minFeeCap >= pendingBaseFee { mt.subPool |= EnoughFeeCapBlock } return true @@ -1115,7 +1129,7 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint // parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means // this transaction will never be included into this particular chain. mt.subPool &^= EnoughFeeCapProtocol - if mt.Tx.feeCap >= protocolBaseFee { + if mt.minFeeCap >= protocolBaseFee { mt.subPool |= EnoughFeeCapProtocol } else { mt.subPool = 0 // TODO: we immediately drop all transactions if they have no first bit - then maybe we don't need this bit at all? And don't add such transactions to queue? @@ -1178,6 +1192,8 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint }) } +// promote reasserts invariants of the subpool and returns the list of transactions that ended up +// being promoted to the pending or basefee pool, for re-broadcasting func promote(pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) { //1. If top element in the worst green queue has subPool != 0b1111 (binary), it needs to be removed from the green pool. // If subPool < 0b1000 (not satisfying minimum fee), discard. @@ -1245,6 +1261,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, discard func(*metaT } pending.Add(queued.PopBest()) + } //7. If the top element in the worst red queue has subPool < 0b1000 (not satisfying minimum fee), discard. @@ -1278,8 +1295,10 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs logEvery := time.NewTicker(p.cfg.LogEvery) defer logEvery.Stop() - localTxHashes := make([]byte, 0, 128) - remoteTxHashes := make([]byte, 0, 128) + localTxHashes := make(Hashes, 0, 128) + localTxRlps := make([][]byte, 0, 4) + remoteTxHashes := make(Hashes, 0, 128) + remoteTxRlps := make([][]byte, 0, 4) for { select { @@ -1320,15 +1339,27 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs case h := <-newTxs: t := time.Now() notifyMiningAboutNewSlots() + localTxHashes = localTxHashes[:0] + localTxRlps = localTxRlps[:0] + remoteTxHashes = remoteTxHashes[:0] + remoteTxRlps = remoteTxRlps[:0] if h.Len() > 0 { if err := db.View(ctx, func(tx kv.Tx) error { slotsRlp := make([][]byte, 0, h.Len()) for i := 0; i < h.Len(); i++ { - slotRlp, err := p.GetRlp(tx, h.At(i)) + hash := h.At(i) + slotRlp, err := p.GetRlp(tx, hash) if err != nil { return err } slotsRlp = append(slotsRlp, slotRlp) + if p.IsLocal(h.At(i)) { + localTxHashes = append(localTxHashes, hash...) + localTxRlps = append(localTxRlps, slotRlp) + } else { + remoteTxHashes = append(localTxHashes, hash...) + remoteTxRlps = append(remoteTxRlps, slotRlp) + } } newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp}) return nil @@ -1338,26 +1369,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs } // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers - localTxHashes = localTxHashes[:0] - remoteTxHashes = remoteTxHashes[:0] - - for i := 0; i < h.Len(); i++ { - if p.IsLocal(h.At(i)) { - localTxHashes = append(localTxHashes, h.At(i)...) - } else { - remoteTxHashes = append(localTxHashes, h.At(i)...) - } + hashSentTo, txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxHashes) + for i := 0; i < localTxHashes.Len(); i++ { + hash := localTxHashes.At(i) + log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load()) } - - sentTo := send.BroadcastLocalPooledTxs(localTxHashes) - if len(localTxHashes)/32 > 0 { - if len(localTxHashes)/32 == 1 { - log.Info("local tx propagated", "to_peers_amount", sentTo, "tx_hash", fmt.Sprintf("%x", localTxHashes), "baseFee", p.pendingBaseFee.Load()) - } else { - log.Info("local txs propagated", "to_peers_amount", sentTo, "txs_amount", len(localTxHashes)/32, "baseFee", p.pendingBaseFee.Load()) - } - } - send.BroadcastRemotePooledTxs(remoteTxHashes) + send.BroadcastPooledTxs(remoteTxRlps, remoteTxHashes) propagateNewTxsTimer.UpdateDuration(t) case <-syncToNewPeersEvery.C: // new peer newPeers := p.recentlyConnectedPeers.GetAndClean() @@ -1943,20 +1960,26 @@ func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx) *metaTx { // It's more expensive to maintain "slice sort" invariant, but it allow do cheap copy of // pending.best slice for mining (because we consider txs and metaTx are immutable) type PendingPool struct { - limit int - t SubPoolType - best bestSlice - worst *WorstQueue - added *Hashes + limit int + t SubPoolType + best bestSlice + worst *WorstQueue + adding bool + added Hashes } func NewPendingSubPool(t SubPoolType, limit int) *PendingPool { return &PendingPool{limit: limit, t: t, best: []*metaTx{}, worst: &WorstQueue{}} } -func (p *PendingPool) captureAddedHashes(to *Hashes) { - p.added = to - *p.added = (*p.added)[:0] +func (p *PendingPool) resetAddedHashes() { + p.added = p.added[:0] + p.adding = true +} +func (p *PendingPool) appendAddedHashes(h Hashes) Hashes { + h = append(h, p.added...) + p.adding = false + return h } // bestSlice - is similar to best queue, but with O(n log n) complexity and @@ -2029,16 +2052,22 @@ func (p *PendingPool) UnsafeRemove(i *metaTx) { p.best = p.best.UnsafeRemove(i) } func (p *PendingPool) UnsafeAdd(i *metaTx) { - if p.added != nil { - *p.added = append(*p.added, i.Tx.IdHash[:]...) + if p.adding { + p.added = append(p.added, i.Tx.IdHash[:]...) + } + if i.Tx.traced { + log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID)) } i.currentSubPool = p.t p.worst.Push(i) p.best = p.best.UnsafeAdd(i) } func (p *PendingPool) Add(i *metaTx) { - if p.added != nil { - *p.added = append(*p.added, i.Tx.IdHash[:]...) + if p.adding { + p.added = append(p.added, i.Tx.IdHash[:]...) + } + if i.Tx.traced { + log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID)) } i.currentSubPool = p.t heap.Push(p.worst, i) @@ -2054,16 +2083,28 @@ func (p *PendingPool) DebugPrint(prefix string) { } type SubPool struct { - limit int - t SubPoolType - best *BestQueue - worst *WorstQueue + limit int + t SubPoolType + best *BestQueue + worst *WorstQueue + adding bool + added Hashes } func NewSubPool(t SubPoolType, limit int) *SubPool { return &SubPool{limit: limit, t: t, best: &BestQueue{}, worst: &WorstQueue{}} } +func (p *SubPool) resetAddedHashes() { + p.added = p.added[:0] + p.adding = true +} +func (p *SubPool) appendAddedHashes(h Hashes) Hashes { + h = append(h, p.added...) + p.adding = false + return h +} + func (p *SubPool) EnforceInvariants() { heap.Init(p.worst) heap.Init(p.best) @@ -2092,6 +2133,12 @@ func (p *SubPool) PopWorst() *metaTx { } func (p *SubPool) Len() int { return p.best.Len() } func (p *SubPool) Add(i *metaTx) { + if p.adding { + p.added = append(p.added, i.Tx.IdHash[:]...) + } + if i.Tx.traced { + log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID)) + } i.currentSubPool = p.t heap.Push(p.best, i) heap.Push(p.worst, i) @@ -2126,6 +2173,12 @@ func (p *SubPool) UnsafeRemove(i *metaTx) { p.best.Pop() } func (p *SubPool) UnsafeAdd(i *metaTx) { + if p.adding { + p.added = append(p.added, i.Tx.IdHash[:]...) + } + if i.Tx.traced { + log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID)) + } i.currentSubPool = p.t p.worst.Push(i) p.best.Push(i) diff --git a/txpool/send.go b/txpool/send.go index b0823327d..485b9db91 100644 --- a/txpool/send.go +++ b/txpool/send.go @@ -65,120 +65,88 @@ func (f *Send) notifyTests() { } } -func (f *Send) BroadcastLocalPooledTxs(txs Hashes) (sentToPeers int) { +func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txSentTo []int) { defer f.notifyTests() - if len(txs) == 0 { + if len(hashes) == 0 { return } - - avgPeersPerSent65 := 0 - avgPeersPerSent66 := 0 - for len(txs) > 0 { + txSentTo = make([]int, len(rlps)) + var prev, size int + for i, l := 0, len(rlps); i < len(rlps); i++ { + size += len(rlps[i]) + if i == l-1 || size >= p2pTxPacketLimit { + txsData := EncodeTransactions(rlps[prev:i+1], nil) + var txs66 *sentry.SendMessageToRandomPeersRequest + for _, sentryClient := range f.sentryClients { + if !sentryClient.Ready() { + continue + } + switch sentryClient.Protocol() { + case direct.ETH66: + if txs66 == nil { + txs66 = &sentry.SendMessageToRandomPeersRequest{ + Data: &sentry.OutboundMessageData{ + Id: sentry.MessageId_TRANSACTIONS_66, + Data: txsData, + }, + MaxPeers: 100, + } + } + peers, err := sentryClient.SendMessageToRandomPeers(f.ctx, txs66) + if err != nil { + log.Warn("[txpool.send] BroadcastLocalTxs", "err", err) + } + if peers != nil { + for j := prev; j <= i; j++ { + txSentTo[j] = len(peers.Peers) + } + } + } + } + prev = i + 1 + size = 0 + } + } + hashSentTo = make([]int, len(hashes)/32) + prev = 0 + for len(hashes) > 0 { var pending Hashes - if len(txs) > p2pTxPacketLimit { - pending = txs[:p2pTxPacketLimit] - txs = txs[p2pTxPacketLimit:] + if len(hashes) > p2pTxPacketLimit { + pending = hashes[:p2pTxPacketLimit] + hashes = hashes[p2pTxPacketLimit:] } else { - pending = txs[:] - txs = txs[:0] + pending = hashes[:] + hashes = hashes[:0] } - data := EncodeHashes(pending, nil) - var req66, req65 *sentry.OutboundMessageData + hashesData := EncodeHashes(pending, nil) + var hashes66 *sentry.OutboundMessageData for _, sentryClient := range f.sentryClients { if !sentryClient.Ready() { continue } switch sentryClient.Protocol() { - case direct.ETH65: - if req65 == nil { - req65 = &sentry.OutboundMessageData{ - Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, - Data: data, - } - } - - peers, err := sentryClient.SendMessageToAll(f.ctx, req65, &grpc.EmptyCallOption{}) - if err != nil { - log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err) - } else if peers != nil { - avgPeersPerSent65 += len(peers.Peers) - } case direct.ETH66: - if req66 == nil { - req66 = &sentry.OutboundMessageData{ + if hashes66 == nil { + hashes66 = &sentry.OutboundMessageData{ Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, - Data: data, + Data: hashesData, } } - peers, err := sentryClient.SendMessageToAll(f.ctx, req66, &grpc.EmptyCallOption{}) + peers, err := sentryClient.SendMessageToAll(f.ctx, hashes66, &grpc.EmptyCallOption{}) if err != nil { log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err) - } else if peers != nil { - avgPeersPerSent66 += len(peers.Peers) } - } - } - } - return avgPeersPerSent65 + avgPeersPerSent66 -} - -func (f *Send) BroadcastRemotePooledTxs(txs Hashes) { - defer f.notifyTests() - - if len(txs) == 0 { - return - } - - for len(txs) > 0 { - var pending Hashes - if len(txs) > p2pTxPacketLimit { - pending = txs[:p2pTxPacketLimit] - txs = txs[p2pTxPacketLimit:] - } else { - pending = txs[:] - txs = txs[:0] - } - - data := EncodeHashes(pending, nil) - var req66, req65 *sentry.SendMessageToRandomPeersRequest - for _, sentryClient := range f.sentryClients { - if !sentryClient.Ready() { - continue - } - - switch sentryClient.Protocol() { - case direct.ETH65: - if req65 == nil { - req65 = &sentry.SendMessageToRandomPeersRequest{ - MaxPeers: 1024, - Data: &sentry.OutboundMessageData{ - Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, - Data: data, - }, + if peers != nil { + for j, l := prev, pending.Len(); j < prev+l; j++ { + hashSentTo[j] = len(peers.Peers) } } - - if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil { - log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err) - } - - case direct.ETH66: - if req66 == nil { - req66 = &sentry.SendMessageToRandomPeersRequest{ - MaxPeers: 1024, - Data: &sentry.OutboundMessageData{ - Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, - Data: data, - }, - } - } - if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req66, &grpc.EmptyCallOption{}); err != nil { - log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err) - } } } + prev += pending.Len() } + return } func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) { @@ -206,19 +174,6 @@ func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) { for _, peer := range peers { switch sentryClient.Protocol() { - case direct.ETH65: - req65 := &sentry.SendMessageByIdRequest{ - PeerId: peer, - Data: &sentry.OutboundMessageData{ - Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, - Data: data, - }, - } - - if _, err := sentryClient.SendMessageById(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil { - log.Warn("[txpool.send] PropagatePooledTxsToPeersList", "err", err) - } - case direct.ETH66: req66 := &sentry.SendMessageByIdRequest{ PeerId: peer, diff --git a/txpool/test_util.go b/txpool/test_util.go index b3b037c2f..1ee5c9a90 100644 --- a/txpool/test_util.go +++ b/txpool/test_util.go @@ -98,6 +98,14 @@ func toHashes(h ...byte) (out Hashes) { return out } +func testRlps(num int) [][]byte { + rlps := make([][]byte, num) + for i := 0; i < num; i++ { + rlps[i] = []byte{1} + } + return rlps +} + func toPeerIDs(h ...byte) (out []PeerID) { for i := range h { hash := [32]byte{h[i]} diff --git a/txpool/types.go b/txpool/types.go index ec20130c0..e81d2c714 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -17,12 +17,14 @@ package txpool import ( + "bytes" "encoding/binary" "errors" "fmt" "hash" "io" "math/bits" + "sort" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/common/length" @@ -440,6 +442,44 @@ type Hashes []byte // flatten list of 32-byte hashes func (h Hashes) At(i int) []byte { return h[i*length.Hash : (i+1)*length.Hash] } func (h Hashes) Len() int { return len(h) / length.Hash } +func (h Hashes) Less(i, j int) bool { + return bytes.Compare(h[i*length.Hash:(i+1)*length.Hash], h[j*length.Hash:(j+1)*length.Hash]) < 0 +} +func (h Hashes) Swap(i, j int) { + ii := i * length.Hash + jj := j * length.Hash + for k := 0; k < length.Hash; k++ { + h[ii], h[jj] = h[jj], h[ii] + ii++ + jj++ + } +} + +// DedupCopy sorts hashes, and creates deduplicated copy +func (h Hashes) DedupCopy() Hashes { + if len(h) == 0 { + return h + } + sort.Sort(h) + unique := 1 + for i := length.Hash; i < len(h); i += length.Hash { + if !bytes.Equal(h[i:i+length.Hash], h[i-length.Hash:i]) { + unique++ + } + } + c := make(Hashes, unique*length.Hash) + copy(c[:], h[0:length.Hash]) + dest := length.Hash + for i := dest; i < len(h); i += length.Hash { + if !bytes.Equal(h[i:i+length.Hash], h[i-length.Hash:i]) { + if dest != i { + copy(c[dest:dest+length.Hash], h[i:i+length.Hash]) + } + dest += length.Hash + } + } + return c +} type Addresses []byte // flatten list of 20-byte addresses diff --git a/txpool/types_test.go b/txpool/types_test.go index 9365a1d6e..1e2537083 100644 --- a/txpool/types_test.go +++ b/txpool/types_test.go @@ -152,3 +152,11 @@ func TestTxSlotsGrowth(t *testing.T) { assert.Equal(2, len(s.txs)) assert.Equal(2, s.senders.Len()) } + +func TestDedupHashes(t *testing.T) { + assert := assert.New(t) + h := toHashes(2, 6, 2, 5, 2, 4) + c := h.DedupCopy() + assert.Equal(4, c.Len()) + assert.Equal(toHashes(2, 4, 5, 6), c) +}