txpool broadcasting (#208)

* txpool broadcasting

* Fix lint

* Broadcast transaction to random peers

* Fix broadcast

* Fix panic

* Change terminology

* fix for broadcasting

* Rebroadcast transactions promoted to pending subpool

* Trace moving between subpools

* Deduplicate promoted hashes, fix basefee promotion

* Tx propagation to be more resilient

* Fix dedup

* Change collection of promoted hashes

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2021-12-16 20:58:40 +00:00 committed by GitHub
parent bb6dfef7c8
commit 7f82ddaa75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 251 additions and 202 deletions

View File

@ -37,7 +37,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)
// Fetch connects to sentry and implements eth/65 or eth/66 protocol regarding the transaction
// Fetch connects to sentry and implements eth/66 protocol regarding the transaction
// messages. It tries to "prime" the sentry with StatusData message containing given
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
@ -166,10 +166,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
sentry.MessageId_GET_POOLED_TRANSACTIONS_65,
sentry.MessageId_TRANSACTIONS_65,
sentry.MessageId_POOLED_TRANSACTIONS_65,
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
sentry.MessageId_GET_POOLED_TRANSACTIONS_66,
sentry.MessageId_TRANSACTIONS_66,
@ -230,7 +226,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
defer tx.Rollback()
switch req.Id {
case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65:
case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66:
hashCount, pos, err := ParseHashesCount(req.Data, 0)
if err != nil {
return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err)
@ -259,9 +255,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return err
}
messageId = sentry.MessageId_GET_POOLED_TRANSACTIONS_66
case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65:
encodedRequest = EncodeHashes(unknownHashes, nil)
messageId = sentry.MessageId_GET_POOLED_TRANSACTIONS_65
default:
return fmt.Errorf("unexpected message: %s", req.Id.String())
}
@ -272,7 +265,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return err
}
}
case sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_65:
case sentry.MessageId_GET_POOLED_TRANSACTIONS_66:
//TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't
var encodedRequest []byte
var messageId sentry.MessageId
@ -297,24 +290,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
}
encodedRequest = EncodePooledTransactions66(txs, requestID, nil)
case sentry.MessageId_GET_POOLED_TRANSACTIONS_65:
messageId = sentry.MessageId_POOLED_TRANSACTIONS_65
hashes, _, err := ParseGetPooledTransactions65(req.Data, 0, nil)
if err != nil {
return err
}
var txs [][]byte
for i := 0; i < len(hashes); i += 32 {
txn, err := f.pool.GetRlp(tx, hashes[i:i+32])
if err != nil {
return err
}
if txn == nil {
continue
}
txs = append(txs, txn)
}
encodedRequest = EncodePooledTransactions65(txs, nil)
default:
return fmt.Errorf("unexpected message: %s", req.Id.String())
}
@ -325,7 +300,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
}, &grpc.EmptyCallOption{}); err != nil {
return err
}
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66:
case sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_66:
txs := TxSlots{}
if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error {
parseContext.ValidateHash(func(hash []byte) error {
@ -344,9 +319,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
}
switch req.Id {
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66:
case sentry.MessageId_TRANSACTIONS_66:
if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error {
if _, err := ParsePooledTransactions65(req.Data, 0, parseContext, &txs); err != nil {
if _, err := ParseTransactions(req.Data, 0, parseContext, &txs); err != nil {
return err
}
return nil

View File

@ -71,13 +71,18 @@ func TestSendTxPropagate(t *testing.T) {
t.Run("few remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
send.BroadcastRemotePooledTxs(toHashes(1, 42))
send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42))
calls := m.SendMessageToRandomPeersCalls()
require.Equal(t, 1, len(calls))
first := calls[0].SendMessageToRandomPeersRequest.Data
assert.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, first.Id)
assert.Equal(t, 68, len(first.Data))
calls1 := m.SendMessageToRandomPeersCalls()
require.Equal(t, 1, len(calls1))
calls2 := m.SendMessageToAllCalls()
require.Equal(t, 1, len(calls2))
first := calls1[0].SendMessageToRandomPeersRequest.Data
assert.Equal(t, sentry.MessageId_TRANSACTIONS_66, first.Id)
assert.Equal(t, 5, len(first.Data))
second := calls2[0].OutboundMessageData
assert.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, second.Id)
assert.Equal(t, 68, len(second.Data))
})
t.Run("much remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
@ -87,13 +92,18 @@ func TestSendTxPropagate(t *testing.T) {
b := []byte(fmt.Sprintf("%x", i))
copy(list[i:i+32], b)
}
send.BroadcastRemotePooledTxs(list)
calls := m.SendMessageToRandomPeersCalls()
require.Equal(t, 3, len(calls))
send.BroadcastPooledTxs(testRlps(len(list)/32), list)
calls1 := m.SendMessageToRandomPeersCalls()
require.Equal(t, 1, len(calls1))
calls2 := m.SendMessageToAllCalls()
require.Equal(t, 3, len(calls2))
call1 := calls1[0].SendMessageToRandomPeersRequest.Data
require.Equal(t, sentry.MessageId_TRANSACTIONS_66, call1.Id)
require.True(t, len(call1.Data) > 0)
for i := 0; i < 3; i++ {
call := calls[i].SendMessageToRandomPeersRequest.Data
require.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, call.Id)
require.True(t, len(call.Data) > 0)
call2 := calls2[i].OutboundMessageData
require.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, call2.Id)
require.True(t, len(call2.Data) > 0)
}
})
t.Run("few local byHash", func(t *testing.T) {
@ -102,7 +112,7 @@ func TestSendTxPropagate(t *testing.T) {
return &sentry.SentPeers{Peers: make([]*types.H256, 5)}, nil
}
send := NewSend(ctx, []direct.SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
send.BroadcastLocalPooledTxs(toHashes(1, 42))
send.BroadcastPooledTxs(testRlps(2), toHashes(1, 42))
calls := m.SendMessageToAllCalls()
require.Equal(t, 1, len(calls))

View File

@ -156,7 +156,7 @@ func EncodePooledTransactions66(txsRlp [][]byte, requestId uint64, encodeBuf []b
_ = pos
return encodeBuf
}
func EncodePooledTransactions65(txsRlp [][]byte, encodeBuf []byte) []byte {
func EncodeTransactions(txsRlp [][]byte, encodeBuf []byte) []byte {
pos := 0
dataLen := 0
for i := range txsRlp {
@ -184,7 +184,7 @@ func EncodePooledTransactions65(txsRlp [][]byte, encodeBuf []byte) []byte {
return encodeBuf
}
func ParsePooledTransactions65(payload []byte, pos int, ctx *TxParseContext, txSlots *TxSlots) (newPos int, err error) {
func ParseTransactions(payload []byte, pos int, ctx *TxParseContext, txSlots *TxSlots) (newPos int, err error) {
pos, _, err = rlp.List(payload, pos)
if err != nil {
return 0, err

View File

@ -235,6 +235,18 @@ const PendingSubPool SubPoolType = 1
const BaseFeeSubPool SubPoolType = 2
const QueuedSubPool SubPoolType = 3
func (sp SubPoolType) String() string {
switch sp {
case PendingSubPool:
return "Pending"
case BaseFeeSubPool:
return "BaseFee"
case QueuedSubPool:
return "Queued"
}
return fmt.Sprintf("Unknown:%d", sp)
}
// sender - immutable structure which stores only nonce and balance of account
type sender struct {
balance uint256.Int
@ -408,11 +420,13 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight)
p.pending.captureAddedHashes(&p.promoted)
p.pending.resetAddedHashes()
p.baseFee.resetAddedHashes()
if err := addTxsOnNewBlock(p.lastSeenBlock.Load(), cacheView, stateChanges, p.senders, unwindTxs, pendingBaseFee, baseFeeChanged, p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil {
return err
}
p.pending.added = nil
p.promoted = p.pending.appendAddedHashes(p.promoted[:0])
p.promoted = p.baseFee.appendAddedHashes(p.promoted)
if p.started.CAS(false, true) {
log.Info("[txpool] Started")
@ -420,7 +434,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang
if p.promoted.Len() > 0 {
select {
case p.newPendingTxs <- common.Copy(p.promoted):
case p.newPendingTxs <- p.promoted.DedupCopy():
default:
}
}
@ -465,17 +479,19 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
return err
}
p.pending.captureAddedHashes(&p.promoted)
p.pending.resetAddedHashes()
p.baseFee.resetAddedHashes()
if _, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err != nil {
return err
}
p.pending.added = nil
p.promoted = p.pending.appendAddedHashes(p.promoted[:0])
p.promoted = p.baseFee.appendAddedHashes(p.promoted)
if p.promoted.Len() > 0 {
select {
case <-ctx.Done():
return nil
case p.newPendingTxs <- common.Copy(p.promoted):
case p.newPendingTxs <- p.promoted.DedupCopy():
default:
}
}
@ -768,7 +784,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di
return nil, err
}
p.pending.captureAddedHashes(&p.promoted)
p.pending.resetAddedHashes()
p.baseFee.resetAddedHashes()
if addReasons, err := addTxs(p.lastSeenBlock.Load(), cacheView, p.senders, newTxs, p.pendingBaseFee.Load(), p.pending, p.baseFee, p.queued, p.all, p.byHash, p.addLocked, p.discardLocked); err == nil {
for i, reason := range addReasons {
if reason != NotSet {
@ -778,7 +795,8 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di
} else {
return nil, err
}
p.pending.added = nil
p.promoted = p.pending.appendAddedHashes(p.promoted[:0])
p.promoted = p.baseFee.appendAddedHashes(p.promoted)
reasons = fillDiscardReasons(reasons, newTxs, p.discardReasonsLRU)
for i, reason := range reasons {
@ -792,7 +810,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions TxSlots) ([]Di
}
if p.promoted.Len() > 0 {
select {
case p.newPendingTxs <- common.Copy(p.promoted):
case p.newPendingTxs <- p.promoted.DedupCopy():
default:
}
}
@ -858,11 +876,7 @@ func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
onSenderStateChange(senderID, nonce, balance, byNonce, protocolBaseFee, pendingBaseFee, pending, baseFee, queued, false)
}
//pending.EnforceWorstInvariants()
//baseFee.EnforceInvariants()
//queued.EnforceInvariants()
promote(pending, baseFee, queued, discard)
//pending.EnforceWorstInvariants()
pending.EnforceBestInvariants()
return discardReasons, nil
@ -1077,7 +1091,7 @@ func onBaseFeeChange(byNonce *BySenderAndNonce, pendingBaseFee uint64) {
// 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than
// baseFee of the currently pending block. Set to 0 otherwise.
mt.subPool &^= EnoughFeeCapBlock
if mt.Tx.feeCap >= pendingBaseFee {
if mt.minFeeCap >= pendingBaseFee {
mt.subPool |= EnoughFeeCapBlock
}
return true
@ -1115,7 +1129,7 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint
// parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means
// this transaction will never be included into this particular chain.
mt.subPool &^= EnoughFeeCapProtocol
if mt.Tx.feeCap >= protocolBaseFee {
if mt.minFeeCap >= protocolBaseFee {
mt.subPool |= EnoughFeeCapProtocol
} else {
mt.subPool = 0 // TODO: we immediately drop all transactions if they have no first bit - then maybe we don't need this bit at all? And don't add such transactions to queue?
@ -1178,6 +1192,8 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint
})
}
// promote reasserts invariants of the subpool and returns the list of transactions that ended up
// being promoted to the pending or basefee pool, for re-broadcasting
func promote(pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) {
//1. If top element in the worst green queue has subPool != 0b1111 (binary), it needs to be removed from the green pool.
// If subPool < 0b1000 (not satisfying minimum fee), discard.
@ -1245,6 +1261,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, discard func(*metaT
}
pending.Add(queued.PopBest())
}
//7. If the top element in the worst red queue has subPool < 0b1000 (not satisfying minimum fee), discard.
@ -1278,8 +1295,10 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
logEvery := time.NewTicker(p.cfg.LogEvery)
defer logEvery.Stop()
localTxHashes := make([]byte, 0, 128)
remoteTxHashes := make([]byte, 0, 128)
localTxHashes := make(Hashes, 0, 128)
localTxRlps := make([][]byte, 0, 4)
remoteTxHashes := make(Hashes, 0, 128)
remoteTxRlps := make([][]byte, 0, 4)
for {
select {
@ -1320,15 +1339,27 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
case h := <-newTxs:
t := time.Now()
notifyMiningAboutNewSlots()
localTxHashes = localTxHashes[:0]
localTxRlps = localTxRlps[:0]
remoteTxHashes = remoteTxHashes[:0]
remoteTxRlps = remoteTxRlps[:0]
if h.Len() > 0 {
if err := db.View(ctx, func(tx kv.Tx) error {
slotsRlp := make([][]byte, 0, h.Len())
for i := 0; i < h.Len(); i++ {
slotRlp, err := p.GetRlp(tx, h.At(i))
hash := h.At(i)
slotRlp, err := p.GetRlp(tx, hash)
if err != nil {
return err
}
slotsRlp = append(slotsRlp, slotRlp)
if p.IsLocal(h.At(i)) {
localTxHashes = append(localTxHashes, hash...)
localTxRlps = append(localTxRlps, slotRlp)
} else {
remoteTxHashes = append(localTxHashes, hash...)
remoteTxRlps = append(remoteTxRlps, slotRlp)
}
}
newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp})
return nil
@ -1338,26 +1369,12 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
localTxHashes = localTxHashes[:0]
remoteTxHashes = remoteTxHashes[:0]
for i := 0; i < h.Len(); i++ {
if p.IsLocal(h.At(i)) {
localTxHashes = append(localTxHashes, h.At(i)...)
} else {
remoteTxHashes = append(localTxHashes, h.At(i)...)
}
hashSentTo, txSentTo := send.BroadcastPooledTxs(localTxRlps, localTxHashes)
for i := 0; i < localTxHashes.Len(); i++ {
hash := localTxHashes.At(i)
log.Info("local tx propagated", "tx_hash", fmt.Sprintf("%x", hash), "announced to peers", hashSentTo[i], "broadcast to peers", txSentTo[i], "baseFee", p.pendingBaseFee.Load())
}
sentTo := send.BroadcastLocalPooledTxs(localTxHashes)
if len(localTxHashes)/32 > 0 {
if len(localTxHashes)/32 == 1 {
log.Info("local tx propagated", "to_peers_amount", sentTo, "tx_hash", fmt.Sprintf("%x", localTxHashes), "baseFee", p.pendingBaseFee.Load())
} else {
log.Info("local txs propagated", "to_peers_amount", sentTo, "txs_amount", len(localTxHashes)/32, "baseFee", p.pendingBaseFee.Load())
}
}
send.BroadcastRemotePooledTxs(remoteTxHashes)
send.BroadcastPooledTxs(remoteTxRlps, remoteTxHashes)
propagateNewTxsTimer.UpdateDuration(t)
case <-syncToNewPeersEvery.C: // new peer
newPeers := p.recentlyConnectedPeers.GetAndClean()
@ -1943,20 +1960,26 @@ func (b *BySenderAndNonce) replaceOrInsert(mt *metaTx) *metaTx {
// It's more expensive to maintain "slice sort" invariant, but it allow do cheap copy of
// pending.best slice for mining (because we consider txs and metaTx are immutable)
type PendingPool struct {
limit int
t SubPoolType
best bestSlice
worst *WorstQueue
added *Hashes
limit int
t SubPoolType
best bestSlice
worst *WorstQueue
adding bool
added Hashes
}
func NewPendingSubPool(t SubPoolType, limit int) *PendingPool {
return &PendingPool{limit: limit, t: t, best: []*metaTx{}, worst: &WorstQueue{}}
}
func (p *PendingPool) captureAddedHashes(to *Hashes) {
p.added = to
*p.added = (*p.added)[:0]
func (p *PendingPool) resetAddedHashes() {
p.added = p.added[:0]
p.adding = true
}
func (p *PendingPool) appendAddedHashes(h Hashes) Hashes {
h = append(h, p.added...)
p.adding = false
return h
}
// bestSlice - is similar to best queue, but with O(n log n) complexity and
@ -2029,16 +2052,22 @@ func (p *PendingPool) UnsafeRemove(i *metaTx) {
p.best = p.best.UnsafeRemove(i)
}
func (p *PendingPool) UnsafeAdd(i *metaTx) {
if p.added != nil {
*p.added = append(*p.added, i.Tx.IdHash[:]...)
if p.adding {
p.added = append(p.added, i.Tx.IdHash[:]...)
}
if i.Tx.traced {
log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID))
}
i.currentSubPool = p.t
p.worst.Push(i)
p.best = p.best.UnsafeAdd(i)
}
func (p *PendingPool) Add(i *metaTx) {
if p.added != nil {
*p.added = append(*p.added, i.Tx.IdHash[:]...)
if p.adding {
p.added = append(p.added, i.Tx.IdHash[:]...)
}
if i.Tx.traced {
log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID))
}
i.currentSubPool = p.t
heap.Push(p.worst, i)
@ -2054,16 +2083,28 @@ func (p *PendingPool) DebugPrint(prefix string) {
}
type SubPool struct {
limit int
t SubPoolType
best *BestQueue
worst *WorstQueue
limit int
t SubPoolType
best *BestQueue
worst *WorstQueue
adding bool
added Hashes
}
func NewSubPool(t SubPoolType, limit int) *SubPool {
return &SubPool{limit: limit, t: t, best: &BestQueue{}, worst: &WorstQueue{}}
}
func (p *SubPool) resetAddedHashes() {
p.added = p.added[:0]
p.adding = true
}
func (p *SubPool) appendAddedHashes(h Hashes) Hashes {
h = append(h, p.added...)
p.adding = false
return h
}
func (p *SubPool) EnforceInvariants() {
heap.Init(p.worst)
heap.Init(p.best)
@ -2092,6 +2133,12 @@ func (p *SubPool) PopWorst() *metaTx {
}
func (p *SubPool) Len() int { return p.best.Len() }
func (p *SubPool) Add(i *metaTx) {
if p.adding {
p.added = append(p.added, i.Tx.IdHash[:]...)
}
if i.Tx.traced {
log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID))
}
i.currentSubPool = p.t
heap.Push(p.best, i)
heap.Push(p.worst, i)
@ -2126,6 +2173,12 @@ func (p *SubPool) UnsafeRemove(i *metaTx) {
p.best.Pop()
}
func (p *SubPool) UnsafeAdd(i *metaTx) {
if p.adding {
p.added = append(p.added, i.Tx.IdHash[:]...)
}
if i.Tx.traced {
log.Info(fmt.Sprintf("TX TRACING: moved to subpool %s, IdHash=%x, sender=%d", p.t, i.Tx.IdHash, i.Tx.senderID))
}
i.currentSubPool = p.t
p.worst.Push(i)
p.best.Push(i)

View File

@ -65,120 +65,88 @@ func (f *Send) notifyTests() {
}
}
func (f *Send) BroadcastLocalPooledTxs(txs Hashes) (sentToPeers int) {
func (f *Send) BroadcastPooledTxs(rlps [][]byte, hashes Hashes) (hashSentTo, txSentTo []int) {
defer f.notifyTests()
if len(txs) == 0 {
if len(hashes) == 0 {
return
}
avgPeersPerSent65 := 0
avgPeersPerSent66 := 0
for len(txs) > 0 {
txSentTo = make([]int, len(rlps))
var prev, size int
for i, l := 0, len(rlps); i < len(rlps); i++ {
size += len(rlps[i])
if i == l-1 || size >= p2pTxPacketLimit {
txsData := EncodeTransactions(rlps[prev:i+1], nil)
var txs66 *sentry.SendMessageToRandomPeersRequest
for _, sentryClient := range f.sentryClients {
if !sentryClient.Ready() {
continue
}
switch sentryClient.Protocol() {
case direct.ETH66:
if txs66 == nil {
txs66 = &sentry.SendMessageToRandomPeersRequest{
Data: &sentry.OutboundMessageData{
Id: sentry.MessageId_TRANSACTIONS_66,
Data: txsData,
},
MaxPeers: 100,
}
}
peers, err := sentryClient.SendMessageToRandomPeers(f.ctx, txs66)
if err != nil {
log.Warn("[txpool.send] BroadcastLocalTxs", "err", err)
}
if peers != nil {
for j := prev; j <= i; j++ {
txSentTo[j] = len(peers.Peers)
}
}
}
}
prev = i + 1
size = 0
}
}
hashSentTo = make([]int, len(hashes)/32)
prev = 0
for len(hashes) > 0 {
var pending Hashes
if len(txs) > p2pTxPacketLimit {
pending = txs[:p2pTxPacketLimit]
txs = txs[p2pTxPacketLimit:]
if len(hashes) > p2pTxPacketLimit {
pending = hashes[:p2pTxPacketLimit]
hashes = hashes[p2pTxPacketLimit:]
} else {
pending = txs[:]
txs = txs[:0]
pending = hashes[:]
hashes = hashes[:0]
}
data := EncodeHashes(pending, nil)
var req66, req65 *sentry.OutboundMessageData
hashesData := EncodeHashes(pending, nil)
var hashes66 *sentry.OutboundMessageData
for _, sentryClient := range f.sentryClients {
if !sentryClient.Ready() {
continue
}
switch sentryClient.Protocol() {
case direct.ETH65:
if req65 == nil {
req65 = &sentry.OutboundMessageData{
Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
Data: data,
}
}
peers, err := sentryClient.SendMessageToAll(f.ctx, req65, &grpc.EmptyCallOption{})
if err != nil {
log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err)
} else if peers != nil {
avgPeersPerSent65 += len(peers.Peers)
}
case direct.ETH66:
if req66 == nil {
req66 = &sentry.OutboundMessageData{
if hashes66 == nil {
hashes66 = &sentry.OutboundMessageData{
Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
Data: data,
Data: hashesData,
}
}
peers, err := sentryClient.SendMessageToAll(f.ctx, req66, &grpc.EmptyCallOption{})
peers, err := sentryClient.SendMessageToAll(f.ctx, hashes66, &grpc.EmptyCallOption{})
if err != nil {
log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err)
} else if peers != nil {
avgPeersPerSent66 += len(peers.Peers)
}
}
}
}
return avgPeersPerSent65 + avgPeersPerSent66
}
func (f *Send) BroadcastRemotePooledTxs(txs Hashes) {
defer f.notifyTests()
if len(txs) == 0 {
return
}
for len(txs) > 0 {
var pending Hashes
if len(txs) > p2pTxPacketLimit {
pending = txs[:p2pTxPacketLimit]
txs = txs[p2pTxPacketLimit:]
} else {
pending = txs[:]
txs = txs[:0]
}
data := EncodeHashes(pending, nil)
var req66, req65 *sentry.SendMessageToRandomPeersRequest
for _, sentryClient := range f.sentryClients {
if !sentryClient.Ready() {
continue
}
switch sentryClient.Protocol() {
case direct.ETH65:
if req65 == nil {
req65 = &sentry.SendMessageToRandomPeersRequest{
MaxPeers: 1024,
Data: &sentry.OutboundMessageData{
Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
Data: data,
},
if peers != nil {
for j, l := prev, pending.Len(); j < prev+l; j++ {
hashSentTo[j] = len(peers.Peers)
}
}
if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil {
log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err)
}
case direct.ETH66:
if req66 == nil {
req66 = &sentry.SendMessageToRandomPeersRequest{
MaxPeers: 1024,
Data: &sentry.OutboundMessageData{
Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
Data: data,
},
}
}
if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req66, &grpc.EmptyCallOption{}); err != nil {
log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err)
}
}
}
prev += pending.Len()
}
return
}
func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) {
@ -206,19 +174,6 @@ func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) {
for _, peer := range peers {
switch sentryClient.Protocol() {
case direct.ETH65:
req65 := &sentry.SendMessageByIdRequest{
PeerId: peer,
Data: &sentry.OutboundMessageData{
Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
Data: data,
},
}
if _, err := sentryClient.SendMessageById(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil {
log.Warn("[txpool.send] PropagatePooledTxsToPeersList", "err", err)
}
case direct.ETH66:
req66 := &sentry.SendMessageByIdRequest{
PeerId: peer,

View File

@ -98,6 +98,14 @@ func toHashes(h ...byte) (out Hashes) {
return out
}
func testRlps(num int) [][]byte {
rlps := make([][]byte, num)
for i := 0; i < num; i++ {
rlps[i] = []byte{1}
}
return rlps
}
func toPeerIDs(h ...byte) (out []PeerID) {
for i := range h {
hash := [32]byte{h[i]}

View File

@ -17,12 +17,14 @@
package txpool
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash"
"io"
"math/bits"
"sort"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/common/length"
@ -440,6 +442,44 @@ type Hashes []byte // flatten list of 32-byte hashes
func (h Hashes) At(i int) []byte { return h[i*length.Hash : (i+1)*length.Hash] }
func (h Hashes) Len() int { return len(h) / length.Hash }
func (h Hashes) Less(i, j int) bool {
return bytes.Compare(h[i*length.Hash:(i+1)*length.Hash], h[j*length.Hash:(j+1)*length.Hash]) < 0
}
func (h Hashes) Swap(i, j int) {
ii := i * length.Hash
jj := j * length.Hash
for k := 0; k < length.Hash; k++ {
h[ii], h[jj] = h[jj], h[ii]
ii++
jj++
}
}
// DedupCopy sorts hashes, and creates deduplicated copy
func (h Hashes) DedupCopy() Hashes {
if len(h) == 0 {
return h
}
sort.Sort(h)
unique := 1
for i := length.Hash; i < len(h); i += length.Hash {
if !bytes.Equal(h[i:i+length.Hash], h[i-length.Hash:i]) {
unique++
}
}
c := make(Hashes, unique*length.Hash)
copy(c[:], h[0:length.Hash])
dest := length.Hash
for i := dest; i < len(h); i += length.Hash {
if !bytes.Equal(h[i:i+length.Hash], h[i-length.Hash:i]) {
if dest != i {
copy(c[dest:dest+length.Hash], h[i:i+length.Hash])
}
dest += length.Hash
}
}
return c
}
type Addresses []byte // flatten list of 20-byte addresses

View File

@ -152,3 +152,11 @@ func TestTxSlotsGrowth(t *testing.T) {
assert.Equal(2, len(s.txs))
assert.Equal(2, s.senders.Len())
}
func TestDedupHashes(t *testing.T) {
assert := assert.New(t)
h := toHashes(2, 6, 2, 5, 2, 4)
c := h.DedupCopy()
assert.Equal(4, c.Len())
assert.Equal(toHashes(2, 4, 5, 6), c)
}