Pool: eviction works, use correct minBaseFee (#48)

This commit is contained in:
Alex Sharov 2021-08-30 19:26:29 +07:00 committed by GitHub
parent d474087511
commit 29bf077da4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 168 additions and 151 deletions

View File

@ -350,7 +350,7 @@ const (
PoolSenderIDToAdress = "PoolSenderIDToAddress" // sender_id_u64 -> sender_20bytes
PoolSender = "PoolSender" // sender_id_u64 -> nonce, balance
PoolTransaction = "PoolTransaction" // txHash -> sender_id_u64+blockNum_u64+tx_rlp
PoolStateEviction = "PoolStateEviction" // commit_id_u64 -> [sender_id_u64] - list of senders who had no transactions at this time, if after some time they still have no transactions - evict them.
PoolStateEviction = "PoolStateEviction" // commit_id_u64 -> roaring([sender_id_u64]) - list of senders who had no transactions at this time, if after some time they still have no transactions - evict them.
PoolInfo = "PoolInfo" // option_key -> option_value
)

View File

@ -435,7 +435,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
addr := gointerfaces.ConvertH160toAddress(change.Address)
diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance}
}
if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash), f.senders); err != nil {
if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash), f.senders); err != nil {
log.Warn("onNewBlock", "err", err)
}
if f.wg != nil {

View File

@ -28,7 +28,7 @@ var _ Pool = &PoolMock{}
// IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) {
// panic("mock out the IdHashKnown method")
// },
// OnNewBlockFunc: func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error {
// OnNewBlockFunc: func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error {
// panic("mock out the OnNewBlock method")
// },
// OnNewTxsFunc: func(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) error {
@ -54,7 +54,7 @@ type PoolMock struct {
IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error)
// OnNewBlockFunc mocks the OnNewBlock method.
OnNewBlockFunc func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error
OnNewBlockFunc func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error
// OnNewTxsFunc mocks the OnNewTxs method.
OnNewTxsFunc func(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) error
@ -91,10 +91,8 @@ type PoolMock struct {
UnwindTxs TxSlots
// MinedTxs is the minedTxs argument value.
MinedTxs TxSlots
// ProtocolBaseFee is the protocolBaseFee argument value.
ProtocolBaseFee uint64
// PendingBaseFee is the pendingBaseFee argument value.
PendingBaseFee uint64
// BaseFee is the baseFee argument value.
BaseFee uint64
// BlockHeight is the blockHeight argument value.
BlockHeight uint64
// BlockHash is the blockHash argument value.
@ -233,25 +231,23 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
}
// OnNewBlock calls OnNewBlockFunc.
func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error {
func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error {
callInfo := struct {
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
ProtocolBaseFee uint64
PendingBaseFee uint64
BlockHeight uint64
BlockHash [32]byte
Senders *SendersCache
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
BaseFee uint64
BlockHeight uint64
BlockHash [32]byte
Senders *SendersCache
}{
StateChanges: stateChanges,
UnwindTxs: unwindTxs,
MinedTxs: minedTxs,
ProtocolBaseFee: protocolBaseFee,
PendingBaseFee: pendingBaseFee,
BlockHeight: blockHeight,
BlockHash: blockHash,
Senders: senders,
StateChanges: stateChanges,
UnwindTxs: unwindTxs,
MinedTxs: minedTxs,
BaseFee: baseFee,
BlockHeight: blockHeight,
BlockHash: blockHash,
Senders: senders,
}
mock.lockOnNewBlock.Lock()
mock.calls.OnNewBlock = append(mock.calls.OnNewBlock, callInfo)
@ -262,31 +258,29 @@ func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs T
)
return errOut
}
return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, protocolBaseFee, pendingBaseFee, blockHeight, blockHash, senders)
return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, baseFee, blockHeight, blockHash, senders)
}
// OnNewBlockCalls gets all the calls that were made to OnNewBlock.
// Check the length with:
// len(mockedPool.OnNewBlockCalls())
func (mock *PoolMock) OnNewBlockCalls() []struct {
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
ProtocolBaseFee uint64
PendingBaseFee uint64
BlockHeight uint64
BlockHash [32]byte
Senders *SendersCache
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
BaseFee uint64
BlockHeight uint64
BlockHash [32]byte
Senders *SendersCache
} {
var calls []struct {
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
ProtocolBaseFee uint64
PendingBaseFee uint64
BlockHeight uint64
BlockHash [32]byte
Senders *SendersCache
StateChanges map[string]senderInfo
UnwindTxs TxSlots
MinedTxs TxSlots
BaseFee uint64
BlockHeight uint64
BlockHash [32]byte
Senders *SendersCache
}
mock.lockOnNewBlock.RLock()
calls = mock.calls.OnNewBlock

View File

@ -23,6 +23,8 @@ import (
"encoding/binary"
"fmt"
"math"
"runtime"
"sort"
"sync"
"time"
@ -69,7 +71,7 @@ type Pool interface {
Started() bool
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
OnNewTxs(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) error
OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error
OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error
AddNewGoodPeer(peerID PeerID)
}
@ -447,13 +449,17 @@ func (sc *SendersCache) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor
return nil
}
func calcProtocolBaseFee(baseFee uint64) uint64 {
return 7
}
// TxPool - holds all pool-related data structures and lock-based tiny methods
// most of logic implemented by pure tests-friendly functions
type TxPool struct {
lock *sync.RWMutex
protocolBaseFee atomic.Uint64
pendingBaseFee atomic.Uint64
currentBaseFee atomic.Uint64
senderID uint64
byHash map[string]*metaTx // tx_hash => tx
@ -550,7 +556,7 @@ func (p *TxPool) printDebug(prefix string) {
}
}
func (p *TxPool) logStats(tx kv.Tx) error {
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load()
p.lock.RLock()
defer p.lock.RUnlock()
@ -562,10 +568,14 @@ func (p *TxPool) logStats(tx kv.Tx) error {
if err != nil {
return err
}
log.Info(fmt.Sprintf("baseFee: %dm->%dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersCache: id=%d+%d,info=%d+%d",
protocolBaseFee/1_000_000, pendingBaseFee/1_000_000,
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info(fmt.Sprintf("baseFee: %d,%dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersCache: id=%d+%d,info=%d+%d, alloc=%dMb, sys=%dMb\n",
protocolBaseFee, currentBaseFee/1_000_000,
p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit,
idsInMem, idsInDb, infoInMem, infoInDb,
m.Alloc/1024/1024, m.Sys/1024/1024,
))
return nil
}
@ -657,11 +667,11 @@ func (p *TxPool) OnNewTxs(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) e
return err
}
protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load()
if protocolBaseFee == 0 || pendingBaseFee == 0 {
return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee)
protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load()
if protocolBaseFee == 0 || currentBaseFee == 0 {
return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, currentBaseFee)
}
if err := onNewTxs(tx, p.senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil {
if err := onNewTxs(tx, p.senders, newTxs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil {
return err
}
notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs))
@ -682,7 +692,7 @@ func (p *TxPool) OnNewTxs(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) e
//log.Info("on new txs", "in", time.Since(t))
return nil
}
func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
for i := range newTxs.txs {
if newTxs.txs[i].senderID == 0 {
return fmt.Errorf("senderID can't be zero")
@ -695,7 +705,7 @@ func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee,
if err != nil {
return err
}
onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee, discard)
onSenderChange(id, sender, byNonce, protocolBaseFee, currentBaseFee)
}
pending.EnforceInvariants()
@ -707,20 +717,15 @@ func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee,
return nil
}
func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uint64) {
p.protocolBaseFee.Store(protocolBaseFee)
hasNewVal := pendingBaseFee > 0
if pendingBaseFee < protocolBaseFee {
pendingBaseFee = protocolBaseFee
hasNewVal = true
func (p *TxPool) setBaseFee(baseFee uint64) (uint64, uint64) {
if baseFee > 0 {
p.protocolBaseFee.Store(calcProtocolBaseFee(baseFee))
p.currentBaseFee.Store(baseFee)
}
if hasNewVal {
p.pendingBaseFee.Store(pendingBaseFee)
}
return protocolBaseFee, p.pendingBaseFee.Load()
return p.protocolBaseFee.Load(), p.currentBaseFee.Load()
}
func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error {
func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error {
defer onNewBlockTimer.UpdateDuration(time.Now())
p.lock.Lock()
defer p.lock.Unlock()
@ -731,11 +736,11 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined
return err
}
defer tx.Rollback()
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
protocolBaseFee, baseFee := p.setBaseFee(baseFee)
if err := senders.onNewBlock(tx, stateChanges, unwindTxs, minedTxs, blockHeight, blockHash); err != nil {
return err
}
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight)
if err := unwindTxs.Valid(); err != nil {
return err
}
@ -743,7 +748,7 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined
return err
}
if err := onNewBlock(tx, senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil {
if err := onNewBlock(tx, senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil {
return err
}
@ -804,7 +809,7 @@ func onNewBlock(tx kv.Tx, senders *SendersCache, unwindTxs TxSlots, minedTxs []*
if err != nil {
return err
}
onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee, discard)
onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee)
}
pending.EnforceInvariants()
@ -909,7 +914,7 @@ func unsafeAddToPendingPool(byNonce *ByNonce, newTxs TxSlots, pending, baseFee,
return changedSenders
}
func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, protocolBaseFee, pendingBaseFee uint64, discard func(*metaTx)) {
func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, protocolBaseFee, currentBaseFee uint64) {
noGapsNonce := sender.nonce + 1
cumulativeRequiredBalance := uint256.NewInt(0)
minFeeCap := uint64(math.MaxUint64)
@ -921,10 +926,10 @@ func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, proto
needBalance.Add(needBalance, &mt.Tx.value)
minFeeCap = min(minFeeCap, mt.Tx.feeCap)
minTip = min(minTip, mt.Tx.tip)
if pendingBaseFee >= minFeeCap {
if currentBaseFee >= minFeeCap {
mt.effectiveTip = minTip
} else {
mt.effectiveTip = minFeeCap - pendingBaseFee
mt.effectiveTip = minFeeCap - currentBaseFee
}
// 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol
// parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means
@ -965,7 +970,7 @@ func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, proto
// 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than
// baseFee of the currently pending block. Set to 0 otherwise.
mt.subPool &^= EnoughFeeCapBlock
if mt.Tx.feeCap >= pendingBaseFee {
if mt.Tx.feeCap >= currentBaseFee {
mt.subPool |= EnoughFeeCapBlock
}
@ -1222,13 +1227,13 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, s
if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
log.Error("log stats", "err", err)
}
if ASSERT {
go func() {
if err := p.forceCheckState(ctx, db, coreDB); err != nil {
log.Error("forceCheckState", "err", err)
}
}()
}
//if ASSERT {
// go func() {
// if err := p.forceCheckState(ctx, db, coreDB); err != nil {
// log.Error("forceCheckState", "err", err)
// }
// }()
//}
logEvery := time.NewTicker(p.cfg.logEvery)
defer logEvery.Stop()
@ -1285,6 +1290,7 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, s
}
}
//nolint
func coreProgress(coreTx kv.Tx) (uint64, error) {
stageProgress, err := coreTx.GetOne(kv.SyncStageProgress, []byte("Finish"))
if err != nil {
@ -1293,6 +1299,7 @@ func coreProgress(coreTx kv.Tx) (uint64, error) {
return binary.BigEndian.Uint64(stageProgress), err
}
//nolint
func (p *TxPool) forceCheckState(ctx context.Context, db, coreDB kv.RoDB) error {
for {
if err := db.View(ctx, func(tx kv.Tx) error {
@ -1356,6 +1363,16 @@ func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) {
return evicted, written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
if ASSERT {
c1, _ := tx.Cursor(kv.PoolSenderID)
c2, _ := tx.Cursor(kv.PoolSenderIDToAdress)
count1, _ := c1.Count()
count2, _ := c2.Count()
if count1 != count2 {
fmt.Printf("counts: %d, %d\n", count1, count2)
panic(1)
}
}
sendersWithoutTransactions := roaring64.New()
for i := 0; i < len(p.deletedTxs); i++ {
if p.txNonce2Tx.count(p.deletedTxs[i].Tx.senderID) == 0 {
@ -1430,7 +1447,6 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
}
metaTx.Tx.rlp = nil
}
if ASSERT {
txs := TxSlots{}
parseCtx := NewTxParseContext()
@ -1466,7 +1482,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil {
return evicted, err
}
binary.BigEndian.PutUint64(encID, p.pendingBaseFee.Load())
binary.BigEndian.PutUint64(encID, p.currentBaseFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
return evicted, err
}
@ -1475,6 +1491,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
if err != nil {
return evicted, err
}
if ASSERT {
_ = tx.ForEach(kv.PoolSenderIDToAdress, nil, func(idBytes, addr []byte) error {
found := false
@ -1530,7 +1547,6 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
// DB will stay consitant but some in-memory structures may be alread cleaned, and retry will not work
// failed write transaction must not create side-effects
p.deletedTxs = p.deletedTxs[:0]
return evicted, nil
}
@ -1538,7 +1554,8 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa
sc.lock.Lock()
defer sc.lock.Unlock()
sc.commitID++
//var justDeleted, justInserted []uint64
var justDeleted, justInserted []uint64
encID := make([]byte, 8)
for addr, id := range sc.senderIDs {
binary.BigEndian.PutUint64(encID, id)
@ -1556,16 +1573,17 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa
if err := tx.Put(kv.PoolSenderIDToAdress, encID, []byte(addr)); err != nil {
return evicted, err
}
//if ASSERT {
// justInserted = append(justInserted, id)
//}
if ASSERT {
justInserted = append(justInserted, id)
}
if byNonce.count(id) == 0 {
sendersWithoutTransactions.Add(id)
}
}
//if ASSERT {
// sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] })
//}
if ASSERT {
sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] })
}
v := make([]byte, 8, 8+32)
for id, info := range sc.senderInfo {
@ -1575,35 +1593,15 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa
binary.BigEndian.PutUint64(encID, id)
binary.BigEndian.PutUint64(v, info.nonce)
v = append(v[:8], info.balance.Bytes()...)
//TODO: check that nothing changed
if err := tx.Put(kv.PoolSender, encID, v); err != nil {
enc, err := tx.GetOne(kv.PoolSender, encID)
if err != nil {
return evicted, err
}
}
//fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted)
if ASSERT {
{
duplicates := map[string]uint64{}
_ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error {
id, ok := duplicates[string(v)]
if ok {
fmt.Printf("duplicate: %d,%d,%x\n", id, binary.BigEndian.Uint64(k), string(v))
panic(1)
}
return nil
})
if bytes.Equal(enc, v) {
continue
}
{
duplicates := map[uint64]string{}
_ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error {
id := binary.BigEndian.Uint64(v)
addr, ok := duplicates[id]
if ok {
fmt.Printf("duplicate: %x,%x,%d\n", addr, k, binary.BigEndian.Uint64(v))
panic(1)
}
return nil
})
if err := tx.Put(kv.PoolSender, encID, v); err != nil {
return evicted, err
}
}
@ -1640,19 +1638,26 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa
return 0, err
}
for _, senderID := range ids.ToArray() {
if _, ok := sc.senderInfo[senderID]; ok {
continue
}
//if _, ok := sc.senderInfo[senderID]; ok {
// continue
//}
if byNonce.count(senderID) > 0 {
continue
}
addr, err := tx.GetOne(kv.PoolSenderID, encID)
binary.BigEndian.PutUint64(encID, senderID)
addr, err := tx.GetOne(kv.PoolSenderIDToAdress, encID)
if err != nil {
return evicted, err
}
if _, ok := sc.senderIDs[string(addr)]; ok {
if addr == nil {
continue
}
if len(addr) != 20 {
panic(22)
}
//if _, ok := sc.senderIDs[string(addr)]; ok {
// continue
//}
if err := tx.Delete(kv.PoolSenderID, addr, nil); err != nil {
return evicted, err
}
@ -1663,15 +1668,17 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa
return evicted, err
}
evicted++
//if ASSERT {
// justDeleted = append(justDeleted, senderID)
//}
if ASSERT {
justDeleted = append(justDeleted, senderID) //nolint
}
}
if err := c.DeleteCurrent(); err != nil {
return evicted, err
}
}
//fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted)
if ASSERT {
_ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error {
//id := binary.BigEndian.Uint64(v[:8])
@ -1805,7 +1812,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
return err
}
var protocolBaseFee, pendingBaseFee uint64
var protocolBaseFee, currentBaseFee uint64
{
v, err := tx.GetOne(kv.PoolInfo, PoolProtocolBaseFeeKey)
if err != nil {
@ -1821,7 +1828,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
return err
}
if len(v) > 0 {
pendingBaseFee = binary.BigEndian.Uint64(v)
currentBaseFee = binary.BigEndian.Uint64(v)
}
}
cacheMisses, err := p.senders.onNewTxs(tx, txs)
@ -1833,10 +1840,10 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
return err
}
}
if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil {
if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil {
return err
}
p.pendingBaseFee.Store(pendingBaseFee)
p.currentBaseFee.Store(currentBaseFee)
p.protocolBaseFee.Store(protocolBaseFee)
return nil

View File

@ -274,20 +274,19 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) {
return p1, p2, p3, p4
}
func FuzzOnNewBlocks11(f *testing.F) {
func FuzzOnNewBlocks12(f *testing.F) {
var u64 = [1 * 4]byte{1}
var sender = [1 + 1 + 1]byte{1}
f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 1, 2)
f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 3, 4)
f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 10, 12)
f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, sender []byte, protocolBaseFee1, pendingBaseFee1 uint8) {
f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 12)
f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 14)
f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 123)
f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, sender []byte, currentBaseFee1 uint8) {
//t.Parallel()
protocolBaseFee, pendingBaseFee := uint64(protocolBaseFee1%16+1), uint64(pendingBaseFee1%8+1)
if protocolBaseFee == 0 || pendingBaseFee == 0 {
currentBaseFee := uint64(currentBaseFee1%16 + 1)
if currentBaseFee == 0 {
t.Skip()
}
pendingBaseFee += protocolBaseFee
if len(sender) < 1+1+1 {
t.Skip()
}
@ -338,10 +337,10 @@ func FuzzOnNewBlocks11(f *testing.F) {
//assert.True(tx.SenderHasEnoughBalance)
}
if tx.subPool&EnoughFeeCapProtocol > 0 {
assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg)
assert.LessOrEqual(calcProtocolBaseFee(currentBaseFee), tx.Tx.feeCap, msg)
}
if tx.subPool&EnoughFeeCapBlock > 0 {
assert.LessOrEqual(pendingBaseFee, tx.Tx.feeCap, msg)
assert.LessOrEqual(currentBaseFee, tx.Tx.feeCap, msg)
}
// side data structures must have all txs
@ -377,10 +376,10 @@ func FuzzOnNewBlocks11(f *testing.F) {
//assert.True(tx.SenderHasEnoughBalance, msg)
}
if tx.subPool&EnoughFeeCapProtocol > 0 {
assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg)
assert.LessOrEqual(calcProtocolBaseFee(currentBaseFee), tx.Tx.feeCap, msg)
}
if tx.subPool&EnoughFeeCapBlock > 0 {
assert.LessOrEqual(pendingBaseFee, tx.Tx.feeCap, msg)
assert.LessOrEqual(currentBaseFee, tx.Tx.feeCap, msg)
}
assert.True(pool.txNonce2Tx.has(tx), msg)
@ -404,10 +403,10 @@ func FuzzOnNewBlocks11(f *testing.F) {
//assert.True(tx.SenderHasEnoughBalance, msg)
}
if tx.subPool&EnoughFeeCapProtocol > 0 {
assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg)
assert.LessOrEqual(calcProtocolBaseFee(currentBaseFee), tx.Tx.feeCap, msg)
}
if tx.subPool&EnoughFeeCapBlock > 0 {
assert.LessOrEqual(pendingBaseFee, tx.Tx.feeCap, msg)
assert.LessOrEqual(currentBaseFee, tx.Tx.feeCap, msg)
}
assert.True(pool.txNonce2Tx.has(tx), "%s, %d, %x", msg, tx.Tx.nonce, tx.Tx.idHash)
@ -478,28 +477,35 @@ func FuzzOnNewBlocks11(f *testing.F) {
}
prevTotal = pending.Len() + baseFee.Len() + queued.Len()
}
checkDB := func(tx kv.Tx) {
c1, _ := tx.Cursor(kv.PoolSenderID)
c2, _ := tx.Cursor(kv.PoolSenderIDToAdress)
count1, _ := c1.Count()
count2, _ := c2.Count()
assert.Equal(count1, count2)
}
//fmt.Printf("-------\n")
// go to first fork
//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
txs1, txs2, p2pReceived, txs3 := splitDataset(txs)
err = pool.OnNewBlock(map[string]senderInfo{}, txs1, TxSlots{}, protocolBaseFee, pendingBaseFee, 1, [32]byte{}, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, txs1, TxSlots{}, currentBaseFee, 1, [32]byte{}, sendersCache)
assert.NoError(err)
check(txs1, TxSlots{}, "fork1")
checkNotify(txs1, TxSlots{}, "fork1")
_, _, _ = p2pReceived, txs2, txs3
err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs2, protocolBaseFee, pendingBaseFee, 1, [32]byte{}, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs2, currentBaseFee, 1, [32]byte{}, sendersCache)
check(TxSlots{}, txs2, "fork1 mined")
checkNotify(TxSlots{}, txs2, "fork1 mined")
// unwind everything and switch to new fork (need unwind mined now)
err = pool.OnNewBlock(map[string]senderInfo{}, txs2, TxSlots{}, protocolBaseFee, pendingBaseFee, 2, [32]byte{}, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, txs2, TxSlots{}, currentBaseFee, 2, [32]byte{}, sendersCache)
assert.NoError(err)
check(txs2, TxSlots{}, "fork2")
checkNotify(txs2, TxSlots{}, "fork2")
err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs3, protocolBaseFee, pendingBaseFee, 2, [32]byte{}, sendersCache)
err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs3, currentBaseFee, 2, [32]byte{}, sendersCache)
assert.NoError(err)
check(TxSlots{}, txs3, "fork2 mined")
checkNotify(TxSlots{}, txs3, "fork2 mined")
@ -520,6 +526,7 @@ func FuzzOnNewBlocks11(f *testing.F) {
_, err = pool.flushLocked(tx) // we don't test eviction here, because dedicated test exists
require.NoError(err)
check(p2pReceived, TxSlots{}, "after_flush")
checkDB(tx)
//checkNotify(p2pReceived, TxSlots{}, "after_flush")
s2 := NewSendersCache()
@ -527,6 +534,7 @@ func FuzzOnNewBlocks11(f *testing.F) {
assert.NoError(err)
err = p2.fromDB(context.Background(), tx, nil)
require.NoError(err)
checkDB(tx)
for _, txn := range p2.byHash {
assert.Nil(txn.Tx.rlp)
}
@ -547,7 +555,7 @@ func FuzzOnNewBlocks11(f *testing.F) {
assert.Equal(pool.pending.Len(), p2.pending.Len())
assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())
assert.Equal(pool.queued.Len(), p2.queued.Len())
assert.Equal(pool.pendingBaseFee.Load(), p2.pendingBaseFee.Load())
assert.Equal(pool.currentBaseFee.Load(), p2.currentBaseFee.Load())
assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load())
})

