add Pool object, unwind and forward now done by 1 function

alex.sharov 2021-08-02 16:34:16 +07:00
parent 630f692051
commit b3adf70e40
4 changed files with 98 additions and 37 deletions


@@ -72,7 +72,7 @@ func TestSendTxPropagate(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
t.Run("few remote txs", func(t *testing.T) {
t.Run("few remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger)
send.BroadcastRemotePooledTxs(toHashes([32]byte{1}, [32]byte{42}))
@@ -83,7 +83,7 @@ func TestSendTxPropagate(t *testing.T) {
assert.Equal(t, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, first.Id)
assert.Equal(t, 68, len(first.Data))
})
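The 68-byte expectation above follows from plain RLP arithmetic. A minimal sketch of the calculation, assuming the NEW_POOLED_TRANSACTION_HASHES_66 body is just an RLP list of 32-byte hashes (no request-id wrapper); the helper name is illustrative, not part of the commit:

func expectedHashesPacketSize(hashCount int) int {
    perHash := 1 + 32              // 0xa0 string prefix + 32 hash bytes
    payload := hashCount * perHash // 66 bytes for the two hashes in this test
    return 2 + payload             // 0xf8,<len> long-list header (payloads of 56..255 bytes): 2 + 66 = 68
}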
t.Run("much remote txs", func(t *testing.T) {
t.Run("much remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger)
list := make(Hashes, p2pTxPacketLimit*3)
@@ -100,7 +100,7 @@ func TestSendTxPropagate(t *testing.T) {
require.True(t, len(call.Data) > 0)
}
})
t.Run("few local txs", func(t *testing.T) {
t.Run("few local byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
m.SendMessageToAllFunc = func(contextMoqParam context.Context, outboundMessageData *sentry.OutboundMessageData) (*sentry.SentPeers, error) {
return &sentry.SentPeers{Peers: make([]*types.H512, 5)}, nil


@@ -109,13 +109,17 @@ func (i *nonce2TxItem) Less(than btree.Item) bool {
return i.MetaTx.Tx.nonce < than.(*nonce2TxItem).MetaTx.Tx.nonce
}
// PoolEnv - holds all pool-related data structures, but has no methods (or simple mutex-based methods)
type PoolEnv struct {
// TxPool - holds all pool-related data structures and lock-based tiny methods
// most of the logic is implemented by pure, test-friendly functions
type TxPool struct {
lock *sync.RWMutex
logger log.Logger
protocolBaseFee atomic.Uint64
blockBaseFee atomic.Uint64
senderInfo map[uint64]SenderInfo
byHash map[string]*MetaTx // tx_hash => tx
pending, baseFee, queued *SubPool
// fields for transaction propagation
@@ -123,28 +127,78 @@ type PoolEnv struct {
lastTxPropagationTimestamp time.Time
}
func loopStep(env *PoolEnv) {
if false {
unwindedTxs := []*TxSlot{}
unwind(env.senderInfo, unwindedTxs, env.pending)
}
if false {
//TODO: get arrived blocks
minedTxs := []*TxSlot{}
blockBasedFee := env.protocolBaseFee.Load() + 123
onNewBlocks(env.senderInfo, minedTxs, env.protocolBaseFee.Load(), blockBasedFee, env.pending, env.baseFee, env.queued)
}
func (p *TxPool) GetRlp(hash []byte) []byte {
p.lock.RLock()
defer p.lock.RUnlock()
txn, ok := p.byHash[string(hash)]
if !ok {
return nil
}
return txn.Tx.rlp
}
func (p *TxPool) AppendLocalHashes(buf []byte) {
p.lock.RLock()
defer p.lock.RUnlock()
i := 0
for hash, txn := range p.byHash {
if txn.SubPool&IsLocal == 0 {
continue
}
copy(buf[i*32:(i+1)*32], hash)
i++
}
}
func (p *TxPool) AppendRemoteHashes(buf []byte) {
p.lock.RLock()
defer p.lock.RUnlock()
i := 0
for hash, txn := range p.byHash {
if txn.SubPool&IsLocal != 0 {
continue
}
copy(buf[i*32:(i+1)*32], hash)
i++
}
}
func (p *TxPool) IdHashKnown(hash []byte) bool {
p.lock.RLock()
defer p.lock.RUnlock()
_, ok := p.byHash[string(hash)]
return ok
}
func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) OnNewBlock(unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64) {
p.lock.Lock()
defer p.lock.Unlock()
p.protocolBaseFee.Store(protocolBaseFee)
p.blockBaseFee.Store(blockBaseFee)
onNewBlock(p.senderInfo, unwindTxs, minedTxs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash)
}
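A minimal usage sketch of the new locked wrappers, assuming it lives in the same package. No TxPool constructor appears in this diff, so the struct literal fills in only the fields the example touches, and the helper itself is hypothetical:

import "sync"

func exampleTxPool(senders map[uint64]SenderInfo, unwindTxs, minedTxs []*TxSlot) {
    p := &TxPool{
        lock:       &sync.RWMutex{},
        senderInfo: senders,
        byHash:     map[string]*MetaTx{},
        pending:    NewSubPool(),
        baseFee:    NewSubPool(),
        queued:     NewSubPool(),
    }
    // the wrapper only takes the write lock and stores the fees; the heavy lifting
    // is delegated to the pure onNewBlock below, which tests can call directly
    p.OnNewBlock(unwindTxs, minedTxs, 1_000_000, 1_500_000)
    for hash := range p.byHash {
        _ = p.IdHashKnown([]byte(hash)) // true for every tx still tracked
        _ = p.GetRlp([]byte(hash))      // raw bytes captured by ParseTransaction (last file below)
    }
}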
// onNewBlocks - apply new highest block (or batch of blocks)
func onNewBlock(senderInfo map[uint64]SenderInfo, unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx) {
if unwindTxs != nil {
unwind(senderInfo, unwindTxs, pending, func(i *MetaTx) {
delete(byHash, string(i.Tx.idHash[:]))
})
}
forward(senderInfo, minedTxs, protocolBaseFee, blockBaseFee, pending, baseFee, queued, func(i *MetaTx) {
delete(byHash, string(i.Tx.idHash[:]))
senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i})
})
}
// forward - apply new highest block (or batch of blocks)
//
// 1. New best block arrives, which potentially changes the balance and the nonce of some senders.
// We use senderIds data structure to find relevant senderId values, and then use senders data structure to
// modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is
// included into a block), and finally, walk over the transaction records and update SubPool fields depending on
// the actual presence of nonce gaps and what the balance is.
func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool) {
func forward(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) {
// TODO: change sender.nonce
for _, tx := range minedTxs {
@@ -166,10 +220,13 @@ func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolB
switch it.MetaTx.currentSubPool {
case PendingSubPool:
pending.UnsafeRemove(it.MetaTx)
discard(it.MetaTx)
case BaseFeeSubPool:
baseFee.UnsafeRemove(it.MetaTx)
discard(it.MetaTx)
case QueuedSubPool:
queued.UnsafeRemove(it.MetaTx)
discard(it.MetaTx)
default:
//already removed
}
@@ -184,7 +241,7 @@ func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolB
baseFee.EnforceInvariants()
queued.EnforceInvariants()
promote(pending, baseFee, queued)
promote(pending, baseFee, queued, discard)
}
// unwind
@@ -197,10 +254,10 @@ func onNewBlocks(senderInfo map[uint64]SenderInfo, minedTxs []*TxSlot, protocolB
// they effectively lose their priority over the "remote" transactions. In order to prevent that,
// somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold").
func unwind(senderInfo map[uint64]SenderInfo, unwindedTxs []*TxSlot, pending *SubPool) {
func unwind(senderInfo map[uint64]SenderInfo, unwindTxs []*TxSlot, pending *SubPool, onAdd func(tx *MetaTx)) {
// TODO: change sender.nonce
for _, tx := range unwindedTxs {
for _, tx := range unwindTxs {
sender, ok := senderInfo[tx.senderID]
if !ok {
panic("not implemented yet")
@@ -215,6 +272,7 @@ func unwind(senderInfo map[uint64]SenderInfo, unwindedTxs []*TxSlot, pending *Su
}
}
sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt})
onAdd(mt)
pending.UnsafeAdd(mt, PendingSubPool)
}
pending.EnforceInvariants()
@@ -275,7 +333,7 @@ func onSenderChange(sender SenderInfo, protocolBaseFee, blockBaseFee uint64) {
})
}
func promote(pending, baseFee, queued *SubPool) {
func promote(pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) {
//1. If top element in the worst green queue has SubPool != 0b1111 (binary), it needs to be removed from the green pool.
// If SubPool < 0b1000 (not satisfying minimum fee), discard.
// If SubPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool.
@@ -291,7 +349,7 @@ func promote(pending, baseFee, queued *SubPool) {
queued.Add(pending.PopWorst(), QueuedSubPool)
continue
}
pending.PopWorst()
discard(pending.PopWorst())
}
//2. If top element in the worst green queue has SubPool == 0b1111, but there is not enough room in the pool, discard.
@@ -320,7 +378,7 @@ func promote(pending, baseFee, queued *SubPool) {
queued.Add(baseFee.PopWorst(), QueuedSubPool)
continue
}
baseFee.PopWorst()
discard(baseFee.PopWorst())
}
//5. If the top element in the worst yellow queue has SubPool == 0x1110, but there is not enough room in the pool, discard.
@@ -328,7 +386,7 @@ func promote(pending, baseFee, queued *SubPool) {
if worst.SubPool >= 0b11110 {
break
}
baseFee.PopWorst()
discard(baseFee.PopWorst())
}
//6. If the top element in the best red queue has SubPool == 0x1110, promote to the yellow pool. If SubPool == 0x1111, promote to the green pool.
@@ -350,12 +408,12 @@ func promote(pending, baseFee, queued *SubPool) {
break
}
queued.PopWorst()
discard(queued.PopWorst())
}
//8. If the top element in the worst red queue has SubPool >= 0b100, but there is not enough room in the pool, discard.
for _ = queued.Worst(); queued.Len() > QueuedSubPoolLimit; _ = queued.Worst() {
queued.PopWorst()
discard(queued.PopWorst())
}
}
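The step comments above count four qualification bits (0b1111) while the checks compare five-bit values such as 0b11110; the extra, lowest bit is the IsLocal flag that AppendLocalHashes/AppendRemoteHashes mask on. A sketch of the layout this implies; only IsLocal appears by name in this diff, so the concrete values are an inference rather than the source's definitions:

const (
    IsLocal        = 0b00001                // locality flag, ignored by the qualification checks
    qualityShift   = 1                      // the four ordering bits from the comments sit above IsLocal
    fullyQualified = 0b1111 << qualityShift // == 0b11110, the threshold compared against in promote
)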
@@ -508,10 +566,10 @@ func NewPool() *PoolImpl {
}
// Loop - does:
// send pending txs to p2p:
// - new txs
// - all pooled txs to recently connected peers
// - all local pooled txs to random peers periodically
// send pending byHash to p2p:
// - new byHash
// - all pooled byHash to recently connected peers
// - all local pooled byHash to random peers periodically
// promote/demote transactions
// reorgs
func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
@@ -531,11 +589,11 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
select {
case <-ctx.Done():
return
case <-propagateAllNewTxsEvery.C: // new txs
case <-propagateAllNewTxsEvery.C: // new byHash
last := p.lastTxPropagationTimestamp
p.lastTxPropagationTimestamp = time.Now()
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
// first broadcast all local byHash to all peers, then non-local to random sqrt(peersAmount) peers
localTxHashes = localTxHashes[:0]
p.FillLocalHashesSince(last, localTxHashes)
initialAmount := len(localTxHashes)
@@ -543,7 +601,7 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
if initialAmount == 1 {
p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes)
} else {
p.logger.Info("local txs propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount)
p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount)
}
remoteTxHashes = remoteTxHashes[:0]
@@ -556,7 +614,7 @@ func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
}
p.FillRemoteHashes(remoteTxHashes[:0])
send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes)
case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local txs to random peers
case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local byHash to random peers
p.FillLocalHashes(localTxHashes[:0])
send.BroadcastLocalPooledTxs(localTxHashes)
}
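The "random sqrt(peersAmount) peers" selection mentioned in the comment presumably lives in Send and does not appear in this diff; a rough sketch of that idea, with illustrative names only:

import (
    "math"
    "math/rand"
)

func sqrtSubset(peers []PeerID) []PeerID {
    n := int(math.Sqrt(float64(len(peers))))
    if n < 1 && len(peers) > 0 {
        n = 1
    }
    rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) // shuffle in place
    return peers[:n]
}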


@@ -173,8 +173,9 @@ func FuzzOnNewBlocks3(f *testing.F) {
if !ok {
t.Skip()
}
byHash := map[string]*MetaTx{}
pending, baseFee, queued := NewSubPool(), NewSubPool(), NewSubPool()
onNewBlocks(senders, txs, protocolBaseFee, blockBaseFee, pending, baseFee, queued)
onNewBlock(senders, txs, nil, protocolBaseFee, blockBaseFee, pending, baseFee, queued, byHash)
best, worst := pending.Best(), pending.Worst()
assert.LessOrEqual(pending.Len(), PendingSubPoolLimit)


@@ -78,6 +78,8 @@ type TxSlot struct {
//bestIdx int // Index of the transaction in the best priority queue (of whatever pool it currently belongs to)
//worstIdx int // Index of the transaction in the worst priority queue (of whatever pool it currently belongs to)
//local bool // Whether transaction has been injected locally (and hence needs priority when mining or proposing a block)
rlp []byte
}
const (
@@ -94,7 +96,7 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl
if len(payload) == 0 {
return nil, sender, 0, fmt.Errorf("%s: empty rlp", ParseTransactionErrorPrefix)
}
slot = &TxSlot{}
slot = &TxSlot{rlp: payload}
// Compute transaction hash
ctx.keccak1.Reset()
ctx.keccak2.Reset()
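With ParseTransaction now keeping the raw bytes on the slot, TxPool.GetRlp from the pool file can serve them back verbatim. A minimal sketch, assuming it sits in the same package; how a parsed slot actually gets registered in byHash is outside this commit, and the helper name is illustrative:

func parseAndFetch(parseCtx *TxParseContext, p *TxPool, payload []byte) ([]byte, error) {
    slot, _, _, err := parseCtx.ParseTransaction(payload, 0)
    if err != nil {
        return nil, err
    }
    // slot.rlp references the payload bytes; once the slot is indexed under its
    // idHash in byHash, GetRlp returns exactly what was parsed
    return p.GetRlp(slot.idHash[:]), nil
}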