diff --git a/txpool/fetch.go b/txpool/fetch.go index 3852e01b0..402312c5b 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -177,18 +177,16 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl if req == nil { return nil } - go func(req *sentry.InboundMessage) { - if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { - s, ok := status.FromError(err) - doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) - if doLog { - log.Warn("Handling incoming message", "err", err) - } + if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { + s, ok := status.FromError(err) + doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) + if doLog { + log.Warn("Handling incoming message", "err", err) } - if f.wg != nil { - f.wg.Done() - } - }(req) + } + if f.wg != nil { + f.wg.Done() + } } } diff --git a/txpool/pool.go b/txpool/pool.go index ce4726da7..83c2cdd84 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -173,7 +173,8 @@ func (sc *SendersCache) evict() int { func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { sc.ensureSenderIDOnNewTxs(newTxs) - diff, err := sc.setTxSenderID(coreDBTx, newTxs) + toLoad := sc.setTxSenderID(newTxs) + diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err } @@ -183,12 +184,14 @@ func (sc *SendersCache) onNewTxs(coreDBTx kv.Tx, newTxs TxSlots) error { func (sc *SendersCache) onNewBlock(coreDBTx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots) error { sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs) - diff, err := sc.setTxSenderID(coreDBTx, unwindTxs) + toLoad := sc.setTxSenderID(unwindTxs) + diff, err := loadSenders(coreDBTx, toLoad) if err != nil { return err } sc.set(diff) - diff, err = sc.setTxSenderID(coreDBTx, minedTxs) + toLoad = sc.setTxSenderID(minedTxs) + diff, err = loadSenders(coreDBTx, toLoad) if err != nil { return err } @@ -260,10 +263,10 @@ func (sc *SendersCache) ensureSenderIDOnNewTxs(newTxs TxSlots) { } } -func (sc *SendersCache) setTxSenderID(coreDB kv.Tx, txs TxSlots) (map[uint64]*senderInfo, error) { +func (sc *SendersCache) setTxSenderID(txs TxSlots) map[uint64]string { sc.lock.RLock() defer sc.lock.RUnlock() - diff := map[uint64]*senderInfo{} + toLoad := map[uint64]string{} for i := range txs.txs { addr := string(txs.senders.At(i)) @@ -272,20 +275,30 @@ func (sc *SendersCache) setTxSenderID(coreDB kv.Tx, txs TxSlots) (map[uint64]*se // load data from db if need _, ok := sc.senderInfo[txs.txs[i].senderID] - if !ok { - _, ok = diff[txs.txs[i].senderID] - if !ok { - encoded, err := coreDB.GetOne(kv.PlainState, txs.senders.At(i)) - if err != nil { - return nil, err - } - nonce, balance, err := DecodeSender(encoded) - if err != nil { - return nil, err - } - diff[txs.txs[i].senderID] = newSenderInfo(nonce, balance) - } + if ok { + continue } + _, ok = toLoad[txs.txs[i].senderID] + if ok { + continue + } + toLoad[txs.txs[i].senderID] = addr + } + return toLoad +} + +func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo, error) { + diff := make(map[uint64]*senderInfo, len(toLoad)) + for id := range toLoad { + encoded, err := coreDB.GetOne(kv.PlainState, []byte(toLoad[id])) + if err != nil { + return nil, err + } + nonce, balance, err := DecodeSender(encoded) + if err != nil { + return nil, err + } + diff[id] = newSenderInfo(nonce, balance) } return diff, nil } @@ -337,9 +350,10 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { } func (p *TxPool) logStats() { + protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() + p.lock.RLock() defer p.lock.RUnlock() - protocolBaseFee, pendingBaseFee := p.protocolBaseFee.Load(), p.pendingBaseFee.Load() log.Info(fmt.Sprintf("[txpool] baseFee: protocol=%d,pending=%d; queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", protocolBaseFee, pendingBaseFee, p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit)) } func (p *TxPool) GetRlp(hash []byte) []byte { @@ -421,7 +435,6 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error if err := onNewTxs(senders, newTxs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { return err } - fmt.Printf("len1: %d, %d\n", len(p.byHash), len(newTxs.txs)) notifyNewTxs := make(Hashes, 0, 32*len(newTxs.txs)) for i := range newTxs.txs { _, ok := p.byHash[string(newTxs.txs[i].idHash[:])] @@ -440,20 +453,19 @@ func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots, senders *SendersCache) error return nil } func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byHash map[string]*metaTx, localsHistory *simplelru.LRU) error { - defer func(t time.Time) { fmt.Printf("pool.go:444: %s\n", time.Since(t)) }(time.Now()) for i := range newTxs.txs { if newTxs.txs[i].senderID == 0 { return fmt.Errorf("senderID can't be zero") } } - unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx) { + unsafeAddToPool(senders, newTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) { if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders i.subPool |= IsLocal } byHash[string(i.Tx.idHash[:])] = i - replaced := senders.get(i.Tx.senderID).txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i}) + replaced := sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i}) if replaced != nil { replacedMT := replaced.(*nonce2TxItem).metaTx delete(byHash, string(replacedMT.Tx.idHash[:])) @@ -469,13 +481,11 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas } } }) - fmt.Printf("aa: %d\n", len(byHash)) senders.forEach(func(sender *senderInfo) { // TODO: aggregate changed senders before call this func onSenderChange(sender, protocolBaseFee, pendingBaseFee) }) - defer func(t time.Time) { fmt.Printf("pool.go:478: %s\n", time.Since(t)) }(time.Now()) pending.EnforceInvariants() baseFee.EnforceInvariants() @@ -489,7 +499,6 @@ func onNewTxs(senders *SendersCache, newTxs TxSlots, protocolBaseFee, pendingBas localsHistory.Add(i.Tx.idHash, struct{}{}) } }) - fmt.Printf("cc: %d\n", len(byHash)) return nil } @@ -525,11 +534,9 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un p.lock.Lock() defer p.lock.Unlock() //log.Debug("[txpool.onNewBlock]", "senderInfo", len(p.senderInfo)) - fmt.Printf("len2: %d\n", len(p.byHash)) if err := onNewBlock(senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { return err } - fmt.Printf("len3: %d\n", len(p.byHash)) notifyNewTxs := make(Hashes, 0, 32*len(unwindTxs.txs)) for i := range unwindTxs.txs { @@ -599,14 +606,14 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr // they effective lose their priority over the "remote" transactions. In order to prevent that, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). - unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx) { + unsafeAddToPool(senders, unwindTxs, pending, PendingSubPool, func(i *metaTx, sender *senderInfo) { //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders i.subPool |= IsLocal } byHash[string(i.Tx.idHash[:])] = i - replaced := senders.get(i.Tx.senderID).txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i}) + replaced := sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i}) if replaced != nil { replacedMT := replaced.(*nonce2TxItem).metaTx delete(byHash, string(replacedMT.Tx.idHash[:])) @@ -689,19 +696,19 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu } // unwind -func unsafeAddToPool(senders *SendersCache, unwindTxs TxSlots, to *SubPool, subPoolType SubPoolType, beforeAdd func(tx *metaTx)) { - defer func(t time.Time) { fmt.Printf("unsafeAddToPool %s\n", time.Since(t)) }(time.Now()) +func unsafeAddToPool(senders *SendersCache, unwindTxs TxSlots, to *SubPool, subPoolType SubPoolType, beforeAdd func(tx *metaTx, sender *senderInfo)) { for i, tx := range unwindTxs.txs { mt := newMetaTx(tx, unwindTxs.isLocal[i]) + sender := senders.get(tx.senderID) // Insert to pending pool, if pool doesn't have tx with same Nonce and bigger Tip - if found := senders.getTx(tx.senderID, mt); found != nil { - if tx.tip <= found.Tx.tip { + if found := sender.txNonce2Tx.Get(&nonce2TxItem{mt}); found != nil { + if tx.tip <= found.(*nonce2TxItem).Tx.tip { continue } //mt = found } - beforeAdd(mt) + beforeAdd(mt, sender) to.UnsafeAdd(mt, subPoolType) } }