This commit is contained in:
alex.sharov 2021-08-23 14:16:39 +07:00
parent 1eab615b8d
commit 224d851465
3 changed files with 48 additions and 31 deletions

View File

@ -469,7 +469,7 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
func (p *TxPool) printDebug(prefix string) {
fmt.Printf("%s.pool.byHash\n", prefix)
for _, j := range p.byHash {
fmt.Printf("\tsenderID=%d, nonce=%d\n", j.Tx.senderID, j.Tx.nonce)
fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip)
}
fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len())
}
@ -588,7 +588,18 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas
}
changedSenders := map[uint64]*senderInfo{}
unsafeAddToPool(senders, newTxs, pending, PendingSubPool, byHash, func(i *metaTx, sender *senderInfo) {
unsafeAddToPool(senders, newTxs, pending, PendingSubPool, byHash, func(txn *metaTx) {
switch txn.currentSubPool {
case PendingSubPool:
pending.UnsafeRemove(txn)
case BaseFeeSubPool:
baseFee.UnsafeRemove(txn)
case QueuedSubPool:
queued.UnsafeRemove(txn)
default:
//already removed
}
}, func(i *metaTx, sender *senderInfo) {
changedSenders[i.Tx.senderID] = sender
if _, ok := localsHistory.Get(i.Tx.idHash); ok {
i.subPool |= IsLocal
@ -829,7 +840,18 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
// they effective lose their priority over the "remote" transactions. In order to prevent that,
// somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold").
unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, byHash, func(i *metaTx, sender *senderInfo) {
unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, byHash, func(txn *metaTx) {
switch txn.currentSubPool {
case PendingSubPool:
pending.UnsafeRemove(txn)
case BaseFeeSubPool:
baseFee.UnsafeRemove(txn)
case QueuedSubPool:
queued.UnsafeRemove(txn)
default:
//already removed
}
}, func(i *metaTx, sender *senderInfo) {
changedSenders[i.Tx.senderID] = sender
//fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce)
if _, ok := localsHistory.Get(i.Tx.idHash); ok {
@ -890,7 +912,6 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu
return false
}
toDel = append(toDel, i)
//fmt.Printf("del2: %d\n", it.metaTx.Tx.nonce)
// del from sub-pool
switch it.metaTx.currentSubPool {
case PendingSubPool:
@ -916,7 +937,7 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu
}
// unwind
func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPoolType SubPoolType, byHash map[string]*metaTx, beforeAdd func(tx *metaTx, sender *senderInfo)) {
func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPoolType SubPoolType, byHash map[string]*metaTx, discard func(tx *metaTx), beforeAdd func(tx *metaTx, sender *senderInfo)) {
for i, tx := range newTxs.txs {
if _, ok := byHash[string(tx.idHash[:])]; ok {
continue
@ -930,6 +951,7 @@ func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPool
continue
}
}
byHash[string(mt.Tx.idHash[:])] = mt
replaced := sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt})
if replaced != nil {
@ -937,16 +959,7 @@ func unsafeAddToPool(senders *SendersCache, newTxs TxSlots, to *SubPool, subPool
if replacedMT.Tx.idHash != mt.Tx.idHash {
delete(byHash, string(replacedMT.Tx.idHash[:]))
}
switch replacedMT.currentSubPool {
case PendingSubPool:
to.UnsafeRemove(replacedMT)
case BaseFeeSubPool:
to.UnsafeRemove(replacedMT)
case QueuedSubPool:
to.UnsafeRemove(replacedMT)
default:
//already removed
}
discard(replacedMT)
}
beforeAdd(mt, sender)
to.UnsafeAdd(mt, subPoolType)
@ -1166,12 +1179,12 @@ func (p *SubPool) UnsafeAdd(i *metaTx, subPoolType SubPoolType) {
p.worst.Push(i)
p.best.Push(i)
}
func (p *SubPool) DebugPrint() {
func (p *SubPool) DebugPrint(prefix string) {
for i, it := range *p.best {
fmt.Printf("best: %d, %d, %d\n", i, it.subPool, it.bestIndex)
fmt.Printf("%s.best: %d, %d, %d\n", prefix, i, it.subPool, it.bestIndex)
}
for i, it := range *p.worst {
fmt.Printf("worst: %d, %d, %d\n", i, it.subPool, it.worstIndex)
fmt.Printf("%s.worst: %d, %d, %d\n", prefix, i, it.subPool, it.worstIndex)
}
}

View File

@ -5,14 +5,18 @@ package txpool
import (
"bytes"
"context"
"encoding/binary"
"testing"
"github.com/google/btree"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/rlp"
"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// https://blog.golang.org/fuzz-beta
@ -500,19 +504,19 @@ func FuzzOnNewBlocks11(f *testing.F) {
check(p2pReceived, TxSlots{}, "p2pmsg1")
checkNotify(p2pReceived, TxSlots{}, "p2pmsg1")
//db := mdbx.NewMDBX(log.New()).InMem().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).MustOpen()
//t.Cleanup(db.Close)
//err = db.Update(context.Background(), func(tx kv.RwTx) error { return pool.flush(tx, sendersCache) })
//require.NoError(t, err)
//check(p2pReceived, TxSlots{}, "after_flush")
db := mdbx.NewMDBX(log.New()).InMem().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).MustOpen()
t.Cleanup(db.Close)
err = db.Update(context.Background(), func(tx kv.RwTx) error { return pool.flush(tx, sendersCache) })
require.NoError(t, err)
check(p2pReceived, TxSlots{}, "after_flush")
//checkNotify(p2pReceived, TxSlots{}, "after_flush")
//p2, err := New(ch, nil)
//assert.NoError(err)
//s2 := NewSendersCache()
//err = db.View(context.Background(), func(tx kv.Tx) error { return p2.fromDB(tx, s2) })
//require.NoError(t, err)
//check(txs2, TxSlots{}, "fromDB")
p2, err := New(ch, nil)
assert.NoError(err)
s2 := NewSendersCache()
err = db.View(context.Background(), func(tx kv.Tx) error { return p2.fromDB(tx, s2) })
require.NoError(t, err)
check(txs2, TxSlots{}, "fromDB")
//checkNotify(txs2, TxSlots{}, "fromDB")
//fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash))
//fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash))
@ -529,7 +533,7 @@ func FuzzOnNewBlocks11(f *testing.F) {
// fmt.Printf("no: %d,%d,%d,%d\n", b.Tx.senderID, b.Tx.nonce, sc.nonce, sc2.nonce)
// }
//}
//assert.Equal(sendersCache.senderID, s2.senderID)
assert.Equal(sendersCache.senderID, s2.senderID)
//assert.Equal(sendersCache.blockHeight.Load(), s2.blockHeight.Load())
//require.Equal(t, len(sendersCache.senderIDs), len(s2.senderIDs))
//require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo))

View File

@ -489,5 +489,5 @@ func bytesToUint64(buf []byte) (x uint64) {
}
func (tx *TxSlot) printDebug(prefix string) {
fmt.Printf("%s: senderID=%d,nonce=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.idHash)
fmt.Printf("%s: senderID=%d,nonce=%d,tip=%d,hash=%x\n", prefix, tx.senderID, tx.nonce, tx.tip, tx.idHash)
}