diff --git a/txpool/fetch.go b/txpool/fetch.go index 402312c5b..5e7fa6160 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -193,6 +193,9 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMessage, sentryClient sentry.SentryClient) error { switch req.Id { case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65: + if !f.pool.Started() { + return nil + } hashCount, pos, err := ParseHashesCount(req.Data, 0) if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) @@ -228,6 +231,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes } } case sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_65: + if !f.pool.Started() { + return nil + } //TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't var encodedRequest []byte messageId := sentry.MessageId_POOLED_TRANSACTIONS_66 diff --git a/txpool/packets.go b/txpool/packets.go index 44b7af7fb..63cfa1da0 100644 --- a/txpool/packets.go +++ b/txpool/packets.go @@ -188,7 +188,6 @@ func ParsePooledTransactions65(payload []byte, pos int, ctx *TxParseContext, txS pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i)) if err != nil { if errors.Is(err, ErrRejected) { - fmt.Printf("rejected\n") continue } return 0, err @@ -217,7 +216,6 @@ func ParsePooledTransactions66(payload []byte, pos int, ctx *TxParseContext, txS pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i)) if err != nil { if errors.Is(err, ErrRejected) { - fmt.Printf("rejected\n") continue } return requestID, 0, err diff --git a/txpool/pool.go b/txpool/pool.go index ffdf541a9..791fd2945 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -146,6 +146,11 @@ func (sc *SendersCache) forEach(f func(info *senderInfo)) { f(sc.senderInfo[i]) } } +func (sc *SendersCache) len() int { + sc.lock.RLock() + defer sc.lock.RUnlock() + return len(sc.senderInfo) +} func (sc *SendersCache) evict() int { sc.lock.Lock() @@ -502,7 +507,6 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin if hasNewVal { p.pendingBaseFee.Store(pendingBaseFee) } - log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee) return protocolBaseFee, p.pendingBaseFee.Load() } @@ -510,7 +514,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil { return err } - log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) + log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight) protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee) if err := unwindTxs.Valid(); err != nil { return err @@ -521,7 +525,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un p.lock.Lock() defer p.lock.Unlock() - //log.Debug("[txpool.onNewBlock]", "senderInfo", len(p.senderInfo)) if err := onNewBlock(senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil { return err } @@ -1019,7 +1022,9 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC case <-evictSendersEvery.C: // evict sendersInfo without txs count := senders.evict() - log.Debug("evicted senders", "amount", count) + if count > 0 { + log.Debug("evicted senders", "amount", count) + } if db != nil { if err := db.Update(ctx, func(tx kv.RwTx) error { return p.flushIsLocalHistory(tx)