diff --git a/txpool/fetch.go b/txpool/fetch.go index 1f70ba930..e738a15ff 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -122,69 +122,71 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { } _, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true)) if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return + s, ok := status.FromError(err) + retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) + if retryLater { + time.Sleep(time.Second) + continue } // Report error and wait more - log.Warn("sentry not ready yet", "err", err) - time.Sleep(time.Second) + log.Warn("[txpool.recvMessage] sentry not ready yet", "err", err) continue } - streamCtx, cancel := context.WithCancel(f.ctx) - defer cancel() - stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{ - sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, - sentry.MessageId_GET_POOLED_TRANSACTIONS_65, - sentry.MessageId_TRANSACTIONS_65, - sentry.MessageId_POOLED_TRANSACTIONS_65, - sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, - sentry.MessageId_GET_POOLED_TRANSACTIONS_66, - sentry.MessageId_TRANSACTIONS_66, - sentry.MessageId_POOLED_TRANSACTIONS_66, - }}, grpc.WaitForReady(true)) + if err := f.receiveMessage(f.ctx, sentryClient); err != nil { + s, ok := status.FromError(err) + retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) + if retryLater { + time.Sleep(time.Second) + continue + } + + log.Warn("[txpool.recvMessage]", "err", err) + continue + } + } +} + +func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryClient) error { + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{ + sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, + sentry.MessageId_GET_POOLED_TRANSACTIONS_65, + sentry.MessageId_TRANSACTIONS_65, + sentry.MessageId_POOLED_TRANSACTIONS_65, + sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, + sentry.MessageId_GET_POOLED_TRANSACTIONS_66, + sentry.MessageId_TRANSACTIONS_66, + sentry.MessageId_POOLED_TRANSACTIONS_66, + }}, grpc.WaitForReady(true)) + if err != nil { + select { + case <-f.ctx.Done(): + return ctx.Err() + default: + } + return err + } + + var req *sentry.InboundMessage + for req, err = stream.Recv(); ; req, err = stream.Recv() { if err != nil { select { case <-f.ctx.Done(): - return + return ctx.Err() default: } - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return - } - if errors.Is(err, io.EOF) { - return - } - log.Warn("messages", "err", err) - return + return err } - - var req *sentry.InboundMessage - for req, err = stream.Recv(); ; req, err = stream.Recv() { - if err != nil { - select { - case <-f.ctx.Done(): - return - default: - } - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return - } - if errors.Is(err, io.EOF) { - return - } - log.Warn("stream.Recv", "err", err) - return - } - if req == nil { - return - } - if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { - log.Warn("Handling incoming message: %s", "err", err) - } - if f.wg != nil { - f.wg.Done() - } + if req == nil { + return nil + } + if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { + log.Warn("Handling incoming message: %s", "err", err) + } + if f.wg != nil { + f.wg.Done() } } } @@ -301,60 +303,58 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) { } _, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true)) if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return + s, ok := status.FromError(err) + retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) + if retryLater { + time.Sleep(time.Second) + continue } // Report error and wait more - log.Warn("sentry not ready yet", "err", err) + log.Warn("[txpool.recvPeers] sentry not ready yet", "err", err) time.Sleep(time.Second) continue } - streamCtx, cancel := context.WithCancel(f.ctx) - defer cancel() + if err := f.receivePeer(sentryClient); err != nil { + s, ok := status.FromError(err) + retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) + if retryLater { + time.Sleep(time.Second) + continue + } - stream, err := sentryClient.Peers(streamCtx, &sentry.PeersRequest{}) - if err != nil { - select { - case <-f.ctx.Done(): - return - default: - } - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return - } - if errors.Is(err, io.EOF) { - return - } - log.Warn("peers", "err", err) + log.Warn("[txpool.recvPeers]", "err", err) return } + } +} - var req *sentry.PeersReply - for req, err = stream.Recv(); ; req, err = stream.Recv() { - if err != nil { - select { - case <-f.ctx.Done(): - return - default: - } - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return - } - if errors.Is(err, io.EOF) { - return - } - log.Warn("stream.Recv", "err", err) - return - } - if req == nil { - return - } - if err = f.handleNewPeer(req); err != nil { - log.Warn("Handling new peer", "err", err) - } - if f.wg != nil { - f.wg.Done() - } +func (f *Fetch) receivePeer(sentryClient sentry.SentryClient) error { + streamCtx, cancel := context.WithCancel(f.ctx) + defer cancel() + + stream, err := sentryClient.Peers(streamCtx, &sentry.PeersRequest{}) + if err != nil { + select { + case <-f.ctx.Done(): + return f.ctx.Err() + default: + } + return err + } + + var req *sentry.PeersReply + for req, err = stream.Recv(); ; req, err = stream.Recv() { + if err != nil { + return err + } + if req == nil { + return nil + } + if err = f.handleNewPeer(req); err != nil { + return err + } + if f.wg != nil { + f.wg.Done() } } } @@ -381,10 +381,9 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) return default: } - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return - } - if errors.Is(err, io.EOF) { + s, ok := status.FromError(err) + terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) + if terminated { return } time.Sleep(time.Second) @@ -397,10 +396,9 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) return default: } - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - return - } - if errors.Is(err, io.EOF) { + s, ok := status.FromError(err) + terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) + if terminated { return } log.Warn("stream.Recv", "err", err)