View File

@ -17,6 +17,7 @@
package txpool
import (
"fmt"
"testing"
"github.com/RoaringBitmap/roaring/roaring64"
@ -29,12 +30,15 @@ import (
func TestSenders(t *testing.T) {
t.Run("evict_all_on_next_round", func(t *testing.T) {
senders, require := NewSendersCache(), require.New(t)
senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1))
senders.senderInfo[2] = newSenderInfo(1, *uint256.NewInt(1))
_, tx := memdb.NewTestPoolTx(t)
byNonce := &ByNonce{btree.New(16)}
changed := roaring64.New()
senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1))
senders.senderIDs[fmt.Sprintf("%020x", 2)] = 2
senders.senderInfo[2] = newSenderInfo(1, *uint256.NewInt(1))
changed.AddMany([]uint64{1, 2})
evicted, err := senders.flush(tx, byNonce, changed, 1)
require.NoError(err)
@ -43,15 +47,18 @@ func TestSenders(t *testing.T) {
changed.Clear()
evicted, err = senders.flush(tx, byNonce, changed, 1)
require.NoError(err)
require.Equal(2, int(evicted))
})
t.Run("do_not_evict_if_used_in_current_round", func(t *testing.T) {
t.Run("evict_even_if_used_in_current_round_but_no_txs", func(t *testing.T) {
senders, require := NewSendersCache(), require.New(t)
_, tx := memdb.NewTestPoolTx(t)
byNonce := &ByNonce{btree.New(16)}
senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1))
senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
senders.senderInfo[2] = newSenderInfo(1, *uint256.NewInt(1))
senders.senderIDs[fmt.Sprintf("%020x", 2)] = 2
changed := roaring64.New()
changed.AddMany([]uint64{1, 2})
@ -60,11 +67,12 @@ func TestSenders(t *testing.T) {
require.Zero(evicted)
senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1)) // means used in current round, but still has 0 transactions
senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
changed.Clear()
changed.AddMany([]uint64{1})
evicted, err = senders.flush(tx, byNonce, changed, 1)
require.NoError(err)
require.Equal(1, int(evicted))
require.Equal(2, int(evicted))
})
}