broadcast loop

This commit is contained in:
alex.sharov 2021-08-05 09:00:00 +07:00
parent 729fb20d98
commit d7a911dd0f
3 changed files with 78 additions and 37 deletions

View File

@ -26,7 +26,6 @@ import (
"github.com/google/btree"
lru "github.com/hashicorp/golang-lru"
"github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
)
@ -41,6 +40,10 @@ type Pool interface {
NotifyNewPeer(peerID PeerID)
}
type Broadcaster interface {
OnNew(hashes Hashes)
}
// SubPoolMarker ordered bitset responsible to sort transactions by sub-pools. Bits meaning:
// 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means this transaction will never be included into this particular chain.
// 2. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for the sender is M, and there are transactions for all nonces between M and N from the same sender. Set to 0 is the transaction's nonce is divided from the state nonce by one or more nonce gaps.
@ -124,10 +127,12 @@ type TxPool struct {
// fields for transaction propagation
recentlyConnectedPeers *recentlyConnectedPeers
newTxs chan Hashes
broadcaster Broadcaster
//lastTxPropagationTimestamp time.Time
}
func New() *TxPool {
func New(newTxs chan Hashes) *TxPool {
localsHistory, _ := lru.New(1024)
return &TxPool{
lock: &sync.RWMutex{},
@ -138,6 +143,7 @@ func New() *TxPool {
pending: NewSubPool(),
baseFee: NewSubPool(),
queued: NewSubPool(),
newTxs: newTxs,
}
}
@ -176,6 +182,10 @@ func (p *TxPool) AppendRemoteHashes(buf []byte) {
i++
}
}
func (p *TxPool) AppendAllHashes(buf []byte) {
p.AppendLocalHashes(buf)
p.AppendRemoteHashes(buf[len(buf):])
}
func (p *TxPool) IdHashKnown(hash []byte) bool {
p.lock.RLock()
defer p.lock.RUnlock()
@ -183,6 +193,16 @@ func (p *TxPool) IdHashKnown(hash []byte) bool {
_, ok := p.byHash[string(hash)]
return ok
}
func (p *TxPool) IdHashIsLocal(hash []byte) bool {
p.lock.RLock()
defer p.lock.RUnlock()
txn, ok := p.byHash[string(hash)]
if !ok {
return false
}
return txn.SubPool&IsLocal != 0
}
func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee uint64) error {
p.lock.Lock()
@ -200,8 +220,10 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB
}
id++
p.senderIDs[string(unwindTxs.senders[i*20:(i+1)*20])] = id
unwindTxs.txs[i].senderID = id
}
}
/*
for i := range unwindTxs {
unwindTxs[i].senderID == 0
@ -213,7 +235,39 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB
}
*/
// TODO: assign senderID to all transactions
return onNewBlock(p.senderInfo, unwindTxs.txs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory)
if err := onNewBlock(p.senderInfo, unwindTxs.txs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
return err
}
newTxs := make(Hashes, 0, 32*len(unwindTxs.txs))
for i := range unwindTxs.txs {
_, ok := p.byHash[string(unwindTxs.txs[i].idHash[:])]
if !ok {
continue
}
newTxs = append(newTxs, unwindTxs.txs[i].idHash[:]...)
}
select {
case p.newTxs <- newTxs:
default:
}
return nil
/*
localTxHashes = localTxHashes[:0]
p.AppendLocalHashes(last, localTxHashes)
initialAmount := len(localTxHashes)
sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes)
if initialAmount == 1 {
p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes)
} else {
p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount)
}
remoteTxHashes = remoteTxHashes[:0]
p.FillRemoteHashesSince(last, remoteTxHashes)
send.BroadcastRemotePooledTxs(remoteTxHashes)
*/
}
func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error {
@ -609,22 +663,14 @@ func (p *WorstQueue) Pop() interface{} {
return item
}
// Below is a draft code, will convert it to Loop and LoopStep funcs later
type PoolImpl struct {
recentlyConnectedPeers *recentlyConnectedPeers
lastTxPropagationTimestamp time.Time
logger log.Logger
}
// Loop - does:
// BroadcastLoop - does:
// send pending byHash to p2p:
// - new byHash
// - all pooled byHash to recently connected peers
// - all local pooled byHash to random peers periodically
// promote/demote transactions
// reorgs
func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Send, timings Timings) {
propagateAllNewTxsEvery := time.NewTicker(timings.propagateAllNewTxsEvery)
defer propagateAllNewTxsEvery.Stop()
@ -641,43 +687,32 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
select {
case <-ctx.Done():
return
case <-propagateAllNewTxsEvery.C: // new byHash
last := p.lastTxPropagationTimestamp
p.lastTxPropagationTimestamp = time.Now()
// first broadcast all local byHash to all peers, then non-local to random sqrt(peersAmount) peers
case h := <-newTxs:
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
localTxHashes = localTxHashes[:0]
p.FillLocalHashesSince(last, localTxHashes)
initialAmount := len(localTxHashes)
sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes)
if initialAmount == 1 {
p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes)
} else {
p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount)
remoteTxHashes = remoteTxHashes[:0]
for i := 0; i < h.Len(); i++ {
if p.IdHashIsLocal(h.At(i)) {
localTxHashes = append(localTxHashes, h.At(i)...)
} else {
remoteTxHashes = append(localTxHashes, h.At(i)...)
}
}
remoteTxHashes = remoteTxHashes[:0]
p.FillRemoteHashesSince(last, remoteTxHashes)
send.BroadcastLocalPooledTxs(localTxHashes)
send.BroadcastRemotePooledTxs(remoteTxHashes)
case <-syncToNewPeersEvery.C: // new peer
newPeers := p.recentlyConnectedPeers.GetAndClean()
if len(newPeers) == 0 {
continue
}
p.FillRemoteHashes(remoteTxHashes[:0])
p.AppendAllHashes(remoteTxHashes[:0])
send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes)
case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local byHash to random peers
p.FillLocalHashes(localTxHashes[:0])
send.BroadcastLocalPooledTxs(localTxHashes)
}
}
}
func (p *PoolImpl) FillLocalHashesSince(since time.Time, to []byte) {}
func (p *PoolImpl) FillRemoteHashesSince(since time.Time, to []byte) {}
func (p *PoolImpl) FillLocalHashes(to []byte) {}
func (p *PoolImpl) FillRemoteHashes(to []byte) {}
// recentlyConnectedPeers does buffer IDs of recently connected good peers
// then sync of pooled Transaction can happen to all of then at once
// DoS protection and performance saving

View File

@ -176,7 +176,8 @@ func FuzzOnNewBlocks3(f *testing.F) {
minedTxs.senders = txs.senders[len(txs.txs)-3:]
}
pool := New()
ch := make(chan Hashes, 100)
pool := New(ch)
pool.senderInfo = senders
pool.senderIDs = senderIDs
err := pool.OnNewBlock(unwindTxs, minedTxs, protocolBaseFee, blockBaseFee)
@ -287,6 +288,8 @@ func FuzzOnNewBlocks3(f *testing.F) {
_, ok = pool.byHash[string(minedTxs.txs[i].idHash[:])]
assert.False(ok)
}
newHashes := <-ch
assert.Equal(len(unwindTxs.txs), newHashes.Len())
})
}

View File

@ -34,6 +34,9 @@ type PeerID *types.H512
type Hashes []byte // flatten list of 32-byte hashes
func (h Hashes) At(i int) []byte { return h[i*32 : (i+1)*32] }
func (h Hashes) Len() int { return len(h) / 32 }
// TxContext is object that is required to parse transactions and turn transaction payload into TxSlot objects
// usage of TxContext helps avoid extra memory allocations
type TxParseContext struct {