From e2ddc55c955dfd6ca6d9ca0ff787c84a89f06976 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 09:48:37 +0700 Subject: [PATCH] add onNewTxs method --- txpool/pool.go | 192 +++++++++++++++++++++++++++++++----------------- txpool/types.go | 1 + 2 files changed, 124 insertions(+), 69 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 61c0c69b6..e049c1eb9 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -71,8 +71,12 @@ type MetaTx struct { currentSubPool SubPoolType } -func newMetaTx(slot *TxSlot) *MetaTx { - return &MetaTx{Tx: slot, worstIndex: -1, bestIndex: -1} +func newMetaTx(slot *TxSlot, isLocal bool) *MetaTx { + mt := &MetaTx{Tx: slot, worstIndex: -1, bestIndex: -1} + if isLocal { + mt.SubPool = IsLocal + } + return mt } type SubPoolType uint8 @@ -204,6 +208,78 @@ func (p *TxPool) IdHashIsLocal(hash []byte) bool { return txn.SubPool&IsLocal != 0 } func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } + +func (p *TxPool) OnNewTxs(newTxs TxSlots) error { + p.lock.Lock() + defer p.lock.Unlock() + + for i := range newTxs.txs { + id, ok := p.senderIDs[string(newTxs.senders[i*20:(i+1)*20])] + if !ok { + for i := range p.senderInfo { //TODO: create field for it? + if id < i { + id = i + } + } + id++ + p.senderIDs[string(newTxs.senders[i*20:(i+1)*20])] = id + newTxs.txs[i].senderID = id + } + } + if err := onNewTxs(p.senderInfo, newTxs, p.protocolBaseFee.Load(), p.blockBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { + return err + } + + notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs)) + for i := range newTxs.txs { + _, ok := p.byHash[string(newTxs.txs[i].idHash[:])] + if !ok { + continue + } + notifyNewTxs = append(notifyNewTxs, newTxs.txs[i].idHash[:]...) + } + select { + case p.newTxs <- notifyNewTxs: + default: + } + + return nil +} +func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error { + for i := range newTxs.txs { + if newTxs.txs[i].senderID == 0 { + return fmt.Errorf("senderID can't be zero") + } + } + + unsafeAddToPool(senderInfo, newTxs, queued, func(i *MetaTx) { + if _, ok := localsHistory.Get(i.Tx.idHash); ok { + //TODO: also check if sender is in list of local-senders + i.SubPool |= IsLocal + } + delete(byHash, string(i.Tx.idHash[:])) + }) + + for i := range senderInfo { + // TODO: aggregate changed senders before call this func + onSenderChange(senderInfo[i], protocolBaseFee, blockBaseFee) + } + + pending.EnforceInvariants() + baseFee.EnforceInvariants() + queued.EnforceInvariants() + + promote(pending, baseFee, queued, func(i *MetaTx) { + delete(byHash, string(i.Tx.idHash[:])) + senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) + if i.SubPool&IsLocal != 0 { + //TODO: only add to history if sender is not in list of local-senders + localsHistory.Add(i.Tx.idHash, struct{}{}) + } + }) + + return nil +} func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee uint64) error { p.lock.Lock() defer p.lock.Unlock() @@ -223,56 +299,29 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB unwindTxs.txs[i].senderID = id } } - - /* - for i := range unwindTxs { - unwindTxs[i].senderID == 0 - info, ok := s.cache[id] - if ok { - return info - } - p.tx.GetOne(kv.PlainState) - } - */ - // TODO: assign senderID to all transactions - if err := onNewBlock(p.senderInfo, unwindTxs.txs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { + if err := onNewBlock(p.senderInfo, unwindTxs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { return err } - newTxs := make(Hashes, 0, 32*len(unwindTxs.txs)) + notifyNewTxs := make(Hashes, 0, 32*len(unwindTxs.txs)) for i := range unwindTxs.txs { _, ok := p.byHash[string(unwindTxs.txs[i].idHash[:])] if !ok { continue } - newTxs = append(newTxs, unwindTxs.txs[i].idHash[:]...) + notifyNewTxs = append(notifyNewTxs, unwindTxs.txs[i].idHash[:]...) } select { - case p.newTxs <- newTxs: + case p.newTxs <- notifyNewTxs: default: } return nil - /* - localTxHashes = localTxHashes[:0] - p.AppendLocalHashes(last, localTxHashes) - initialAmount := len(localTxHashes) - sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes) - if initialAmount == 1 { - p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes) - } else { - p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount) - } - - remoteTxHashes = remoteTxHashes[:0] - p.FillRemoteHashesSince(last, remoteTxHashes) - send.BroadcastRemotePooledTxs(remoteTxHashes) - */ } -func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error { - for i := range unwindTxs { - if unwindTxs[i].senderID == 0 { +func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error { + for i := range unwindTxs.txs { + if unwindTxs.txs[i].senderID == 0 { return fmt.Errorf("senderID can't be zero") } } @@ -282,8 +331,27 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot } } - if unwindTxs != nil { - unwind(senderInfo, unwindTxs, pending, func(i *MetaTx) { + removeMined(senderInfo, minedTxs, protocolBaseFee, blockBaseFee, pending, baseFee, queued, func(i *MetaTx) { + delete(byHash, string(i.Tx.idHash[:])) + senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) + if i.SubPool&IsLocal != 0 { + //TODO: only add to history if sender is not in list of local-senders + localsHistory.Add(i.Tx.idHash, struct{}{}) + } + }) + + // This can be thought of a reverse operation from the one described before. + // When a block that was deemed "the best" of its height, is no longer deemed "the best", the + // transactions contained in it, are now viable for inclusion in other blocks, and therefore should + // be returned into the transaction pool. + // An interesting note here is that if the block contained any transactions local to the node, + // by being first removed from the pool (from the "local" part of it), and then re-injected, + // they effective lose their priority over the "remote" transactions. In order to prevent that, + // somehow the fact that certain transactions were local, needs to be remembered for some + // time (up to some "immutability threshold"). + if len(unwindTxs.txs) > 0 { + //TODO: restore isLocal flag in unwindTxs + unsafeAddToPool(senderInfo, unwindTxs, pending, func(i *MetaTx) { if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders i.SubPool |= IsLocal @@ -291,7 +359,17 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot delete(byHash, string(i.Tx.idHash[:])) }) } - forward(senderInfo, minedTxs, protocolBaseFee, blockBaseFee, pending, baseFee, queued, func(i *MetaTx) { + + for i := range senderInfo { + // TODO: aggregate changed senders before call this func + onSenderChange(senderInfo[i], protocolBaseFee, blockBaseFee) + } + + pending.EnforceInvariants() + baseFee.EnforceInvariants() + queued.EnforceInvariants() + + promote(pending, baseFee, queued, func(i *MetaTx) { delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.SubPool&IsLocal != 0 { @@ -303,16 +381,14 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot return nil } -// forward - apply new highest block (or batch of blocks) +// removeMined - apply new highest block (or batch of blocks) // // 1. New best block arrives, which potentially changes the balance and the nonce of some senders. // We use senderIds data structure to find relevant senderId values, and then use senders data structure to // modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is // included into a block), and finally, walk over the transaction records and update SubPool fields depending on // the actual presence of nonce gaps and what the balance is. -func forward(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) { - // TODO: change sender.nonce - +func removeMined(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) { for _, tx := range minedTxs { sender, ok := senderInfo[tx.senderID] if !ok { @@ -344,39 +420,18 @@ func forward(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, protocolBase } return true }) - - // TODO: aggregate changed senders before call this func - onSenderChange(sender, protocolBaseFee, blockBaseFee) } - - pending.EnforceInvariants() - baseFee.EnforceInvariants() - queued.EnforceInvariants() - - promote(pending, baseFee, queued, discard) } // unwind -// This can be thought of a reverse operation from the one described before. -// When a block that was deemed "the best" of its height, is no longer deemed "the best", the -// transactions contained in it, are now viable for inclusion in other blocks, and therefore should -// be returned into the transaction pool. -// An interesting note here is that if the block contained any transactions local to the node, -// by being first removed from the pool (from the "local" part of it), and then re-injected, -// they effective lose their priority over the "remote" transactions. In order to prevent that, -// somehow the fact that certain transactions were local, needs to be remembered for some -// time (up to some "immutability threshold"). -func unwind(senderInfo map[uint64]*senderInfo, unwindTxs []*TxSlot, pending *SubPool, beforeAdd func(tx *MetaTx)) { - // TODO: change sender.nonce - - for _, tx := range unwindTxs { +func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *SubPool, beforeAdd func(tx *MetaTx)) { + for i, tx := range unwindTxs.txs { sender, ok := senderInfo[tx.senderID] if !ok { panic("not implemented yet") } - //TODO: restore isLocal flag - mt := newMetaTx(tx) + mt := newMetaTx(tx, unwindTxs.isLocal[i]) // Insert to pending pool, if pool doesn't have tx with same Nonce and bigger Tip if found := sender.txNonce2Tx.Get(&nonce2TxItem{mt}); found != nil { if tx.tip <= found.(*nonce2TxItem).MetaTx.Tx.tip { @@ -385,9 +440,8 @@ func unwind(senderInfo map[uint64]*senderInfo, unwindTxs []*TxSlot, pending *Sub } beforeAdd(mt) sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt}) - pending.UnsafeAdd(mt, PendingSubPool) + to.UnsafeAdd(mt, PendingSubPool) } - pending.EnforceInvariants() } func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { diff --git a/txpool/types.go b/txpool/types.go index 22c0bee96..1a863905d 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -88,6 +88,7 @@ type TxSlot struct { type TxSlots struct { txs []*TxSlot senders []byte // plain 20-byte addresses + isLocal []bool } const (