add onNewTxs method

This commit is contained in:
alex.sharov 2021-08-05 09:48:37 +07:00
parent 1e8a19ac37
commit e2ddc55c95
2 changed files with 124 additions and 69 deletions

View File

@ -71,8 +71,12 @@ type MetaTx struct {
currentSubPool SubPoolType
}
func newMetaTx(slot *TxSlot) *MetaTx {
return &MetaTx{Tx: slot, worstIndex: -1, bestIndex: -1}
func newMetaTx(slot *TxSlot, isLocal bool) *MetaTx {
mt := &MetaTx{Tx: slot, worstIndex: -1, bestIndex: -1}
if isLocal {
mt.SubPool = IsLocal
}
return mt
}
type SubPoolType uint8
@ -204,6 +208,78 @@ func (p *TxPool) IdHashIsLocal(hash []byte) bool {
return txn.SubPool&IsLocal != 0
}
func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) OnNewTxs(newTxs TxSlots) error {
p.lock.Lock()
defer p.lock.Unlock()
for i := range newTxs.txs {
id, ok := p.senderIDs[string(newTxs.senders[i*20:(i+1)*20])]
if !ok {
for i := range p.senderInfo { //TODO: create field for it?
if id < i {
id = i
}
}
id++
p.senderIDs[string(newTxs.senders[i*20:(i+1)*20])] = id
newTxs.txs[i].senderID = id
}
}
if err := onNewTxs(p.senderInfo, newTxs, p.protocolBaseFee.Load(), p.blockBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
return err
}
notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs))
for i := range newTxs.txs {
_, ok := p.byHash[string(newTxs.txs[i].idHash[:])]
if !ok {
continue
}
notifyNewTxs = append(notifyNewTxs, newTxs.txs[i].idHash[:]...)
}
select {
case p.newTxs <- notifyNewTxs:
default:
}
return nil
}
func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error {
for i := range newTxs.txs {
if newTxs.txs[i].senderID == 0 {
return fmt.Errorf("senderID can't be zero")
}
}
unsafeAddToPool(senderInfo, newTxs, queued, func(i *MetaTx) {
if _, ok := localsHistory.Get(i.Tx.idHash); ok {
//TODO: also check if sender is in list of local-senders
i.SubPool |= IsLocal
}
delete(byHash, string(i.Tx.idHash[:]))
})
for i := range senderInfo {
// TODO: aggregate changed senders before call this func
onSenderChange(senderInfo[i], protocolBaseFee, blockBaseFee)
}
pending.EnforceInvariants()
baseFee.EnforceInvariants()
queued.EnforceInvariants()
promote(pending, baseFee, queued, func(i *MetaTx) {
delete(byHash, string(i.Tx.idHash[:]))
senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i})
if i.SubPool&IsLocal != 0 {
//TODO: only add to history if sender is not in list of local-senders
localsHistory.Add(i.Tx.idHash, struct{}{})
}
})
return nil
}
func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee uint64) error {
p.lock.Lock()
defer p.lock.Unlock()
@ -223,56 +299,29 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB
unwindTxs.txs[i].senderID = id
}
}
/*
for i := range unwindTxs {
unwindTxs[i].senderID == 0
info, ok := s.cache[id]
if ok {
return info
}
p.tx.GetOne(kv.PlainState)
}
*/
// TODO: assign senderID to all transactions
if err := onNewBlock(p.senderInfo, unwindTxs.txs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
if err := onNewBlock(p.senderInfo, unwindTxs, minedTxs.txs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
return err
}
newTxs := make(Hashes, 0, 32*len(unwindTxs.txs))
notifyNewTxs := make(Hashes, 0, 32*len(unwindTxs.txs))
for i := range unwindTxs.txs {
_, ok := p.byHash[string(unwindTxs.txs[i].idHash[:])]
if !ok {
continue
}
newTxs = append(newTxs, unwindTxs.txs[i].idHash[:]...)
notifyNewTxs = append(notifyNewTxs, unwindTxs.txs[i].idHash[:]...)
}
select {
case p.newTxs <- newTxs:
case p.newTxs <- notifyNewTxs:
default:
}
return nil
/*
localTxHashes = localTxHashes[:0]
p.AppendLocalHashes(last, localTxHashes)
initialAmount := len(localTxHashes)
sentToPeers := send.BroadcastLocalPooledTxs(localTxHashes)
if initialAmount == 1 {
p.logger.Info("local tx propagated", "to_peers_amount", sentToPeers, "tx_hash", localTxHashes)
} else {
p.logger.Info("local byHash propagated", "to_peers_amount", sentToPeers, "txs_amount", initialAmount)
}
remoteTxHashes = remoteTxHashes[:0]
p.FillRemoteHashesSince(last, remoteTxHashes)
send.BroadcastRemotePooledTxs(remoteTxHashes)
*/
}
func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error {
for i := range unwindTxs {
if unwindTxs[i].senderID == 0 {
func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*MetaTx, localsHistory *lru.Cache) error {
for i := range unwindTxs.txs {
if unwindTxs.txs[i].senderID == 0 {
return fmt.Errorf("senderID can't be zero")
}
}
@ -282,8 +331,27 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot
}
}
if unwindTxs != nil {
unwind(senderInfo, unwindTxs, pending, func(i *MetaTx) {
removeMined(senderInfo, minedTxs, protocolBaseFee, blockBaseFee, pending, baseFee, queued, func(i *MetaTx) {
delete(byHash, string(i.Tx.idHash[:]))
senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i})
if i.SubPool&IsLocal != 0 {
//TODO: only add to history if sender is not in list of local-senders
localsHistory.Add(i.Tx.idHash, struct{}{})
}
})
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
// transactions contained in it, are now viable for inclusion in other blocks, and therefore should
// be returned into the transaction pool.
// An interesting note here is that if the block contained any transactions local to the node,
// by being first removed from the pool (from the "local" part of it), and then re-injected,
// they effective lose their priority over the "remote" transactions. In order to prevent that,
// somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold").
if len(unwindTxs.txs) > 0 {
//TODO: restore isLocal flag in unwindTxs
unsafeAddToPool(senderInfo, unwindTxs, pending, func(i *MetaTx) {
if _, ok := localsHistory.Get(i.Tx.idHash); ok {
//TODO: also check if sender is in list of local-senders
i.SubPool |= IsLocal
@ -291,7 +359,17 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot
delete(byHash, string(i.Tx.idHash[:]))
})
}
forward(senderInfo, minedTxs, protocolBaseFee, blockBaseFee, pending, baseFee, queued, func(i *MetaTx) {
for i := range senderInfo {
// TODO: aggregate changed senders before call this func
onSenderChange(senderInfo[i], protocolBaseFee, blockBaseFee)
}
pending.EnforceInvariants()
baseFee.EnforceInvariants()
queued.EnforceInvariants()
promote(pending, baseFee, queued, func(i *MetaTx) {
delete(byHash, string(i.Tx.idHash[:]))
senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i})
if i.SubPool&IsLocal != 0 {
@ -303,16 +381,14 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs, minedTxs []*TxSlot
return nil
}
// forward - apply new highest block (or batch of blocks)
// removeMined - apply new highest block (or batch of blocks)
//
// 1. New best block arrives, which potentially changes the balance and the nonce of some senders.
// We use senderIds data structure to find relevant senderId values, and then use senders data structure to
// modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is
// included into a block), and finally, walk over the transaction records and update SubPool fields depending on
// the actual presence of nonce gaps and what the balance is.
func forward(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) {
// TODO: change sender.nonce
func removeMined(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, protocolBaseFee, blockBaseFee uint64, pending, baseFee, queued *SubPool, discard func(tx *MetaTx)) {
for _, tx := range minedTxs {
sender, ok := senderInfo[tx.senderID]
if !ok {
@ -344,39 +420,18 @@ func forward(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, protocolBase
}
return true
})
// TODO: aggregate changed senders before call this func
onSenderChange(sender, protocolBaseFee, blockBaseFee)
}
pending.EnforceInvariants()
baseFee.EnforceInvariants()
queued.EnforceInvariants()
promote(pending, baseFee, queued, discard)
}
// unwind
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
// transactions contained in it, are now viable for inclusion in other blocks, and therefore should
// be returned into the transaction pool.
// An interesting note here is that if the block contained any transactions local to the node,
// by being first removed from the pool (from the "local" part of it), and then re-injected,
// they effective lose their priority over the "remote" transactions. In order to prevent that,
// somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold").
func unwind(senderInfo map[uint64]*senderInfo, unwindTxs []*TxSlot, pending *SubPool, beforeAdd func(tx *MetaTx)) {
// TODO: change sender.nonce
for _, tx := range unwindTxs {
func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *SubPool, beforeAdd func(tx *MetaTx)) {
for i, tx := range unwindTxs.txs {
sender, ok := senderInfo[tx.senderID]
if !ok {
panic("not implemented yet")
}
//TODO: restore isLocal flag
mt := newMetaTx(tx)
mt := newMetaTx(tx, unwindTxs.isLocal[i])
// Insert to pending pool, if pool doesn't have tx with same Nonce and bigger Tip
if found := sender.txNonce2Tx.Get(&nonce2TxItem{mt}); found != nil {
if tx.tip <= found.(*nonce2TxItem).MetaTx.Tx.tip {
@ -385,9 +440,8 @@ func unwind(senderInfo map[uint64]*senderInfo, unwindTxs []*TxSlot, pending *Sub
}
beforeAdd(mt)
sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt})
pending.UnsafeAdd(mt, PendingSubPool)
to.UnsafeAdd(mt, PendingSubPool)
}
pending.EnforceInvariants()
}
func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) {

View File

@ -88,6 +88,7 @@ type TxSlot struct {
type TxSlots struct {
txs []*TxSlot
senders []byte // plain 20-byte addresses
isLocal []bool
}
const (