mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-19 09:04:13 +00:00
449 lines
14 KiB
Go
449 lines
14 KiB
Go
/*
|
|
Copyright 2021 Erigon contributors
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package txpool
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
_ "github.com/google/btree"
|
|
"github.com/ledgerwatch/log/v3"
|
|
)
|
|
|
|
// Pool is interface for the transaction pool
|
|
// This interface exists for the convinience of testing, and not yet because
|
|
// there are multiple implementations
|
|
type Pool interface {
|
|
// IdHashKnown check whether transaction with given Id hash is known to the pool
|
|
IdHashKnown(hash []byte) bool
|
|
|
|
NotifyNewPeer(peerID PeerID)
|
|
}
|
|
|
|
// SubPoolMarker is an ordered bitset used to sort transactions into sub-pools.
// A numerically larger marker sorts as "better". From the most significant bit
// (bit 4) down to the least significant bit (bit 0) the bits mean:
//
//  1. Minimum fee requirement (bit 4). Set when the feeCap of the transaction is
//     no less than the in-protocol minimal base fee. When clear, the transaction
//     can never be included into this particular chain.
//  2. Absence of nonce gaps (bit 3). Set for a transaction with nonce N, where
//     the sender's state nonce is M, when the pool holds transactions from that
//     sender for all nonces between M and N. Clear when one or more nonce gaps
//     separate the transaction's nonce from the state nonce.
//  3. Sufficient balance for gas (bit 2). Set when the sender's state balance
//     covers feeCap x gasLimit + transferred_value summed over this transaction
//     and all of its required prior transactions, i.e. there is currently a
//     guarantee that all of them will be able to pay for gas. Clear otherwise.
//  4. Dynamic fee requirement (bit 1). Set when the feeCap of the transaction is
//     no less than the baseFee of the currently pending block. Clear otherwise.
//  5. Local transaction (bit 0). Set when the transaction is local.
type SubPoolMarker uint8

// NewSubPoolMarker packs the five flags into a SubPoolMarker. The arguments are
// given from the most significant bit to the least significant one:
// enoughFeeCapProtocol -> bit 4, noNonCeGaps -> bit 3, enoughBalance -> bit 2,
// enoughFeeCapBlock -> bit 1, isLocal -> bit 0.
func NewSubPoolMarker(enoughFeeCapProtocol, noNonCeGaps, enoughBalance, enoughFeeCapBlock, isLocal bool) SubPoolMarker {
	flags := [...]bool{enoughFeeCapProtocol, noNonCeGaps, enoughBalance, enoughFeeCapBlock, isLocal}

	var marker SubPoolMarker
	for _, set := range flags {
		// Shift first so that the first flag ends up in the highest bit.
		marker <<= 1
		if set {
			marker |= 1
		}
	}
	return marker
}
|
|
|
|
// MetaTx holds transaction and some metadata
|
|
type MetaTx struct {
|
|
SubPool SubPoolMarker
|
|
Tx *TxSlot
|
|
bestIndex int
|
|
worstIndex int
|
|
}
|
|
|
|
type BestQueue []*MetaTx
|
|
|
|
func (p BestQueue) Len() int { return len(p) }
|
|
func (p BestQueue) Less(i, j int) bool { return p[i].SubPool > p[j].SubPool }
|
|
func (p BestQueue) Swap(i, j int) {
|
|
p[i], p[j] = p[j], p[i]
|
|
p[i].bestIndex = i
|
|
p[j].bestIndex = j
|
|
}
|
|
func (p *BestQueue) Push(x interface{}) {
|
|
n := len(*p)
|
|
item := x.(*MetaTx)
|
|
item.bestIndex = n
|
|
*p = append(*p, x.(*MetaTx))
|
|
}
|
|
func (p *BestQueue) Pop() interface{} {
|
|
old := *p
|
|
n := len(old)
|
|
item := old[n-1]
|
|
old[n-1] = nil // avoid memory leak
|
|
item.bestIndex = -1 // for safety
|
|
*p = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
type WorstQueue []*MetaTx
|
|
|
|
func (p WorstQueue) Len() int { return len(p) }
|
|
func (p WorstQueue) Less(i, j int) bool { return p[i].SubPool < p[j].SubPool }
|
|
func (p WorstQueue) Swap(i, j int) {
|
|
p[i], p[j] = p[j], p[i]
|
|
p[i].worstIndex = i
|
|
p[j].worstIndex = j
|
|
}
|
|
func (p *WorstQueue) Push(x interface{}) {
|
|
n := len(*p)
|
|
item := x.(*MetaTx)
|
|
item.worstIndex = n
|
|
*p = append(*p, x.(*MetaTx))
|
|
}
|
|
func (p *WorstQueue) Pop() interface{} {
|
|
old := *p
|
|
n := len(old)
|
|
item := old[n-1]
|
|
old[n-1] = nil // avoid memory leak
|
|
item.worstIndex = -1 // for safety
|
|
*p = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
type SubPool struct {
|
|
best *BestQueue
|
|
worst *WorstQueue
|
|
}
|
|
|
|
func NewSubPool() *SubPool {
|
|
p := &SubPool{best: &BestQueue{}, worst: &WorstQueue{}}
|
|
heap.Init(p.worst)
|
|
heap.Init(p.best)
|
|
return p
|
|
}
|
|
|
|
func (p *SubPool) Best() *MetaTx {
|
|
if len(*p.best) == 0 {
|
|
return nil
|
|
}
|
|
return (*p.best)[0]
|
|
}
|
|
func (p *SubPool) Worst() *MetaTx {
|
|
if len(*p.worst) == 0 {
|
|
return nil
|
|
}
|
|
return (*p.worst)[0]
|
|
}
|
|
func (p *SubPool) PopBest() *MetaTx {
|
|
i := p.best.Pop().(*MetaTx)
|
|
heap.Remove(p.worst, i.worstIndex)
|
|
return i
|
|
}
|
|
func (p *SubPool) PopWorst() *MetaTx {
|
|
i := p.worst.Pop().(*MetaTx)
|
|
heap.Remove(p.best, i.bestIndex)
|
|
return i
|
|
}
|
|
func (p *SubPool) Len() int { return p.best.Len() }
|
|
func (p *SubPool) Add(i *MetaTx) {
|
|
heap.Push(p.best, i)
|
|
heap.Push(p.worst, i)
|
|
}
|
|
|
|
// Capacity limits of the three sub-pools, in number of transactions.
const (
	// PendingSubPoolLimit bounds the pending sub-pool.
	PendingSubPoolLimit = 1024
	// BaseFeeSubPoolLimit bounds the baseFee sub-pool.
	BaseFeeSubPoolLimit = 1024
	// QueuedSubPoolLimit bounds the queued sub-pool.
	QueuedSubPoolLimit = 1024
)
|
|
|
|
func PromoteStep(pending, baseFee, queued *SubPool) {
|
|
heap.Init(pending.worst)
|
|
heap.Init(pending.best)
|
|
heap.Init(baseFee.worst)
|
|
heap.Init(baseFee.best)
|
|
heap.Init(queued.worst)
|
|
heap.Init(queued.best)
|
|
|
|
//1. If top element in the worst green queue has SubPool != 0b1111 (binary), it needs to be removed from the green pool.
|
|
// If SubPool < 0b1000 (not satisfying minimum fee), discard.
|
|
// If SubPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool.
|
|
for worst := pending.Worst(); pending.Len() > 0; worst = pending.Worst() {
|
|
if worst.SubPool >= 0b11110 {
|
|
break
|
|
}
|
|
if worst.SubPool >= 0b11100 {
|
|
baseFee.Add(pending.PopWorst())
|
|
continue
|
|
}
|
|
if worst.SubPool >= 0b11000 {
|
|
queued.Add(pending.PopWorst())
|
|
continue
|
|
}
|
|
pending.PopWorst()
|
|
}
|
|
|
|
//2. If top element in the worst green queue has SubPool == 0b1111, but there is not enough room in the pool, discard.
|
|
for worst := pending.Worst(); pending.Len() > PendingSubPoolLimit; worst = pending.Worst() {
|
|
if worst.SubPool >= 0b11110 { // TODO: here must 'SubPool == 0b1111' or 'SubPool <= 0b1111' ?
|
|
break
|
|
}
|
|
pending.PopWorst()
|
|
}
|
|
|
|
//3. If the top element in the best yellow queue has SubPool == 0b1111, promote to the green pool.
|
|
for best := baseFee.Best(); baseFee.Len() > 0; best = baseFee.Best() {
|
|
if best.SubPool < 0b11110 {
|
|
break
|
|
}
|
|
pending.Add(baseFee.PopWorst())
|
|
}
|
|
|
|
//4. If the top element in the worst yellow queue has SubPool != 0x1110, it needs to be removed from the yellow pool.
|
|
// If SubPool < 0b1000 (not satisfying minimum fee), discard. Otherwise, demote to the red pool.
|
|
for worst := baseFee.Worst(); baseFee.Len() > 0; worst = baseFee.Worst() {
|
|
if worst.SubPool >= 0b11100 {
|
|
break
|
|
}
|
|
if worst.SubPool >= 0b11000 {
|
|
queued.Add(baseFee.PopWorst())
|
|
continue
|
|
}
|
|
baseFee.PopWorst()
|
|
}
|
|
|
|
//5. If the top element in the worst yellow queue has SubPool == 0x1110, but there is not enough room in the pool, discard.
|
|
for worst := baseFee.Worst(); baseFee.Len() > BaseFeeSubPoolLimit; worst = baseFee.Worst() {
|
|
if worst.SubPool >= 0b11110 {
|
|
break
|
|
}
|
|
baseFee.PopWorst()
|
|
}
|
|
|
|
//6. If the top element in the best red queue has SubPool == 0x1110, promote to the yellow pool. If SubPool == 0x1111, promote to the green pool.
|
|
for best := queued.Best(); queued.Len() > 0; best = queued.Best() {
|
|
if best.SubPool < 0b11100 {
|
|
break
|
|
}
|
|
if best.SubPool < 0b11110 {
|
|
baseFee.Add(queued.PopWorst())
|
|
continue
|
|
}
|
|
|
|
pending.Add(queued.PopWorst())
|
|
}
|
|
|
|
//7. If the top element in the worst red queue has SubPool < 0b1000 (not satisfying minimum fee), discard.
|
|
for worst := queued.Worst(); queued.Len() > 0; worst = queued.Worst() {
|
|
if worst.SubPool >= 0b10000 {
|
|
break
|
|
}
|
|
|
|
queued.PopWorst()
|
|
}
|
|
|
|
//8. If the top element in the worst red queue has SubPool >= 0b100, but there is not enough room in the pool, discard.
|
|
for worst := queued.Worst(); queued.Len() > QueuedSubPoolLimit; worst = queued.Worst() {
|
|
if worst.SubPool >= 0b10000 {
|
|
break
|
|
}
|
|
|
|
queued.PopWorst()
|
|
}
|
|
}
|
|
|
|
func CheckInvariants(pending, baseFee, queued *SubPool) {
|
|
//1. If top element in the worst green queue has SubPool != 0b1111 (binary), it needs to be removed from the green pool.
|
|
// If SubPool < 0b1000 (not satisfying minimum fee), discard.
|
|
// If SubPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool.
|
|
for worst := pending.Worst(); pending.Len() > 0; worst = pending.Worst() {
|
|
if worst.SubPool >= 0b11110 {
|
|
break
|
|
}
|
|
if worst.SubPool >= 0b11100 {
|
|
baseFee.Add(pending.PopWorst())
|
|
continue
|
|
}
|
|
if worst.SubPool >= 0b11000 {
|
|
queued.Add(pending.PopWorst())
|
|
continue
|
|
}
|
|
pending.PopWorst()
|
|
}
|
|
|
|
//2. If top element in the worst green queue has SubPool == 0b1111, but there is not enough room in the pool, discard.
|
|
for worst := pending.Worst(); pending.Len() > PendingSubPoolLimit; worst = pending.Worst() {
|
|
if worst.SubPool >= 0b11110 { // TODO: here must 'SubPool == 0b1111' or 'SubPool <= 0b1111' ?
|
|
break
|
|
}
|
|
pending.PopWorst()
|
|
}
|
|
|
|
//3. If the top element in the best yellow queue has SubPool == 0b1111, promote to the green pool.
|
|
for best := baseFee.Best(); baseFee.Len() > 0; best = baseFee.Best() {
|
|
if best.SubPool < 0b11110 {
|
|
break
|
|
}
|
|
pending.Add(baseFee.PopWorst())
|
|
}
|
|
|
|
//4. If the top element in the worst yellow queue has SubPool != 0x1110, it needs to be removed from the yellow pool.
|
|
// If SubPool < 0b1000 (not satisfying minimum fee), discard. Otherwise, demote to the red pool.
|
|
for worst := baseFee.Worst(); baseFee.Len() > 0; worst = baseFee.Worst() {
|
|
if worst.SubPool >= 0b11100 {
|
|
break
|
|
}
|
|
if worst.SubPool >= 0b11000 {
|
|
queued.Add(baseFee.PopWorst())
|
|
continue
|
|
}
|
|
baseFee.PopWorst()
|
|
}
|
|
|
|
//5. If the top element in the worst yellow queue has SubPool == 0x1110, but there is not enough room in the pool, discard.
|
|
for worst := baseFee.Worst(); baseFee.Len() > BaseFeeSubPoolLimit; worst = baseFee.Worst() {
|
|
if worst.SubPool >= 0b11110 {
|
|
break
|
|
}
|
|
baseFee.PopWorst()
|
|
}
|
|
|
|
//6. If the top element in the best red queue has SubPool == 0x1110, promote to the yellow pool. If SubPool == 0x1111, promote to the green pool.
|
|
for best := queued.Best(); queued.Len() > 0; best = queued.Best() {
|
|
if best.SubPool < 0b11100 {
|
|
break
|
|
}
|
|
if best.SubPool < 0b11110 {
|
|
baseFee.Add(queued.PopWorst())
|
|
continue
|
|
}
|
|
|
|
pending.Add(queued.PopWorst())
|
|
}
|
|
|
|
//7. If the top element in the worst red queue has SubPool < 0b1000 (not satisfying minimum fee), discard.
|
|
for worst := queued.Worst(); queued.Len() > 0; worst = queued.Worst() {
|
|
if worst.SubPool >= 0b10000 {
|
|
break
|
|
}
|
|
|
|
queued.PopWorst()
|
|
}
|
|
|
|
//8. If the top element in the worst red queue has SubPool >= 0b100, but there is not enough room in the pool, discard.
|
|
for worst := queued.Worst(); queued.Len() > QueuedSubPoolLimit; worst = queued.Worst() {
|
|
if worst.SubPool >= 0b10000 {
|
|
break
|
|
}
|
|
|
|
queued.PopWorst()
|
|
}
|
|
}
|
|
|
|
// Below is draft code; it will be converted into Loop and LoopStep funcs later.
|
|
|
|
type PoolImpl struct {
|
|
recentlyConnectedPeers *recentlyConnectedPeers
|
|
lastTxPropagationTimestamp time.Time
|
|
logger log.Logger
|
|
}
|
|
|
|
func NewPool() *PoolImpl {
|
|
return &PoolImpl{
|
|
recentlyConnectedPeers: &recentlyConnectedPeers{},
|
|
}
|
|
}
|
|
|
|
// Loop - does:
|
|
// send pending txs to p2p:
|
|
// - new txs
|
|
// - all pooled txs to recently connected peers
|
|
// - all local pooled txs to random peers periodically
|
|
// promote/demote transactions
|
|
// reorgs
|
|
func (p *PoolImpl) Loop(ctx context.Context, send *Send, timings Timings) {
|
|
propagateAllNewTxsEvery := time.NewTicker(timings.propagateAllNewTxsEvery)
|
|
defer propagateAllNewTxsEvery.Stop()
|
|
|
|
syncToNewPeersEvery := time.NewTicker(timings.syncToNewPeersEvery)
|
|
defer syncToNewPeersEvery.Stop()
|
|
|
|
broadcastLocalTransactionsEvery := time.NewTicker(timings.broadcastLocalTransactionsEvery)
|
|
defer broadcastLocalTransactionsEvery.Stop()
|
|
|
|
localTxHashes := make([]byte, 0, 128)
|
|
remoteTxHashes := make([]byte, 0, 128)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-propagateAllNewTxsEvery.C: // new txs
|
|
last := p.lastTxPropagationTimestamp
|
|
p.lastTxPropagationTimestamp = time.Now()
|
|
|
|
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
|
|
localTxHashes = localTxHashes[:0]
|
|
p.FillLocalHashesSince(last, localTxHashes)
|
|
initialAmount := len(localTxHashes)
|
|
sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes)
|
|
if initialAmount == 1 {
|
|
p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes)
|
|
} else {
|
|
p.logger.Info("local txs propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount)
|
|
}
|
|
|
|
remoteTxHashes = remoteTxHashes[:0]
|
|
p.FillRemoteHashesSince(last, remoteTxHashes)
|
|
send.BroadcastRemotePooledTxs(remoteTxHashes)
|
|
case <-syncToNewPeersEvery.C: // new peer
|
|
newPeers := p.recentlyConnectedPeers.GetAndClean()
|
|
if len(newPeers) == 0 {
|
|
continue
|
|
}
|
|
p.FillRemoteHashes(remoteTxHashes[:0])
|
|
send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes)
|
|
case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local txs to random peers
|
|
p.FillLocalHashes(localTxHashes[:0])
|
|
send.BroadcastLocalPooledTxs(localTxHashes)
|
|
}
|
|
}
|
|
}
|
|
|
|
// FillLocalHashesSince is a draft stub: its body is empty, so since and to are ignored.
// NOTE(review): to is passed by value, so an implementation that appends to it
// cannot change the caller's slice length — confirm the intended contract.
func (p *PoolImpl) FillLocalHashesSince(since time.Time, to []byte) {}
|
|
// FillRemoteHashesSince is a draft stub: its body is empty, so since and to are ignored.
// NOTE(review): to is passed by value, so an implementation that appends to it
// cannot change the caller's slice length — confirm the intended contract.
func (p *PoolImpl) FillRemoteHashesSince(since time.Time, to []byte) {}
|
|
// FillLocalHashes is a draft stub: its body is empty, so to is ignored.
// NOTE(review): to is passed by value, so an implementation that appends to it
// cannot change the caller's slice length — confirm the intended contract.
func (p *PoolImpl) FillLocalHashes(to []byte) {}
|
|
// FillRemoteHashes is a draft stub: its body is empty, so to is ignored.
// NOTE(review): to is passed by value, so an implementation that appends to it
// cannot change the caller's slice length — confirm the intended contract.
func (p *PoolImpl) FillRemoteHashes(to []byte) {}
|
|
|
|
// recentlyConnectedPeers does buffer IDs of recently connected good peers
|
|
// then sync of pooled Transaction can happen to all of then at once
|
|
// DoS protection and performance saving
|
|
// it doesn't track if peer disconnected, it's fine
|
|
type recentlyConnectedPeers struct {
|
|
lock sync.RWMutex
|
|
peers []PeerID
|
|
}
|
|
|
|
func (l *recentlyConnectedPeers) AddPeer(p PeerID) {
|
|
l.lock.Lock()
|
|
defer l.lock.Unlock()
|
|
l.peers = append(l.peers, p)
|
|
}
|
|
|
|
func (l *recentlyConnectedPeers) GetAndClean() []PeerID {
|
|
l.lock.Lock()
|
|
defer l.lock.Unlock()
|
|
peers := l.peers
|
|
l.peers = nil
|
|
return peers
|
|
}
|