Merge pull request #39 from ledgerwatch/pool23

Pool: re-calculate only changed senders
This commit is contained in:
Alex Sharov 2021-08-21 19:52:30 +07:00 committed by GitHub
commit be56af8ffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -138,14 +138,6 @@ func (sc *SendersCache) get(senderID uint64) *senderInfo {
} }
return sender return sender
} }
func (sc *SendersCache) forEach(f func(info *senderInfo)) {
sc.lock.RLock()
defer sc.lock.RUnlock()
for i := range sc.senderInfo {
f(sc.senderInfo[i])
}
}
func (sc *SendersCache) len() int { func (sc *SendersCache) len() int {
sc.lock.RLock() sc.lock.RLock()
defer sc.lock.RUnlock() defer sc.lock.RUnlock()
@ -235,7 +227,14 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID id = sc.senderID
sc.senderIDs[addr] = id sc.senderIDs[addr] = id
} }
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) old, ok := sc.senderInfo[id]
if ok {
old.nonce = v.nonce
old.balance = v.balance
v.txNonce2Tx = old.txNonce2Tx
} else {
sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance)
}
} }
/* /*
@ -253,8 +252,10 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID id = sc.senderID
sc.senderIDs[string(unwindedTxs.senders.At(i))] = id sc.senderIDs[string(unwindedTxs.senders.At(i))] = id
} }
if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok { if _, ok := sc.senderInfo[id]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0))
}
} }
} }
@ -265,8 +266,10 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un
id = sc.senderID id = sc.senderID
sc.senderIDs[string(minedTxs.senders.At(i))] = id sc.senderIDs[string(minedTxs.senders.At(i))] = id
} }
if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok { if _, ok := sc.senderInfo[id]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok {
sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0))
}
} }
//if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok { //if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok {
@ -449,7 +452,7 @@ func (p *TxPool) Started() bool {
} }
func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error { func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error {
t := time.Now() //t := time.Now()
if err := senders.onNewTxs(coreDB, newTxs); err != nil { if err := senders.onNewTxs(coreDB, newTxs); err != nil {
return err return err
} }
@ -481,7 +484,7 @@ func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) erro
} }
} }
log.Info("on new txs", "in", time.Since(t)) //log.Info("on new txs", "in", time.Since(t))
return nil return nil
} }
func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error {
@ -491,7 +494,11 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas
} }
} }
changedSenders := map[uint64]*senderInfo{}
unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) { unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) {
changedSenders[i.Tx.senderID] = sender
if _, ok := localsHistory.Get(i.Tx.idHash); ok { if _, ok := localsHistory.Get(i.Tx.idHash); ok {
//TODO: also check if sender is in list of local-senders //TODO: also check if sender is in list of local-senders
i.subPool |= IsLocal i.subPool |= IsLocal
@ -514,10 +521,9 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas
} }
}) })
senders.forEach(func(sender *senderInfo) { for _, sender := range changedSenders {
// TODO: aggregate changed senders before call this func
onSenderChange(sender, protocolBaseFee, pendingBaseFee) onSenderChange(sender, protocolBaseFee, pendingBaseFee)
}) }
pending.EnforceInvariants() pending.EnforceInvariants()
baseFee.EnforceInvariants() baseFee.EnforceInvariants()
@ -550,11 +556,11 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin
func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error {
t := time.Now() t := time.Now()
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
return err return err
} }
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
if err := unwindTxs.Valid(); err != nil { if err := unwindTxs.Valid(); err != nil {
return err return err
} }
@ -634,6 +640,8 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs))
} }
changedSenders := make(map[uint64]*senderInfo, len(unwindTxs.txs)/4)
// This can be thought of a reverse operation from the one described before. // This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the // When a block that was deemed "the best" of its height, is no longer deemed "the best", the
// transactions contained in it, are now viable for inclusion in other blocks, and therefore should // transactions contained in it, are now viable for inclusion in other blocks, and therefore should
@ -644,6 +652,7 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
// somehow the fact that certain transactions were local, needs to be remembered for some // somehow the fact that certain transactions were local, needs to be remembered for some
// time (up to some "immutability threshold"). // time (up to some "immutability threshold").
unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) { unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) {
changedSenders[i.Tx.senderID] = sender
//fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce)
if _, ok := localsHistory.Get(i.Tx.idHash); ok { if _, ok := localsHistory.Get(i.Tx.idHash); ok {
//TODO: also check if sender is in list of local-senders //TODO: also check if sender is in list of local-senders
@ -667,10 +676,9 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
} }
}) })
senders.forEach(func(sender *senderInfo) { for _, sender := range changedSenders {
// TODO: aggregate changed senders before call this func
onSenderChange(sender, protocolBaseFee, pendingBaseFee) onSenderChange(sender, protocolBaseFee, pendingBaseFee)
}) }
pending.EnforceInvariants() pending.EnforceInvariants()
baseFee.EnforceInvariants() baseFee.EnforceInvariants()