correct grpc retry

This commit is contained in:
alex.sharov 2021-08-14 11:33:13 +07:00
parent 7fb2fa7edd
commit 72d47680cb

View File

@ -122,17 +122,34 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
} }
_, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true)) _, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true))
if err != nil { if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { s, ok := status.FromError(err)
return retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
} if retryLater {
// Report error and wait more
log.Warn("sentry not ready yet", "err", err)
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
streamCtx, cancel := context.WithCancel(f.ctx) // Report error and wait more
defer cancel() log.Warn("[txpool.recvMessage] sentry not ready yet", "err", err)
continue
}
if err := f.receiveMessage(f.ctx, sentryClient); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
log.Warn("[txpool.recvMessage]", "err", err)
continue
}
}
}
func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryClient) error {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{ stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
sentry.MessageId_GET_POOLED_TRANSACTIONS_65, sentry.MessageId_GET_POOLED_TRANSACTIONS_65,
@ -146,17 +163,10 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
if err != nil { if err != nil {
select { select {
case <-f.ctx.Done(): case <-f.ctx.Done():
return return ctx.Err()
default: default:
} }
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { return err
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("messages", "err", err)
return
} }
var req *sentry.InboundMessage var req *sentry.InboundMessage
@ -164,20 +174,13 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
if err != nil { if err != nil {
select { select {
case <-f.ctx.Done(): case <-f.ctx.Done():
return return ctx.Err()
default: default:
} }
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { return err
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("stream.Recv", "err", err)
return
} }
if req == nil { if req == nil {
return return nil
} }
if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil { if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
log.Warn("Handling incoming message: %s", "err", err) log.Warn("Handling incoming message: %s", "err", err)
@ -187,7 +190,6 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
} }
} }
} }
}
func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMessage, sentryClient sentry.SentryClient) error { func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMessage, sentryClient sentry.SentryClient) error {
switch req.Id { switch req.Id {
@ -301,14 +303,32 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
} }
_, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true)) _, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true))
if err != nil { if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { s, ok := status.FromError(err)
return retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
} if retryLater {
// Report error and wait more
log.Warn("sentry not ready yet", "err", err)
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
// Report error and wait more
log.Warn("[txpool.recvPeers] sentry not ready yet", "err", err)
time.Sleep(time.Second)
continue
}
if err := f.receivePeer(sentryClient); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
log.Warn("[txpool.recvPeers]", "err", err)
return
}
}
}
func (f *Fetch) receivePeer(sentryClient sentry.SentryClient) error {
streamCtx, cancel := context.WithCancel(f.ctx) streamCtx, cancel := context.WithCancel(f.ctx)
defer cancel() defer cancel()
@ -316,48 +336,28 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
if err != nil { if err != nil {
select { select {
case <-f.ctx.Done(): case <-f.ctx.Done():
return return f.ctx.Err()
default: default:
} }
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { return err
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("peers", "err", err)
return
} }
var req *sentry.PeersReply var req *sentry.PeersReply
for req, err = stream.Recv(); ; req, err = stream.Recv() { for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil { if err != nil {
select { return err
case <-f.ctx.Done():
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("stream.Recv", "err", err)
return
} }
if req == nil { if req == nil {
return return nil
} }
if err = f.handleNewPeer(req); err != nil { if err = f.handleNewPeer(req); err != nil {
log.Warn("Handling new peer", "err", err) return err
} }
if f.wg != nil { if f.wg != nil {
f.wg.Done() f.wg.Done()
} }
} }
} }
}
func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error { func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error {
if req == nil { if req == nil {
@ -381,10 +381,9 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
return return
default: default:
} }
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { s, ok := status.FromError(err)
return terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF)
} if terminated {
if errors.Is(err, io.EOF) {
return return
} }
time.Sleep(time.Second) time.Sleep(time.Second)
@ -397,10 +396,9 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
return return
default: default:
} }
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { s, ok := status.FromError(err)
return terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF)
} if terminated {
if errors.Is(err, io.EOF) {
return return
} }
log.Warn("stream.Recv", "err", err) log.Warn("stream.Recv", "err", err)