diff --git a/txpool/pool.go b/txpool/pool.go index cadcf68e9..3645ee3eb 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -138,14 +138,6 @@ func (sc *SendersCache) get(senderID uint64) *senderInfo { } return sender } -func (sc *SendersCache) forEach(f func(info *senderInfo)) { - sc.lock.RLock() - defer sc.lock.RUnlock() - - for i := range sc.senderInfo { - f(sc.senderInfo[i]) - } -} func (sc *SendersCache) len() int { sc.lock.RLock() defer sc.lock.RUnlock() @@ -235,7 +227,14 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[addr] = id } - sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + old, ok := sc.senderInfo[id] + if ok { + old.nonce = v.nonce + old.balance = v.balance + v.txNonce2Tx = old.txNonce2Tx + } else { + sc.senderInfo[id] = newSenderInfo(v.nonce, v.balance) + } } /* @@ -253,8 +252,10 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[string(unwindedTxs.senders.At(i))] = id } - if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok { - sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) + if _, ok := sc.senderInfo[id]; !ok { + if _, ok := stateChanges[string(unwindedTxs.senders.At(i))]; !ok { + sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) + } } } @@ -265,8 +266,10 @@ func (sc *SendersCache) mergeStateChanges(stateChanges map[string]senderInfo, un id = sc.senderID sc.senderIDs[string(minedTxs.senders.At(i))] = id } - if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok { - sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) + if _, ok := sc.senderInfo[id]; !ok { + if _, ok := stateChanges[string(minedTxs.senders.At(i))]; !ok { + sc.senderInfo[id] = newSenderInfo(0, *uint256.NewInt(0)) + } } //if v, ok := stateChanges[string(minedTxs.senders.At(i))]; ok { @@ -449,7 +452,7 @@ func (p *TxPool) Started() bool { } func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) error { - t := time.Now() + //t := time.Now() if err := senders.onNewTxs(coreDB, newTxs); err != nil { return err } @@ -481,7 +484,7 @@ func (p *TxPool) Add(coreDB kv.RoDB, newTxs TxSlots, senders *SendersCache) erro } } - log.Info("on new txs", "in", time.Since(t)) + //log.Info("on new txs", "in", time.Since(t)) return nil } func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { @@ -491,7 +494,11 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas } } + changedSenders := map[uint64]*senderInfo{} + unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) { + changedSenders[i.Tx.senderID] = sender + if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders i.subPool |= IsLocal @@ -514,10 +521,9 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas } }) - senders.forEach(func(sender *senderInfo) { - // TODO: aggregate changed senders before call this func + for _, sender := range changedSenders { onSenderChange(sender, protocolBaseFee, pendingBaseFee) - }) + } pending.EnforceInvariants() baseFee.EnforceInvariants() @@ -550,11 +556,11 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, pendingBaseFee, blockHeight uint64, senders *SendersCache) error { t := time.Now() + protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) - protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := unwindTxs.Valid(); err != nil { return err } @@ -634,6 +640,8 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs)) } + changedSenders := make(map[uint64]*senderInfo, len(unwindTxs.txs)/4) + // This can be thought of a reverse operation from the one described before. // When a block that was deemed "the best" of its height, is no longer deemed "the best", the // transactions contained in it, are now viable for inclusion in other blocks, and therefore should @@ -644,6 +652,7 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) { + changedSenders[i.Tx.senderID] = sender //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders @@ -667,10 +676,9 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr } }) - senders.forEach(func(sender *senderInfo) { - // TODO: aggregate changed senders before call this func + for _, sender := range changedSenders { onSenderChange(sender, protocolBaseFee, pendingBaseFee) - }) + } pending.EnforceInvariants() baseFee.EnforceInvariants()