From 5f3c7beca3d2610d963046ed57e34220f0034a3c Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 23 Aug 2021 20:03:09 +0700 Subject: [PATCH] persistence --- txpool/pool.go | 52 ++++++++++++++++++++++++---------------- txpool/pool_fuzz_test.go | 51 +++++++++++++++++++-------------------- txpool/types.go | 3 ++- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index fc596e96d..badc07a07 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -131,7 +131,7 @@ func NewSendersCache() *SendersCache { func (sc *SendersCache) printDebug(prefix string) { fmt.Printf("%s.SendersCache.senderInfo\n", prefix) for i, j := range sc.senderInfo { - fmt.Printf("\tid=%d, nonce=%d, txs.len=%d\n", i, j.nonce, j.txNonce2Tx.Len()) + fmt.Printf("\tid=%d,nonce=%d,balance=%d,txs.len=%d\n", i, j.nonce, j.balance.Uint64(), j.txNonce2Tx.Len()) } } @@ -466,9 +466,9 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { byHash: map[string]*metaTx{}, localsHistory: localsHistory, recentlyConnectedPeers: &recentlyConnectedPeers{}, - pending: NewSubPool(), - baseFee: NewSubPool(), - queued: NewSubPool(), + pending: NewSubPool(PendingSubPool), + baseFee: NewSubPool(BaseFeeSubPool), + queued: NewSubPool(QueuedSubPool), newTxs: newTxs, db: db, senderID: 1, @@ -481,6 +481,12 @@ func (p *TxPool) printDebug(prefix string) { fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip) } fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len()) + for i := range *p.pending.best { + (*p.pending.best)[i].Tx.printDebug(fmt.Sprintf("%s.pending: %b", prefix, (*p.pending.best)[i].subPool)) + } + for i := range *p.queued.best { + (*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool)) + } } func (p *TxPool) logStats() { protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() @@ -977,7 +983,7 @@ func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPool } func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64) { - prevNonce := sender.nonce + noGapsNonce := sender.nonce + 1 cumulativeRequiredBalance := uint256.NewInt(0) minFeeCap := uint64(math.MaxUint64) minTip := uint64(math.MaxUint64) @@ -1003,15 +1009,18 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64) //fmt.Printf("alex1: %d,%d,%d,%d\n", it.metaTx.NeedBalance.Uint64(), it.metaTx.Tx.gas, it.metaTx.Tx.feeCap, it.metaTx.Tx.value.Uint64()) //fmt.Printf("alex2: %d,%t\n", sender.balance.Uint64(), it.metaTx.SenderHasEnoughBalance) it.metaTx.subPool |= EnoughFeeCapProtocol + } else { + it.metaTx.subPool = 0 // TODO: we immediately drop all transactions if they have no first bit - then maybe we don't need this bit at all? And don't add such transactions to queue? + return true } // 2. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for // the sender is M, and there are transactions for all nonces between M and N from the same // sender. Set to 0 is the transaction's nonce is divided from the state nonce by one or more nonce gaps. it.metaTx.subPool &^= NoNonceGaps - if uint64(prevNonce)+1 == it.metaTx.Tx.nonce { + if noGapsNonce == it.metaTx.Tx.nonce { it.metaTx.subPool |= NoNonceGaps - prevNonce = it.Tx.nonce + noGapsNonce++ } // 3. Sufficient balance for gas. Set to 1 if the balance of sender's account in the @@ -1020,10 +1029,12 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64) // nonces N+1 ... M is no more than B. Set to 0 otherwise. In other words, this bit is // set if there is currently a guarantee that the transaction and all its required prior // transactions will be able to pay for gas. - cumulativeRequiredBalance = cumulativeRequiredBalance.Add(cumulativeRequiredBalance, needBalance) // already deleted all transactions with nonce <= sender.nonce it.metaTx.subPool &^= EnoughBalance - if sender.balance.Gt(cumulativeRequiredBalance) || sender.balance.Eq(cumulativeRequiredBalance) { - it.metaTx.subPool |= EnoughBalance + if it.metaTx.Tx.nonce > sender.nonce { + cumulativeRequiredBalance = cumulativeRequiredBalance.Add(cumulativeRequiredBalance, needBalance) // already deleted all transactions with nonce <= sender.nonce + if sender.balance.Gt(cumulativeRequiredBalance) || sender.balance.Eq(cumulativeRequiredBalance) { + it.metaTx.subPool |= EnoughBalance + } } // 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than @@ -1049,11 +1060,11 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { break } if worst.subPool >= 0b11100 { - baseFee.Add(pending.PopWorst(), BaseFeeSubPool) + baseFee.Add(pending.PopWorst()) continue } if worst.subPool >= 0b10000 { - queued.Add(pending.PopWorst(), QueuedSubPool) + queued.Add(pending.PopWorst()) continue } discard(pending.PopWorst()) @@ -1072,7 +1083,7 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { if best.subPool < 0b11110 { break } - pending.Add(baseFee.PopBest(), PendingSubPool) + pending.Add(baseFee.PopBest()) } //4. If the top element in the worst yellow queue has subPool != 0x1110, it needs to be removed from the yellow pool. @@ -1082,7 +1093,7 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { break } if worst.subPool >= 0b10000 { - queued.Add(baseFee.PopWorst(), QueuedSubPool) + queued.Add(baseFee.PopWorst()) continue } discard(baseFee.PopWorst()) @@ -1102,11 +1113,11 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { break } if best.subPool < 0b11110 { - baseFee.Add(queued.PopBest(), BaseFeeSubPool) + baseFee.Add(queued.PopBest()) continue } - pending.Add(queued.PopBest(), PendingSubPool) + pending.Add(queued.PopBest()) } //7. If the top element in the worst red queue has subPool < 0b1000 (not satisfying minimum fee), discard. @@ -1125,12 +1136,13 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) { } type SubPool struct { + t SubPoolType best *BestQueue worst *WorstQueue } -func NewSubPool() *SubPool { - return &SubPool{best: &BestQueue{}, worst: &WorstQueue{}} +func NewSubPool(t SubPoolType) *SubPool { + return &SubPool{t: t, best: &BestQueue{}, worst: &WorstQueue{}} } func (p *SubPool) EnforceInvariants() { @@ -1160,8 +1172,8 @@ func (p *SubPool) PopWorst() *metaTx { return i } func (p *SubPool) Len() int { return p.best.Len() } -func (p *SubPool) Add(i *metaTx, subPoolType SubPoolType) { - i.currentSubPool = subPoolType +func (p *SubPool) Add(i *metaTx) { + i.currentSubPool = p.t heap.Push(p.best, i) heap.Push(p.worst, i) } diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index f06842efd..581bae494 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "encoding/binary" + "fmt" "testing" "github.com/google/btree" @@ -44,9 +45,9 @@ func FuzzTwoQueue(f *testing.F) { } assert := assert.New(t) { - sub := NewSubPool() + sub := NewSubPool(PendingSubPool) for _, i := range in { - sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}, PendingSubPool) + sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}) } assert.Equal(len(in), sub.best.Len()) assert.Equal(len(in), sub.worst.Len()) @@ -70,9 +71,9 @@ func FuzzTwoQueue(f *testing.F) { } { - sub := NewSubPool() + sub := NewSubPool(PendingSubPool) for _, i := range in { - sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}, PendingSubPool) + sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}) } var prev *uint8 i := sub.Len() @@ -506,7 +507,10 @@ func FuzzOnNewBlocks11(f *testing.F) { db := mdbx.NewMDBX(log.New()).InMem().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).MustOpen() t.Cleanup(db.Close) - err = db.Update(context.Background(), func(tx kv.RwTx) error { return pool.flush(tx, sendersCache) }) + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer tx.Rollback() + err = pool.flush(tx, sendersCache) require.NoError(t, err) check(p2pReceived, TxSlots{}, "after_flush") //checkNotify(p2pReceived, TxSlots{}, "after_flush") @@ -514,35 +518,30 @@ func FuzzOnNewBlocks11(f *testing.F) { p2, err := New(ch, nil) assert.NoError(err) s2 := NewSendersCache() - err = db.View(context.Background(), func(tx kv.Tx) error { return p2.fromDB(tx, s2) }) + err = p2.fromDB(tx, s2) require.NoError(t, err) check(txs2, TxSlots{}, "fromDB") //checkNotify(txs2, TxSlots{}, "fromDB") - //fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash)) - //fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash)) - //for i, b := range pool.byHash { - // c, ok := p2.byHash[i] - // if !ok { - // _ = c - // fmt.Printf("nno:%x\n", i) - // for k, _ := range p2.byHash { - // fmt.Printf("nno2:%x\n", k) - // } - // sc := sendersCache.senderInfo[b.Tx.senderID] - // sc2 := s2.senderInfo[b.Tx.senderID] - // fmt.Printf("no: %d,%d,%d,%d\n", b.Tx.senderID, b.Tx.nonce, sc.nonce, sc2.nonce) - // } - //} assert.Equal(sendersCache.senderID, s2.senderID) assert.Equal(sendersCache.blockHeight.Load(), s2.blockHeight.Load()) require.Equal(t, len(sendersCache.senderIDs), len(s2.senderIDs)) require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo)) require.Equal(t, len(pool.byHash), len(p2.byHash)) - //assert.Equal(pool.pending.Len(), p2.pending.Len()) - //assert.Equal(pool.baseFee.Len(), p2.baseFee.Len()) - //assert.Equal(pool.queued.Len(), p2.queued.Len()) - //assert.Equal(pool.pendingBaseFee.Load(), p2.pendingBaseFee.Load()) - //assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load()) + if pool.pending.Len() != p2.pending.Len() { + pool.printDebug("p1") + p2.printDebug("p2") + sendersCache.printDebug("s1") + s2.printDebug("s2") + + fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash)) + fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash)) + } + + assert.Equal(pool.pending.Len(), p2.pending.Len()) + assert.Equal(pool.baseFee.Len(), p2.baseFee.Len()) + assert.Equal(pool.queued.Len(), p2.queued.Len()) + assert.Equal(pool.pendingBaseFee.Load(), p2.pendingBaseFee.Load()) + assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load()) }) } diff --git a/txpool/types.go b/txpool/types.go index 30e21080e..1968c53fc 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -490,5 +490,6 @@ func bytesToUint64(buf []byte) (x uint64) { } func (tx *TxSlot) printDebug(prefix string) { - fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.idHash) + fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,v=%d\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.value.Uint64()) + //fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.idHash) }