Merge pull request #30 from ledgerwatch/pool16

Correct grpc retry
This commit is contained in:
Alex Sharov 2021-08-14 11:47:47 +07:00 committed by GitHub
commit 00d841ba28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -122,69 +122,71 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
}
_, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true))
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
// Report error and wait more
log.Warn("sentry not ready yet", "err", err)
time.Sleep(time.Second)
log.Warn("[txpool.recvMessage] sentry not ready yet", "err", err)
continue
}
streamCtx, cancel := context.WithCancel(f.ctx)
defer cancel()
stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
sentry.MessageId_GET_POOLED_TRANSACTIONS_65,
sentry.MessageId_TRANSACTIONS_65,
sentry.MessageId_POOLED_TRANSACTIONS_65,
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
sentry.MessageId_GET_POOLED_TRANSACTIONS_66,
sentry.MessageId_TRANSACTIONS_66,
sentry.MessageId_POOLED_TRANSACTIONS_66,
}}, grpc.WaitForReady(true))
if err := f.receiveMessage(f.ctx, sentryClient); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
log.Warn("[txpool.recvMessage]", "err", err)
continue
}
}
}
func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryClient) error {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: []sentry.MessageId{
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
sentry.MessageId_GET_POOLED_TRANSACTIONS_65,
sentry.MessageId_TRANSACTIONS_65,
sentry.MessageId_POOLED_TRANSACTIONS_65,
sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
sentry.MessageId_GET_POOLED_TRANSACTIONS_66,
sentry.MessageId_TRANSACTIONS_66,
sentry.MessageId_POOLED_TRANSACTIONS_66,
}}, grpc.WaitForReady(true))
if err != nil {
select {
case <-f.ctx.Done():
return ctx.Err()
default:
}
return err
}
var req *sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
select {
case <-f.ctx.Done():
return
return ctx.Err()
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("messages", "err", err)
return
return err
}
var req *sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
select {
case <-f.ctx.Done():
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("stream.Recv", "err", err)
return
}
if req == nil {
return
}
if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
log.Warn("Handling incoming message: %s", "err", err)
}
if f.wg != nil {
f.wg.Done()
}
if req == nil {
return nil
}
if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
log.Warn("Handling incoming message: %s", "err", err)
}
if f.wg != nil {
f.wg.Done()
}
}
}
@ -301,60 +303,58 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
}
_, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true))
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
// Report error and wait more
log.Warn("sentry not ready yet", "err", err)
log.Warn("[txpool.recvPeers] sentry not ready yet", "err", err)
time.Sleep(time.Second)
continue
}
streamCtx, cancel := context.WithCancel(f.ctx)
defer cancel()
if err := f.receivePeer(sentryClient); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
stream, err := sentryClient.Peers(streamCtx, &sentry.PeersRequest{})
if err != nil {
select {
case <-f.ctx.Done():
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("peers", "err", err)
log.Warn("[txpool.recvPeers]", "err", err)
return
}
}
}
var req *sentry.PeersReply
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
select {
case <-f.ctx.Done():
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
log.Warn("stream.Recv", "err", err)
return
}
if req == nil {
return
}
if err = f.handleNewPeer(req); err != nil {
log.Warn("Handling new peer", "err", err)
}
if f.wg != nil {
f.wg.Done()
}
func (f *Fetch) receivePeer(sentryClient sentry.SentryClient) error {
streamCtx, cancel := context.WithCancel(f.ctx)
defer cancel()
stream, err := sentryClient.Peers(streamCtx, &sentry.PeersRequest{})
if err != nil {
select {
case <-f.ctx.Done():
return f.ctx.Err()
default:
}
return err
}
var req *sentry.PeersReply
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
return err
}
if req == nil {
return nil
}
if err = f.handleNewPeer(req); err != nil {
return err
}
if f.wg != nil {
f.wg.Done()
}
}
}
@ -381,10 +381,9 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
s, ok := status.FromError(err)
terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF)
if terminated {
return
}
time.Sleep(time.Second)
@ -397,10 +396,9 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
s, ok := status.FromError(err)
terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF)
if terminated {
return
}
log.Warn("stream.Recv", "err", err)