diff --git a/kv/tables.go b/kv/tables.go index 64853cfe8..5649b320f 100644 --- a/kv/tables.go +++ b/kv/tables.go @@ -414,6 +414,8 @@ var ChaindataTablesCfg = TableCfg{ }, } +var TxpoolTablesCfg = TableCfg{} + func sortBuckets() { sort.SliceStable(ChaindataTables, func(i, j int) bool { return strings.Compare(ChaindataTables[i], ChaindataTables[j]) < 0 @@ -443,4 +445,12 @@ func reinit() { tmp.IsDeprecated = true ChaindataTablesCfg[name] = tmp } + + for _, name := range TxPoolTables { + _, ok := TxpoolTablesCfg[name] + if !ok { + TxpoolTablesCfg[name] = TableCfgItem{} + } + } + } diff --git a/txpool/pool.go b/txpool/pool.go index c2a27b107..a1ad49dca 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -150,6 +150,7 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { } return &TxPool{ lock: &sync.RWMutex{}, + senderIDs: map[string]uint64{}, senderInfo: map[uint64]*senderInfo{}, byHash: map[string]*metaTx{}, localsHistory: localsHistory, @@ -159,13 +160,15 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { queued: NewSubPool(), newTxs: newTxs, db: db, + senderID: 1, }, nil } func (p *TxPool) logStats() { p.lock.RLock() defer p.lock.RUnlock() - log.Info(fmt.Sprintf("[txpool] queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.pending.Len(), PendingSubPoolLimit)) + protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() + log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit)) } func (p *TxPool) GetRlp(hash []byte) []byte { p.lock.RLock() @@ -177,34 +180,33 @@ func (p *TxPool) GetRlp(hash []byte) []byte { } return txn.Tx.rlp } -func (p *TxPool) AppendLocalHashes(buf []byte) { +func (p *TxPool) AppendLocalHashes(buf []byte) []byte { p.lock.RLock() defer p.lock.RUnlock() - i := 0 for hash, txn := range p.byHash { if txn.subPool&IsLocal == 0 { continue } - copy(buf[i*32:(i+1)*32], hash) - i++ + buf = append(buf, hash...) } + return buf } -func (p *TxPool) AppendRemoteHashes(buf []byte) { +func (p *TxPool) AppendRemoteHashes(buf []byte) []byte { p.lock.RLock() defer p.lock.RUnlock() - i := 0 for hash, txn := range p.byHash { if txn.subPool&IsLocal != 0 { continue } - copy(buf[i*32:(i+1)*32], hash) - i++ + buf = append(buf, hash...) } + return buf } -func (p *TxPool) AppendAllHashes(buf []byte) { - p.AppendLocalHashes(buf) - p.AppendRemoteHashes(buf[len(buf):]) +func (p *TxPool) AppendAllHashes(buf []byte) []byte { + buf = p.AppendLocalHashes(buf) + buf = p.AppendRemoteHashes(buf) + return buf } func (p *TxPool) IdHashKnown(hash []byte) bool { p.lock.RLock() @@ -230,7 +232,7 @@ func (p *TxPool) Started() bool { defer p.lock.Unlock() protocolBaseFee := p.protocolBaseFee.Load() - return protocolBaseFee == 0 + return protocolBaseFee > 0 } func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error { @@ -243,7 +245,7 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error { protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() if protocolBaseFee == 0 || pendingBaseFee == 0 { - return fmt.Errorf("non-zero base fee") + return fmt.Errorf("non-zero base fee: %d,%d", protocolBaseFee, pendingBaseFee) } if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, newTxs); err != nil { @@ -326,10 +328,12 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin hasNewVal := pendingBaseFee > 0 if pendingBaseFee < protocolBaseFee { pendingBaseFee = protocolBaseFee + hasNewVal = true } if hasNewVal { - p.protocolBaseFee.Store(pendingBaseFee) + p.pendingBaseFee.Store(pendingBaseFee) } + log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) return protocolBaseFee, p.pendingBaseFee.Load() } @@ -337,6 +341,8 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) p.lock.Lock() defer p.lock.Unlock() + p.blockHeight.Store(blockHeight) + protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := unwindTxs.Valid(); err != nil { return err } @@ -344,9 +350,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un return err } - p.blockHeight.Store(blockHeight) - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) - if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, unwindTxs); err != nil { return err } @@ -355,7 +358,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un } for addr, id := range p.senderIDs { // merge state changes if v, ok := stateChanges[addr]; ok { - p.senderInfo[id] = &v + p.senderInfo[id] = newSenderInfo(v.nonce, v.balance) } } @@ -410,7 +413,8 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string] id, ok := senderIDs[addr] if !ok { *senderIDSequence++ - senderIDs[addr] = *senderIDSequence + id = *senderIDSequence + senderIDs[addr] = id } txs.txs[i].senderID = id @@ -425,7 +429,7 @@ func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string] if err != nil { return err } - sendersInfo[txs.txs[i].senderID] = &senderInfo{nonce: nonce, balance: balance} + sendersInfo[txs.txs[i].senderID] = newSenderInfo(nonce, balance) } } return nil @@ -443,7 +447,9 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ } } + j := 0 removeMined(senderInfo, minedTxs, pending, baseFee, queued, func(i *metaTx) { + j++ delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.subPool&IsLocal != 0 { @@ -451,6 +457,7 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ localsHistory.Add(i.Tx.idHash, struct{}{}) } }) + log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the @@ -524,10 +531,10 @@ func removeMined(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, pending, // delete mined transactions from everywhere sender.txNonce2Tx.Ascend(func(i btree.Item) bool { it := i.(*nonce2TxItem) + fmt.Printf("nonce cmp: %d,%d, senderID=%d\n", it.metaTx.Tx.nonce, sender.nonce, tx.senderID) if it.metaTx.Tx.nonce > sender.nonce { return false } - // TODO: save local transactions to cache with TTL, in case of re-org - to restore isLocal flag of re-injected transactions // del from nonce2tx mapping sender.txNonce2Tx.Delete(i) @@ -624,7 +631,6 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, pendingBaseFee uint64) // baseFee of the currently pending block. Set to 0 otherwise. it.metaTx.subPool &^= EnoughFeeCapBlock if it.metaTx.Tx.feeCap >= pendingBaseFee { - fmt.Printf("setttttt: %d,%d,%d\n", protocolBaseFee, pendingBaseFee, it.metaTx.Tx.feeCap) it.metaTx.subPool |= EnoughFeeCapBlock } @@ -904,7 +910,7 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen if len(newPeers) == 0 { continue } - p.AppendAllHashes(remoteTxHashes[:0]) + remoteTxHashes = p.AppendAllHashes(remoteTxHashes[:0]) send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes) } }