From d7a911dd0f9a11d6d2f71a41071d7f76b3158322 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 09:00:00 +0700 Subject: [PATCH] broadcast loop --- txpool/pool.go | 107 ++++++++++++++++++++++++++------------- txpool/pool_fuzz_test.go | 5 +- txpool/types.go | 3 ++ 3 files changed, 78 insertions(+), 37 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 3a678759a..61c0c69b6 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -26,7 +26,6 @@ import ( "github.com/google/btree" lru "github.com/hashicorp/golang-lru" "github.com/holiman/uint256" - "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" ) @@ -41,6 +40,10 @@ type Pool interface { NotifyNewPeer(peerID PeerID) } +type Broadcaster interface { + OnNew(hashes Hashes) +} + // SubPoolMarker ordered bitset responsible to sort transactions by sub-pools. Bits meaning: // 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means this transaction will never be included into this particular chain. // 2. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for the sender is M, and there are transactions for all nonces between M and N from the same sender. Set to 0 is the transaction's nonce is divided from the state nonce by one or more nonce gaps. @@ -124,10 +127,12 @@ type TxPool struct { // fields for transaction propagation recentlyConnectedPeers *recentlyConnectedPeers + newTxs chan Hashes + broadcaster Broadcaster //lastTxPropagationTimestamp time.Time } -func New() *TxPool { +func New(newTxs chan Hashes) *TxPool { localsHistory, _ := lru.New(1024) return &TxPool{ lock: &sync.RWMutex{}, @@ -138,6 +143,7 @@ func New() *TxPool { pending: NewSubPool(), baseFee: NewSubPool(), queued: NewSubPool(), + newTxs: newTxs, } } @@ -176,6 +182,10 @@ func (p *TxPool) AppendRemoteHashes(buf []byte) { i++ } } +func (p *TxPool) AppendAllHashes(buf []byte) { + p.AppendLocalHashes(buf) + p.AppendRemoteHashes(buf[len(buf):]) +} func (p *TxPool) IdHashKnown(hash []byte) bool { p.lock.RLock() defer p.lock.RUnlock() @@ -183,6 +193,16 @@ func (p *TxPool) IdHashKnown(hash []byte) bool { _, ok := p.byHash[string(hash)] return ok } +func (p *TxPool) IdHashIsLocal(hash []byte) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + txn, ok := p.byHash[string(hash)] + if !ok { + return false + } + return txn.SubPool&IsLocal != 0 +} func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee uint64) error { p.lock.Lock() @@ -200,8 +220,10 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB } id++ p.senderIDs[string(unwindTxs.senders[i*20:(i+1)*20])] = id + unwindTxs.txs[i].senderID = id } } + /* for i := range unwindTxs { unwindTxs[i].senderID == 0 @@ -213,7 +235,39 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB } */ // TODO: assign senderID to all transactions - return onNewBlock(p.senderInfo, unwindTxs.txs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory) + if err := onNewBlock(p.senderInfo, unwindTxs.txs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { + return err + } + + newTxs := make(Hashes, 0, 32*len(unwindTxs.txs)) + for i := range unwindTxs.txs { + _, ok := p.byHash[string(unwindTxs.txs[i].idHash[:])] + if !ok { + continue + } + newTxs = append(newTxs, unwindTxs.txs[i].idHash[:]...) + } + select { + case p.newTxs <- newTxs: + default: + } + + return nil + /* + localTxHashes = localTxHashes[:0] + p.AppendLocalHashes(last, localTxHashes) + initialAmount := len(localTxHashes) + sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes) + if initialAmount == 1 { + p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes) + } else { + p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount) + } + + remoteTxHashes = remoteTxHashes[:0] + p.FillRemoteHashesSince(last, remoteTxHashes) + send.BroadcastRemotePooledTxs(remoteTxHashes) + */ } func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error { @@ -609,22 +663,14 @@ func (p *WorstQueue) Pop() interface{} { return item } -// Below is a draft code, will convert it to Loop and LoopStep funcs later - -type PoolImpl struct { - recentlyConnectedPeers *recentlyConnectedPeers - lastTxPropagationTimestamp time.Time - logger log.Logger -} - -// Loop - does: +// BroadcastLoop - does: // send pending byHash to p2p: // - new byHash // - all pooled byHash to recently connected peers // - all local pooled byHash to random peers periodically // promote/demote transactions // reorgs -func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) { +func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Send, timings Timings) { propagateAllNewTxsEvery := time.NewTicker(timings.propagateAllNewTxsEvery) defer propagateAllNewTxsEvery.Stop() @@ -641,43 +687,32 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) { select { case <-ctx.Done(): return - case <-propagateAllNewTxsEvery.C: // new byHash - last := p.lastTxPropagationTimestamp - p.lastTxPropagationTimestamp = time.Now() - - // first broadcast all local byHash to all peers, then non-local to random sqrt(peersAmount) peers + case h := <-newTxs: + // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers localTxHashes = localTxHashes[:0] - p.FillLocalHashesSince(last, localTxHashes) - initialAmount := len(localTxHashes) - sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes) - if initialAmount == 1 { - p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes) - } else { - p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount) + remoteTxHashes = remoteTxHashes[:0] + + for i := 0; i < h.Len(); i++ { + if p.IdHashIsLocal(h.At(i)) { + localTxHashes = append(localTxHashes, h.At(i)...) + } else { + remoteTxHashes = append(localTxHashes, h.At(i)...) + } } - remoteTxHashes = remoteTxHashes[:0] - p.FillRemoteHashesSince(last, remoteTxHashes) + send.BroadcastLocalPooledTxs(localTxHashes) send.BroadcastRemotePooledTxs(remoteTxHashes) case <-syncToNewPeersEvery.C: // new peer newPeers := p.recentlyConnectedPeers.GetAndClean() if len(newPeers) == 0 { continue } - p.FillRemoteHashes(remoteTxHashes[:0]) + p.AppendAllHashes(remoteTxHashes[:0]) send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes) - case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local byHash to random peers - p.FillLocalHashes(localTxHashes[:0]) - send.BroadcastLocalPooledTxs(localTxHashes) } } } -func (p *PoolImpl) FillLocalHashesSince(since time.Time, to []byte) {} -func (p *PoolImpl) FillRemoteHashesSince(since time.Time, to []byte) {} -func (p *PoolImpl) FillLocalHashes(to []byte) {} -func (p *PoolImpl) FillRemoteHashes(to []byte) {} - // recentlyConnectedPeers does buffer IDs of recently connected good peers // then sync of pooled Transaction can happen to all of then at once // DoS protection and performance saving diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index f042737fe..8e17892bc 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -176,7 +176,8 @@ func FuzzOnNewBlocks3(f *testing.F) { minedTxs.senders = txs.senders[len(txs.txs)-3:] } - pool := New() + ch := make(chan Hashes, 100) + pool := New(ch) pool.senderInfo = senders pool.senderIDs = senderIDs err := pool.OnNewBlock(unwindTxs, minedTxs, protocolBaseFee, blockBaseFee) @@ -287,6 +288,8 @@ func FuzzOnNewBlocks3(f *testing.F) { _, ok = pool.byHash[string(minedTxs.txs[i].idHash[:])] assert.False(ok) } + newHashes := <-ch + assert.Equal(len(unwindTxs.txs), newHashes.Len()) }) } diff --git a/txpool/types.go b/txpool/types.go index 89e63f94e..22c0bee96 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -34,6 +34,9 @@ type PeerID *types.H512 type Hashes []byte // flatten list of 32-byte hashes +func (h Hashes) At(i int) []byte { return h[i*32 : (i+1)*32] } +func (h Hashes) Len() int { return len(h) / 32 } + // TxContext is object that is required to parse transactions and turn transaction payload into TxSlot objects // usage of TxContext helps avoid extra memory allocations type TxParseContext struct {