diff --git a/kv/tables.go b/kv/tables.go index 848a9dae8..4c8fe70b1 100644 --- a/kv/tables.go +++ b/kv/tables.go @@ -350,7 +350,7 @@ const ( PoolSenderIDToAdress = "PoolSenderIDToAddress" // sender_id_u64 -> sender_20bytes PoolSender = "PoolSender" // sender_id_u64 -> nonce, balance PoolTransaction = "PoolTransaction" // txHash -> sender_id_u64+blockNum_u64+tx_rlp - PoolStateEviction = "PoolStateEviction" // commit_id_u64 -> [sender_id_u64] - list of senders who had no transactions at this time, if after some time they still have no transactions - evict them. + PoolStateEviction = "PoolStateEviction" // commit_id_u64 -> roaring([sender_id_u64]) - list of senders who had no transactions at this time, if after some time they still have no transactions - evict them. PoolInfo = "PoolInfo" // option_key -> option_value ) diff --git a/txpool/fetch.go b/txpool/fetch.go index 97b73a66d..9e79c94ae 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -435,7 +435,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) addr := gointerfaces.ConvertH160toAddress(change.Address) diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance} } - if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, 0, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash), f.senders); err != nil { + if err := f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash), f.senders); err != nil { log.Warn("onNewBlock", "err", err) } if f.wg != nil { diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go index 6b2a15994..3f81841c3 100644 --- a/txpool/mocks_test.go +++ b/txpool/mocks_test.go @@ -28,7 +28,7 @@ var _ Pool = &PoolMock{} // IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) { // panic("mock out the IdHashKnown method") // }, -// OnNewBlockFunc: func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error { +// OnNewBlockFunc: func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error { // panic("mock out the OnNewBlock method") // }, // OnNewTxsFunc: func(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) error { @@ -54,7 +54,7 @@ type PoolMock struct { IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error) // OnNewBlockFunc mocks the OnNewBlock method. - OnNewBlockFunc func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error + OnNewBlockFunc func(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error // OnNewTxsFunc mocks the OnNewTxs method. OnNewTxsFunc func(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) error @@ -91,10 +91,8 @@ type PoolMock struct { UnwindTxs TxSlots // MinedTxs is the minedTxs argument value. MinedTxs TxSlots - // ProtocolBaseFee is the protocolBaseFee argument value. - ProtocolBaseFee uint64 - // PendingBaseFee is the pendingBaseFee argument value. - PendingBaseFee uint64 + // BaseFee is the baseFee argument value. + BaseFee uint64 // BlockHeight is the blockHeight argument value. BlockHeight uint64 // BlockHash is the blockHash argument value. @@ -233,25 +231,23 @@ func (mock *PoolMock) IdHashKnownCalls() []struct { } // OnNewBlock calls OnNewBlockFunc. -func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, protocolBaseFee uint64, pendingBaseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error { +func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error { callInfo := struct { - StateChanges map[string]senderInfo - UnwindTxs TxSlots - MinedTxs TxSlots - ProtocolBaseFee uint64 - PendingBaseFee uint64 - BlockHeight uint64 - BlockHash [32]byte - Senders *SendersCache + StateChanges map[string]senderInfo + UnwindTxs TxSlots + MinedTxs TxSlots + BaseFee uint64 + BlockHeight uint64 + BlockHash [32]byte + Senders *SendersCache }{ - StateChanges: stateChanges, - UnwindTxs: unwindTxs, - MinedTxs: minedTxs, - ProtocolBaseFee: protocolBaseFee, - PendingBaseFee: pendingBaseFee, - BlockHeight: blockHeight, - BlockHash: blockHash, - Senders: senders, + StateChanges: stateChanges, + UnwindTxs: unwindTxs, + MinedTxs: minedTxs, + BaseFee: baseFee, + BlockHeight: blockHeight, + BlockHash: blockHash, + Senders: senders, } mock.lockOnNewBlock.Lock() mock.calls.OnNewBlock = append(mock.calls.OnNewBlock, callInfo) @@ -262,31 +258,29 @@ func (mock *PoolMock) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs T ) return errOut } - return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, protocolBaseFee, pendingBaseFee, blockHeight, blockHash, senders) + return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, baseFee, blockHeight, blockHash, senders) } // OnNewBlockCalls gets all the calls that were made to OnNewBlock. // Check the length with: // len(mockedPool.OnNewBlockCalls()) func (mock *PoolMock) OnNewBlockCalls() []struct { - StateChanges map[string]senderInfo - UnwindTxs TxSlots - MinedTxs TxSlots - ProtocolBaseFee uint64 - PendingBaseFee uint64 - BlockHeight uint64 - BlockHash [32]byte - Senders *SendersCache + StateChanges map[string]senderInfo + UnwindTxs TxSlots + MinedTxs TxSlots + BaseFee uint64 + BlockHeight uint64 + BlockHash [32]byte + Senders *SendersCache } { var calls []struct { - StateChanges map[string]senderInfo - UnwindTxs TxSlots - MinedTxs TxSlots - ProtocolBaseFee uint64 - PendingBaseFee uint64 - BlockHeight uint64 - BlockHash [32]byte - Senders *SendersCache + StateChanges map[string]senderInfo + UnwindTxs TxSlots + MinedTxs TxSlots + BaseFee uint64 + BlockHeight uint64 + BlockHash [32]byte + Senders *SendersCache } mock.lockOnNewBlock.RLock() calls = mock.calls.OnNewBlock diff --git a/txpool/pool.go b/txpool/pool.go index 7e1bec721..549a2a9e4 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -23,6 +23,8 @@ import ( "encoding/binary" "fmt" "math" + "runtime" + "sort" "sync" "time" @@ -69,7 +71,7 @@ type Pool interface { Started() bool GetRlp(tx kv.Tx, hash []byte) ([]byte, error) OnNewTxs(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) error - OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error + OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error AddNewGoodPeer(peerID PeerID) } @@ -447,13 +449,17 @@ func (sc *SendersCache) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor return nil } +func calcProtocolBaseFee(baseFee uint64) uint64 { + return 7 +} + // TxPool - holds all pool-related data structures and lock-based tiny methods // most of logic implemented by pure tests-friendly functions type TxPool struct { lock *sync.RWMutex protocolBaseFee atomic.Uint64 - pendingBaseFee atomic.Uint64 + currentBaseFee atomic.Uint64 senderID uint64 byHash map[string]*metaTx // tx_hash => tx @@ -550,7 +556,7 @@ func (p *TxPool) printDebug(prefix string) { } } func (p *TxPool) logStats(tx kv.Tx) error { - protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() + protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load() p.lock.RLock() defer p.lock.RUnlock() @@ -562,10 +568,14 @@ func (p *TxPool) logStats(tx kv.Tx) error { if err != nil { return err } - log.Info(fmt.Sprintf("baseFee: %dm->%dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersCache: id=%d+%d,info=%d+%d", - protocolBaseFee/1_000_000, pendingBaseFee/1_000_000, + var m runtime.MemStats + runtime.ReadMemStats(&m) + + log.Info(fmt.Sprintf("baseFee: %d,%dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersCache: id=%d+%d,info=%d+%d, alloc=%dMb, sys=%dMb\n", + protocolBaseFee, currentBaseFee/1_000_000, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit, idsInMem, idsInDb, infoInMem, infoInDb, + m.Alloc/1024/1024, m.Sys/1024/1024, )) return nil } @@ -657,11 +667,11 @@ func (p *TxPool) OnNewTxs(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) e return err } - protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() - if protocolBaseFee == 0 || pendingBaseFee == 0 { - return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee) + protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load() + if protocolBaseFee == 0 || currentBaseFee == 0 { + return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, currentBaseFee) } - if err := onNewTxs(tx, p.senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { + if err := onNewTxs(tx, p.senders, newTxs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { return err } notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs)) @@ -682,7 +692,7 @@ func (p *TxPool) OnNewTxs(ctx context.Context, coreDB kv.RoDB, newTxs TxSlots) e //log.Info("on new txs", "in", time.Since(t)) return nil } -func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error { +func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error { for i := range newTxs.txs { if newTxs.txs[i].senderID == 0 { return fmt.Errorf("senderID can't be zero") @@ -695,7 +705,7 @@ func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee, if err != nil { return err } - onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee, discard) + onSenderChange(id, sender, byNonce, protocolBaseFee, currentBaseFee) } pending.EnforceInvariants() @@ -707,20 +717,15 @@ func onNewTxs(tx kv.Tx, senders *SendersCache, newTxs TxSlots, protocolBaseFee, return nil } -func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uint64) { - p.protocolBaseFee.Store(protocolBaseFee) - hasNewVal := pendingBaseFee > 0 - if pendingBaseFee < protocolBaseFee { - pendingBaseFee = protocolBaseFee - hasNewVal = true +func (p *TxPool) setBaseFee(baseFee uint64) (uint64, uint64) { + if baseFee > 0 { + p.protocolBaseFee.Store(calcProtocolBaseFee(baseFee)) + p.currentBaseFee.Store(baseFee) } - if hasNewVal { - p.pendingBaseFee.Store(pendingBaseFee) - } - return protocolBaseFee, p.pendingBaseFee.Load() + return p.protocolBaseFee.Load(), p.currentBaseFee.Load() } -func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error { +func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte, senders *SendersCache) error { defer onNewBlockTimer.UpdateDuration(time.Now()) p.lock.Lock() defer p.lock.Unlock() @@ -731,11 +736,11 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined return err } defer tx.Rollback() - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) + protocolBaseFee, baseFee := p.setBaseFee(baseFee) if err := senders.onNewBlock(tx, stateChanges, unwindTxs, minedTxs, blockHeight, blockHash); err != nil { return err } - //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) + //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight) if err := unwindTxs.Valid(); err != nil { return err } @@ -743,7 +748,7 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined return err } - if err := onNewBlock(tx, senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { + if err := onNewBlock(tx, senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { return err } @@ -804,7 +809,7 @@ func onNewBlock(tx kv.Tx, senders *SendersCache, unwindTxs TxSlots, minedTxs []* if err != nil { return err } - onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee, discard) + onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee) } pending.EnforceInvariants() @@ -909,7 +914,7 @@ func unsafeAddToPendingPool(byNonce *ByNonce, newTxs TxSlots, pending, baseFee, return changedSenders } -func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, protocolBaseFee, pendingBaseFee uint64, discard func(*metaTx)) { +func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, protocolBaseFee, currentBaseFee uint64) { noGapsNonce := sender.nonce + 1 cumulativeRequiredBalance := uint256.NewInt(0) minFeeCap := uint64(math.MaxUint64) @@ -921,10 +926,10 @@ func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, proto needBalance.Add(needBalance, &mt.Tx.value) minFeeCap = min(minFeeCap, mt.Tx.feeCap) minTip = min(minTip, mt.Tx.tip) - if pendingBaseFee >= minFeeCap { + if currentBaseFee >= minFeeCap { mt.effectiveTip = minTip } else { - mt.effectiveTip = minFeeCap - pendingBaseFee + mt.effectiveTip = minFeeCap - currentBaseFee } // 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol // parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means @@ -965,7 +970,7 @@ func onSenderChange(senderID uint64, sender *senderInfo, byNonce *ByNonce, proto // 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than // baseFee of the currently pending block. Set to 0 otherwise. mt.subPool &^= EnoughFeeCapBlock - if mt.Tx.feeCap >= pendingBaseFee { + if mt.Tx.feeCap >= currentBaseFee { mt.subPool |= EnoughFeeCapBlock } @@ -1222,13 +1227,13 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, s if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil { log.Error("log stats", "err", err) } - if ASSERT { - go func() { - if err := p.forceCheckState(ctx, db, coreDB); err != nil { - log.Error("forceCheckState", "err", err) - } - }() - } + //if ASSERT { + // go func() { + // if err := p.forceCheckState(ctx, db, coreDB); err != nil { + // log.Error("forceCheckState", "err", err) + // } + // }() + //} logEvery := time.NewTicker(p.cfg.logEvery) defer logEvery.Stop() @@ -1285,6 +1290,7 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, s } } +//nolint func coreProgress(coreTx kv.Tx) (uint64, error) { stageProgress, err := coreTx.GetOne(kv.SyncStageProgress, []byte("Finish")) if err != nil { @@ -1293,6 +1299,7 @@ func coreProgress(coreTx kv.Tx) (uint64, error) { return binary.BigEndian.Uint64(stageProgress), err } +//nolint func (p *TxPool) forceCheckState(ctx context.Context, db, coreDB kv.RoDB) error { for { if err := db.View(ctx, func(tx kv.Tx) error { @@ -1356,6 +1363,16 @@ func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) { return evicted, written, nil } func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { + if ASSERT { + c1, _ := tx.Cursor(kv.PoolSenderID) + c2, _ := tx.Cursor(kv.PoolSenderIDToAdress) + count1, _ := c1.Count() + count2, _ := c2.Count() + if count1 != count2 { + fmt.Printf("counts: %d, %d\n", count1, count2) + panic(1) + } + } sendersWithoutTransactions := roaring64.New() for i := 0; i < len(p.deletedTxs); i++ { if p.txNonce2Tx.count(p.deletedTxs[i].Tx.senderID) == 0 { @@ -1430,7 +1447,6 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { } metaTx.Tx.rlp = nil } - if ASSERT { txs := TxSlots{} parseCtx := NewTxParseContext() @@ -1466,7 +1482,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil { return evicted, err } - binary.BigEndian.PutUint64(encID, p.pendingBaseFee.Load()) + binary.BigEndian.PutUint64(encID, p.currentBaseFee.Load()) if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil { return evicted, err } @@ -1475,6 +1491,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { if err != nil { return evicted, err } + if ASSERT { _ = tx.ForEach(kv.PoolSenderIDToAdress, nil, func(idBytes, addr []byte) error { found := false @@ -1530,7 +1547,6 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { // DB will stay consitant but some in-memory structures may be alread cleaned, and retry will not work // failed write transaction must not create side-effects p.deletedTxs = p.deletedTxs[:0] - return evicted, nil } @@ -1538,7 +1554,8 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa sc.lock.Lock() defer sc.lock.Unlock() sc.commitID++ - //var justDeleted, justInserted []uint64 + + var justDeleted, justInserted []uint64 encID := make([]byte, 8) for addr, id := range sc.senderIDs { binary.BigEndian.PutUint64(encID, id) @@ -1556,16 +1573,17 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa if err := tx.Put(kv.PoolSenderIDToAdress, encID, []byte(addr)); err != nil { return evicted, err } - //if ASSERT { - // justInserted = append(justInserted, id) - //} + if ASSERT { + justInserted = append(justInserted, id) + } if byNonce.count(id) == 0 { sendersWithoutTransactions.Add(id) } } - //if ASSERT { - // sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] }) - //} + + if ASSERT { + sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] }) + } v := make([]byte, 8, 8+32) for id, info := range sc.senderInfo { @@ -1575,35 +1593,15 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa binary.BigEndian.PutUint64(encID, id) binary.BigEndian.PutUint64(v, info.nonce) v = append(v[:8], info.balance.Bytes()...) - //TODO: check that nothing changed - if err := tx.Put(kv.PoolSender, encID, v); err != nil { + enc, err := tx.GetOne(kv.PoolSender, encID) + if err != nil { return evicted, err } - } - //fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted) - if ASSERT { - { - duplicates := map[string]uint64{} - _ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error { - id, ok := duplicates[string(v)] - if ok { - fmt.Printf("duplicate: %d,%d,%x\n", id, binary.BigEndian.Uint64(k), string(v)) - panic(1) - } - return nil - }) + if bytes.Equal(enc, v) { + continue } - { - duplicates := map[uint64]string{} - _ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error { - id := binary.BigEndian.Uint64(v) - addr, ok := duplicates[id] - if ok { - fmt.Printf("duplicate: %x,%x,%d\n", addr, k, binary.BigEndian.Uint64(v)) - panic(1) - } - return nil - }) + if err := tx.Put(kv.PoolSender, encID, v); err != nil { + return evicted, err } } @@ -1640,19 +1638,26 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa return 0, err } for _, senderID := range ids.ToArray() { - if _, ok := sc.senderInfo[senderID]; ok { - continue - } + //if _, ok := sc.senderInfo[senderID]; ok { + // continue + //} if byNonce.count(senderID) > 0 { continue } - addr, err := tx.GetOne(kv.PoolSenderID, encID) + binary.BigEndian.PutUint64(encID, senderID) + addr, err := tx.GetOne(kv.PoolSenderIDToAdress, encID) if err != nil { return evicted, err } - if _, ok := sc.senderIDs[string(addr)]; ok { + if addr == nil { continue } + if len(addr) != 20 { + panic(22) + } + //if _, ok := sc.senderIDs[string(addr)]; ok { + // continue + //} if err := tx.Delete(kv.PoolSenderID, addr, nil); err != nil { return evicted, err } @@ -1663,15 +1668,17 @@ func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransa return evicted, err } evicted++ - //if ASSERT { - // justDeleted = append(justDeleted, senderID) - //} + if ASSERT { + justDeleted = append(justDeleted, senderID) //nolint + } } if err := c.DeleteCurrent(); err != nil { return evicted, err } } + //fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted) + if ASSERT { _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { //id := binary.BigEndian.Uint64(v[:8]) @@ -1805,7 +1812,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { return err } - var protocolBaseFee, pendingBaseFee uint64 + var protocolBaseFee, currentBaseFee uint64 { v, err := tx.GetOne(kv.PoolInfo, PoolProtocolBaseFeeKey) if err != nil { @@ -1821,7 +1828,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { return err } if len(v) > 0 { - pendingBaseFee = binary.BigEndian.Uint64(v) + currentBaseFee = binary.BigEndian.Uint64(v) } } cacheMisses, err := p.senders.onNewTxs(tx, txs) @@ -1833,10 +1840,10 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { return err } } - if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { + if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { return err } - p.pendingBaseFee.Store(pendingBaseFee) + p.currentBaseFee.Store(currentBaseFee) p.protocolBaseFee.Store(protocolBaseFee) return nil diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index cc1ac68f0..06e9b2f4f 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -274,20 +274,19 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) { return p1, p2, p3, p4 } -func FuzzOnNewBlocks11(f *testing.F) { +func FuzzOnNewBlocks12(f *testing.F) { var u64 = [1 * 4]byte{1} var sender = [1 + 1 + 1]byte{1} - f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 1, 2) - f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 3, 4) - f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 10, 12) - f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, sender []byte, protocolBaseFee1, pendingBaseFee1 uint8) { + f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 12) + f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 14) + f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 123) + f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, sender []byte, currentBaseFee1 uint8) { //t.Parallel() - protocolBaseFee, pendingBaseFee := uint64(protocolBaseFee1%16+1), uint64(pendingBaseFee1%8+1) - if protocolBaseFee == 0 || pendingBaseFee == 0 { + currentBaseFee := uint64(currentBaseFee1%16 + 1) + if currentBaseFee == 0 { t.Skip() } - pendingBaseFee += protocolBaseFee if len(sender) < 1+1+1 { t.Skip() } @@ -338,10 +337,10 @@ func FuzzOnNewBlocks11(f *testing.F) { //assert.True(tx.SenderHasEnoughBalance) } if tx.subPool&EnoughFeeCapProtocol > 0 { - assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg) + assert.LessOrEqual(calcProtocolBaseFee(currentBaseFee), tx.Tx.feeCap, msg) } if tx.subPool&EnoughFeeCapBlock > 0 { - assert.LessOrEqual(pendingBaseFee, tx.Tx.feeCap, msg) + assert.LessOrEqual(currentBaseFee, tx.Tx.feeCap, msg) } // side data structures must have all txs @@ -377,10 +376,10 @@ func FuzzOnNewBlocks11(f *testing.F) { //assert.True(tx.SenderHasEnoughBalance, msg) } if tx.subPool&EnoughFeeCapProtocol > 0 { - assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg) + assert.LessOrEqual(calcProtocolBaseFee(currentBaseFee), tx.Tx.feeCap, msg) } if tx.subPool&EnoughFeeCapBlock > 0 { - assert.LessOrEqual(pendingBaseFee, tx.Tx.feeCap, msg) + assert.LessOrEqual(currentBaseFee, tx.Tx.feeCap, msg) } assert.True(pool.txNonce2Tx.has(tx), msg) @@ -404,10 +403,10 @@ func FuzzOnNewBlocks11(f *testing.F) { //assert.True(tx.SenderHasEnoughBalance, msg) } if tx.subPool&EnoughFeeCapProtocol > 0 { - assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg) + assert.LessOrEqual(calcProtocolBaseFee(currentBaseFee), tx.Tx.feeCap, msg) } if tx.subPool&EnoughFeeCapBlock > 0 { - assert.LessOrEqual(pendingBaseFee, tx.Tx.feeCap, msg) + assert.LessOrEqual(currentBaseFee, tx.Tx.feeCap, msg) } assert.True(pool.txNonce2Tx.has(tx), "%s, %d, %x", msg, tx.Tx.nonce, tx.Tx.idHash) @@ -478,28 +477,35 @@ func FuzzOnNewBlocks11(f *testing.F) { } prevTotal = pending.Len() + baseFee.Len() + queued.Len() } + checkDB := func(tx kv.Tx) { + c1, _ := tx.Cursor(kv.PoolSenderID) + c2, _ := tx.Cursor(kv.PoolSenderIDToAdress) + count1, _ := c1.Count() + count2, _ := c2.Count() + assert.Equal(count1, count2) + } //fmt.Printf("-------\n") // go to first fork //fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len()) txs1, txs2, p2pReceived, txs3 := splitDataset(txs) - err = pool.OnNewBlock(map[string]senderInfo{}, txs1, TxSlots{}, protocolBaseFee, pendingBaseFee, 1, [32]byte{}, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, txs1, TxSlots{}, currentBaseFee, 1, [32]byte{}, sendersCache) assert.NoError(err) check(txs1, TxSlots{}, "fork1") checkNotify(txs1, TxSlots{}, "fork1") _, _, _ = p2pReceived, txs2, txs3 - err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs2, protocolBaseFee, pendingBaseFee, 1, [32]byte{}, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs2, currentBaseFee, 1, [32]byte{}, sendersCache) check(TxSlots{}, txs2, "fork1 mined") checkNotify(TxSlots{}, txs2, "fork1 mined") // unwind everything and switch to new fork (need unwind mined now) - err = pool.OnNewBlock(map[string]senderInfo{}, txs2, TxSlots{}, protocolBaseFee, pendingBaseFee, 2, [32]byte{}, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, txs2, TxSlots{}, currentBaseFee, 2, [32]byte{}, sendersCache) assert.NoError(err) check(txs2, TxSlots{}, "fork2") checkNotify(txs2, TxSlots{}, "fork2") - err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs3, protocolBaseFee, pendingBaseFee, 2, [32]byte{}, sendersCache) + err = pool.OnNewBlock(map[string]senderInfo{}, TxSlots{}, txs3, currentBaseFee, 2, [32]byte{}, sendersCache) assert.NoError(err) check(TxSlots{}, txs3, "fork2 mined") checkNotify(TxSlots{}, txs3, "fork2 mined") @@ -520,6 +526,7 @@ func FuzzOnNewBlocks11(f *testing.F) { _, err = pool.flushLocked(tx) // we don't test eviction here, because dedicated test exists require.NoError(err) check(p2pReceived, TxSlots{}, "after_flush") + checkDB(tx) //checkNotify(p2pReceived, TxSlots{}, "after_flush") s2 := NewSendersCache() @@ -527,6 +534,7 @@ func FuzzOnNewBlocks11(f *testing.F) { assert.NoError(err) err = p2.fromDB(context.Background(), tx, nil) require.NoError(err) + checkDB(tx) for _, txn := range p2.byHash { assert.Nil(txn.Tx.rlp) } @@ -547,7 +555,7 @@ func FuzzOnNewBlocks11(f *testing.F) { assert.Equal(pool.pending.Len(), p2.pending.Len()) assert.Equal(pool.baseFee.Len(), p2.baseFee.Len()) assert.Equal(pool.queued.Len(), p2.queued.Len()) - assert.Equal(pool.pendingBaseFee.Load(), p2.pendingBaseFee.Load()) + assert.Equal(pool.currentBaseFee.Load(), p2.currentBaseFee.Load()) assert.Equal(pool.protocolBaseFee.Load(), p2.protocolBaseFee.Load()) }) diff --git a/txpool/pool_test.go b/txpool/pool_test.go index 2119345a4..af7eb689d 100644 --- a/txpool/pool_test.go +++ b/txpool/pool_test.go @@ -17,6 +17,7 @@ package txpool import ( + "fmt" "testing" "github.com/RoaringBitmap/roaring/roaring64" @@ -29,12 +30,15 @@ import ( func TestSenders(t *testing.T) { t.Run("evict_all_on_next_round", func(t *testing.T) { senders, require := NewSendersCache(), require.New(t) - senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1)) - senders.senderInfo[2] = newSenderInfo(1, *uint256.NewInt(1)) _, tx := memdb.NewTestPoolTx(t) byNonce := &ByNonce{btree.New(16)} - changed := roaring64.New() + + senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1 + senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1)) + senders.senderIDs[fmt.Sprintf("%020x", 2)] = 2 + senders.senderInfo[2] = newSenderInfo(1, *uint256.NewInt(1)) + changed.AddMany([]uint64{1, 2}) evicted, err := senders.flush(tx, byNonce, changed, 1) require.NoError(err) @@ -43,15 +47,18 @@ func TestSenders(t *testing.T) { changed.Clear() evicted, err = senders.flush(tx, byNonce, changed, 1) require.NoError(err) + require.Equal(2, int(evicted)) }) - t.Run("do_not_evict_if_used_in_current_round", func(t *testing.T) { + t.Run("evict_even_if_used_in_current_round_but_no_txs", func(t *testing.T) { senders, require := NewSendersCache(), require.New(t) _, tx := memdb.NewTestPoolTx(t) byNonce := &ByNonce{btree.New(16)} senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1)) + senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1 senders.senderInfo[2] = newSenderInfo(1, *uint256.NewInt(1)) + senders.senderIDs[fmt.Sprintf("%020x", 2)] = 2 changed := roaring64.New() changed.AddMany([]uint64{1, 2}) @@ -60,11 +67,12 @@ func TestSenders(t *testing.T) { require.Zero(evicted) senders.senderInfo[1] = newSenderInfo(1, *uint256.NewInt(1)) // means used in current round, but still has 0 transactions + senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1 changed.Clear() changed.AddMany([]uint64{1}) evicted, err = senders.flush(tx, byNonce, changed, 1) require.NoError(err) - require.Equal(1, int(evicted)) + require.Equal(2, int(evicted)) }) }