persistence

This commit is contained in:
alex.sharov 2021-08-23 20:03:09 +07:00
parent 8133abfd56
commit 5f3c7beca3
3 changed files with 59 additions and 47 deletions

View File

@ -131,7 +131,7 @@ func NewSendersCache() *SendersCache {
func (sc *SendersCache) printDebug(prefix string) {
fmt.Printf("%s.SendersCache.senderInfo\n", prefix)
for i, j := range sc.senderInfo {
fmt.Printf("\tid=%d, nonce=%d, txs.len=%d\n", i, j.nonce, j.txNonce2Tx.Len())
fmt.Printf("\tid=%d,nonce=%d,balance=%d,txs.len=%d\n", i, j.nonce, j.balance.Uint64(), j.txNonce2Tx.Len())
}
}
@ -466,9 +466,9 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
byHash: map[string]*metaTx{},
localsHistory: localsHistory,
recentlyConnectedPeers: &recentlyConnectedPeers{},
pending: NewSubPool(),
baseFee: NewSubPool(),
queued: NewSubPool(),
pending: NewSubPool(PendingSubPool),
baseFee: NewSubPool(BaseFeeSubPool),
queued: NewSubPool(QueuedSubPool),
newTxs: newTxs,
db: db,
senderID: 1,
@ -481,6 +481,12 @@ func (p *TxPool) printDebug(prefix string) {
fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip)
}
fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len())
for i := range *p.pending.best {
(*p.pending.best)[i].Tx.printDebug(fmt.Sprintf("%s.pending: %b", prefix, (*p.pending.best)[i].subPool))
}
for i := range *p.queued.best {
(*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool))
}
}
func (p *TxPool) logStats() {
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
@ -977,7 +983,7 @@ func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPool
}
func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64) {
prevNonce := sender.nonce
noGapsNonce := sender.nonce + 1
cumulativeRequiredBalance := uint256.NewInt(0)
minFeeCap := uint64(math.MaxUint64)
minTip := uint64(math.MaxUint64)
@ -1003,15 +1009,18 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64)
//fmt.Printf("alex1: %d,%d,%d,%d\n", it.metaTx.NeedBalance.Uint64(), it.metaTx.Tx.gas, it.metaTx.Tx.feeCap, it.metaTx.Tx.value.Uint64())
//fmt.Printf("alex2: %d,%t\n", sender.balance.Uint64(), it.metaTx.SenderHasEnoughBalance)
it.metaTx.subPool |= EnoughFeeCapProtocol
} else {
it.metaTx.subPool = 0 // TODO: we immediately drop all transactions if they have no first bit - then maybe we don't need this bit at all? And don't add such transactions to queue?
return true
}
// 2. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for
// the sender is M, and there are transactions for all nonces between M and N from the same
// sender. Set to 0 is the transaction's nonce is divided from the state nonce by one or more nonce gaps.
it.metaTx.subPool &^= NoNonceGaps
if uint64(prevNonce)+1 == it.metaTx.Tx.nonce {
if noGapsNonce == it.metaTx.Tx.nonce {
it.metaTx.subPool |= NoNonceGaps
prevNonce = it.Tx.nonce
noGapsNonce++
}
// 3. Sufficient balance for gas. Set to 1 if the balance of sender's account in the
@ -1020,10 +1029,12 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64)
// nonces N+1 ... M is no more than B. Set to 0 otherwise. In other words, this bit is
// set if there is currently a guarantee that the transaction and all its required prior
// transactions will be able to pay for gas.
cumulativeRequiredBalance = cumulativeRequiredBalance.Add(cumulativeRequiredBalance, needBalance) // already deleted all transactions with nonce <= sender.nonce
it.metaTx.subPool &^= EnoughBalance
if sender.balance.Gt(cumulativeRequiredBalance) || sender.balance.Eq(cumulativeRequiredBalance) {
it.metaTx.subPool |= EnoughBalance
if it.metaTx.Tx.nonce > sender.nonce {
cumulativeRequiredBalance = cumulativeRequiredBalance.Add(cumulativeRequiredBalance, needBalance) // already deleted all transactions with nonce <= sender.nonce
if sender.balance.Gt(cumulativeRequiredBalance) || sender.balance.Eq(cumulativeRequiredBalance) {
it.metaTx.subPool |= EnoughBalance
}
}
// 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than
@ -1049,11 +1060,11 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
break
}
if worst.subPool >= 0b11100 {
baseFee.Add(pending.PopWorst(), BaseFeeSubPool)
baseFee.Add(pending.PopWorst())
continue
}
if worst.subPool >= 0b10000 {
queued.Add(pending.PopWorst(), QueuedSubPool)
queued.Add(pending.PopWorst())
continue
}
discard(pending.PopWorst())
@ -1072,7 +1083,7 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
if best.subPool < 0b11110 {
break
}
pending.Add(baseFee.PopBest(), PendingSubPool)
pending.Add(baseFee.PopBest())
}
//4. If the top element in the worst yellow queue has subPool != 0x1110, it needs to be removed from the yellow pool.
@ -1082,7 +1093,7 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
break
}
if worst.subPool >= 0b10000 {
queued.Add(baseFee.PopWorst(), QueuedSubPool)
queued.Add(baseFee.PopWorst())
continue
}
discard(baseFee.PopWorst())
@ -1102,11 +1113,11 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
break
}
if best.subPool < 0b11110 {
baseFee.Add(queued.PopBest(), BaseFeeSubPool)
baseFee.Add(queued.PopBest())
continue
}
pending.Add(queued.PopBest(), PendingSubPool)
pending.Add(queued.PopBest())
}
//7. If the top element in the worst red queue has subPool < 0b1000 (not satisfying minimum fee), discard.
@ -1125,12 +1136,13 @@ func promote(pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
}
type SubPool struct {
t SubPoolType
best *BestQueue
worst *WorstQueue
}
func NewSubPool() *SubPool {
return &SubPool{best: &BestQueue{}, worst: &WorstQueue{}}
func NewSubPool(t SubPoolType) *SubPool {
return &SubPool{t: t, best: &BestQueue{}, worst: &WorstQueue{}}
}
func (p *SubPool) EnforceInvariants() {
@ -1160,8 +1172,8 @@ func (p *SubPool) PopWorst() *metaTx {
return i
}
func (p *SubPool) Len() int { return p.best.Len() }
func (p *SubPool) Add(i *metaTx, subPoolType SubPoolType) {
i.currentSubPool = subPoolType
func (p *SubPool) Add(i *metaTx) {
i.currentSubPool = p.t
heap.Push(p.best, i)
heap.Push(p.worst, i)
}

View File

@ -7,6 +7,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"
"github.com/google/btree"
@ -44,9 +45,9 @@ func FuzzTwoQueue(f *testing.F) {
}
assert := assert.New(t)
{
sub := NewSubPool()
sub := NewSubPool(PendingSubPool)
for _, i := range in {
sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}, PendingSubPool)
sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}})
}
assert.Equal(len(in), sub.best.Len())
assert.Equal(len(in), sub.worst.Len())
@ -70,9 +71,9 @@ func FuzzTwoQueue(f *testing.F) {
}
{
sub := NewSubPool()
sub := NewSubPool(PendingSubPool)
for _, i := range in {
sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}}, PendingSubPool)
sub.Add(&metaTx{subPool: SubPoolMarker(i & 0b11111), Tx: &TxSlot{nonce: 1, value: *uint256.NewInt(1)}})
}
var prev *uint8
i := sub.Len()
@ -506,7 +507,10 @@ func FuzzOnNewBlocks11(f *testing.F) {
db := mdbx.NewMDBX(log.New()).InMem().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).MustOpen()
t.Cleanup(db.Close)
err = db.Update(context.Background(), func(tx kv.RwTx) error { return pool.flush(tx, sendersCache) })
tx, err := db.BeginRw(context.Background())
require.NoError(t, err)
defer tx.Rollback()
err = pool.flush(tx, sendersCache)
require.NoError(t, err)
check(p2pReceived, TxSlots{}, "after_flush")
//checkNotify(p2pReceived, TxSlots{}, "after_flush")
@ -514,35 +518,30 @@ func FuzzOnNewBlocks11(f *testing.F) {
p2, err := New(ch, nil)
assert.NoError(err)
s2 := NewSendersCache()
err = db.View(context.Background(), func(tx kv.Tx) error { return p2.fromDB(tx, s2) })
err = p2.fromDB(tx, s2)
require.NoError(t, err)
check(txs2, TxSlots{}, "fromDB")
//checkNotify(txs2, TxSlots{}, "fromDB")
//fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash))
//fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash))
//for i, b := range pool.byHash {
// c, ok := p2.byHash[i]
// if !ok {
// _ = c
// fmt.Printf("nno:%x\n", i)
// for k, _ := range p2.byHash {
// fmt.Printf("nno2:%x\n", k)
// }
// sc := sendersCache.senderInfo[b.Tx.senderID]
// sc2 := s2.senderInfo[b.Tx.senderID]
// fmt.Printf("no: %d,%d,%d,%d\n", b.Tx.senderID, b.Tx.nonce, sc.nonce, sc2.nonce)
// }
//}
assert.Equal(sendersCache.senderID, s2.senderID)
assert.Equal(sendersCache.blockHeight.Load(), s2.blockHeight.Load())
require.Equal(t, len(sendersCache.senderIDs), len(s2.senderIDs))
require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo))
require.Equal(t, len(pool.byHash), len(p2.byHash))
//assert.Equal(pool.pending.Len(), p2.pending.Len())
//assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())
//assert.Equal(pool.queued.Len(), p2.queued.Len())
//assert.Equal(pool.pendingBaseFee.Load(), p2.pendingBaseFee.Load())
//assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load())
if pool.pending.Len() != p2.pending.Len() {
pool.printDebug("p1")
p2.printDebug("p2")
sendersCache.printDebug("s1")
s2.printDebug("s2")
fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash))
fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash))
}
assert.Equal(pool.pending.Len(), p2.pending.Len())
assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())
assert.Equal(pool.queued.Len(), p2.queued.Len())
assert.Equal(pool.pendingBaseFee.Load(), p2.pendingBaseFee.Load())
assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load())
})
}

View File

@ -490,5 +490,6 @@ func bytesToUint64(buf []byte) (x uint64) {
}
func (tx *TxSlot) printDebug(prefix string) {
fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.idHash)
fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,v=%d\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.value.Uint64())
//fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.idHash)
}