From 224d8514652a15c478fe7e84154ac123e56a35c3 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 23 Aug 2021 14:16:39 +0700 Subject: [PATCH] Changes --- txpool/pool.go | 49 +++++++++++++++++++++++++--------------- txpool/pool_fuzz_test.go | 28 +++++++++++++---------- txpool/types.go | 2 +- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 49bf8a5fd..e84a6ec4a 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -469,7 +469,7 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { func (p *TxPool) printDebug(prefix string) { fmt.Printf("%s.pool.byHash\n", prefix) for _, j := range p.byHash { - fmt.Printf("\tsenderID=%d, nonce=%d\n", j.Tx.senderID, j.Tx.nonce) + fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip) } fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len()) } @@ -588,7 +588,18 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas } changedSenders := map[uint64]*senderInfo{} - unsafeAddToPool(senders, newTxs, pending, PendingSubPool, byHash, func(i *metaTx, sender *senderInfo) { + unsafeAddToPool(senders, newTxs, pending, PendingSubPool, byHash, func(txn *metaTx) { + switch txn.currentSubPool { + case PendingSubPool: + pending.UnsafeRemove(txn) + case BaseFeeSubPool: + baseFee.UnsafeRemove(txn) + case QueuedSubPool: + queued.UnsafeRemove(txn) + default: + //already removed + } + }, func(i *metaTx, sender *senderInfo) { changedSenders[i.Tx.senderID] = sender if _, ok := localsHistory.Get(i.Tx.idHash); ok { i.subPool |= IsLocal @@ -829,7 +840,18 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr // they effective lose their priority over the "remote" transactions. In order to prevent that, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). - unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, byHash, func(i *metaTx, sender *senderInfo) { + unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, byHash, func(txn *metaTx) { + switch txn.currentSubPool { + case PendingSubPool: + pending.UnsafeRemove(txn) + case BaseFeeSubPool: + baseFee.UnsafeRemove(txn) + case QueuedSubPool: + queued.UnsafeRemove(txn) + default: + //already removed + } + }, func(i *metaTx, sender *senderInfo) { changedSenders[i.Tx.senderID] = sender //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) if _, ok := localsHistory.Get(i.Tx.idHash); ok { @@ -890,7 +912,6 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu return false } toDel = append(toDel, i) - //fmt.Printf("del2: %d\n", it.metaTx.Tx.nonce) // del from sub-pool switch it.metaTx.currentSubPool { case PendingSubPool: @@ -916,7 +937,7 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu } // unwind -func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPoolType SubPoolType, byHash map[string]*metaTx, beforeAdd func(tx *metaTx, sender *senderInfo)) { +func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPoolType SubPoolType, byHash map[string]*metaTx, discard func(tx *metaTx), beforeAdd func(tx *metaTx, sender *senderInfo)) { for i, tx := range newTxs.txs { if _, ok := byHash[string(tx.idHash[:])]; ok { continue @@ -930,6 +951,7 @@ func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPool continue } } + byHash[string(mt.Tx.idHash[:])] = mt replaced := sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt}) if replaced != nil { @@ -937,16 +959,7 @@ func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPool if replacedMT.Tx.idHash != mt.Tx.idHash { delete(byHash, string(replacedMT.Tx.idHash[:])) } - switch replacedMT.currentSubPool { - case PendingSubPool: - to.UnsafeRemove(replacedMT) - case BaseFeeSubPool: - to.UnsafeRemove(replacedMT) - case QueuedSubPool: - to.UnsafeRemove(replacedMT) - default: - //already removed - } + discard(replacedMT) } beforeAdd(mt, sender) to.UnsafeAdd(mt, subPoolType) @@ -1166,12 +1179,12 @@ func (p *SubPool) UnsafeAdd(i *metaTx, subPoolType SubPoolType) { p.worst.Push(i) p.best.Push(i) } -func (p *SubPool) DebugPrint() { +func (p *SubPool) DebugPrint(prefix string) { for i, it := range *p.best { - fmt.Printf("best: %d, %d, %d\n", i, it.subPool, it.bestIndex) + fmt.Printf("%s.best: %d, %d, %d\n", prefix, i, it.subPool, it.bestIndex) } for i, it := range *p.worst { - fmt.Printf("worst: %d, %d, %d\n", i, it.subPool, it.worstIndex) + fmt.Printf("%s.worst: %d, %d, %d\n", prefix, i, it.subPool, it.worstIndex) } } diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index c0c2cd139..abef96b07 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -5,14 +5,18 @@ package txpool import ( "bytes" + "context" "encoding/binary" "testing" "github.com/google/btree" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/rlp" "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // https://blog.golang.org/fuzz-beta @@ -500,19 +504,19 @@ func FuzzOnNewBlocks11(f *testing.F) { check(p2pReceived, TxSlots{}, "p2pmsg1") checkNotify(p2pReceived, TxSlots{}, "p2pmsg1") - //db := mdbx.NewMDBX(log.New()).InMem().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).MustOpen() - //t.Cleanup(db.Close) - //err = db.Update(context.Background(), func(tx kv.RwTx) error { return pool.flush(tx, sendersCache) }) - //require.NoError(t, err) - //check(p2pReceived, TxSlots{}, "after_flush") + db := mdbx.NewMDBX(log.New()).InMem().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).MustOpen() + t.Cleanup(db.Close) + err = db.Update(context.Background(), func(tx kv.RwTx) error { return pool.flush(tx, sendersCache) }) + require.NoError(t, err) + check(p2pReceived, TxSlots{}, "after_flush") //checkNotify(p2pReceived, TxSlots{}, "after_flush") - //p2, err := New(ch, nil) - //assert.NoError(err) - //s2 := NewSendersCache() - //err = db.View(context.Background(), func(tx kv.Tx) error { return p2.fromDB(tx, s2) }) - //require.NoError(t, err) - //check(txs2, TxSlots{}, "fromDB") + p2, err := New(ch, nil) + assert.NoError(err) + s2 := NewSendersCache() + err = db.View(context.Background(), func(tx kv.Tx) error { return p2.fromDB(tx, s2) }) + require.NoError(t, err) + check(txs2, TxSlots{}, "fromDB") //checkNotify(txs2, TxSlots{}, "fromDB") //fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash)) //fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash)) @@ -529,7 +533,7 @@ func FuzzOnNewBlocks11(f *testing.F) { // fmt.Printf("no: %d,%d,%d,%d\n", b.Tx.senderID, b.Tx.nonce, sc.nonce, sc2.nonce) // } //} - //assert.Equal(sendersCache.senderID, s2.senderID) + assert.Equal(sendersCache.senderID, s2.senderID) //assert.Equal(sendersCache.blockHeight.Load(), s2.blockHeight.Load()) //require.Equal(t, len(sendersCache.senderIDs), len(s2.senderIDs)) //require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo)) diff --git a/txpool/types.go b/txpool/types.go index daea8a2b4..bfee66827 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -489,5 +489,5 @@ func bytesToUint64(buf []byte) (x uint64) { } func (tx *TxSlot) printDebug(prefix string) { - fmt.Printf("%s: senderID=%d,nonce=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.idHash) + fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.idHash) }