From 446bfca4f359a86f7588509ec8a74b55869fc7da Mon Sep 17 00:00:00 2001 From: Shay Zluf Date: Thu, 6 Aug 2020 00:17:00 +0300 Subject: [PATCH] Slasher redial (#6889) * Restart streams on internal error * debug instead of fatal on retry * Merge branch 'master' of github.com:prysmaticlabs/prysm into slasher_redial * goimports * conn status fix * Merge branch 'master' into slasher_redial --- slasher/beaconclient/BUILD.bazel | 1 + slasher/beaconclient/receivers.go | 26 ++++++++++++++++++++++++-- slasher/beaconclient/service.go | 2 ++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/slasher/beaconclient/BUILD.bazel b/slasher/beaconclient/BUILD.bazel index 7ea701dca..7ce8ce23a 100644 --- a/slasher/beaconclient/BUILD.bazel +++ b/slasher/beaconclient/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "@io_opencensus_go//trace:go_default_library", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//codes:go_default_library", + "@org_golang_google_grpc//connectivity:go_default_library", "@org_golang_google_grpc//credentials:go_default_library", "@org_golang_google_grpc//status:go_default_library", ], diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 4d03f6a9c..f2912ecc2 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -13,7 +13,9 @@ import ( "github.com/prysmaticlabs/prysm/shared/slotutil" "github.com/sirupsen/logrus" "go.opencensus.io/trace" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" ) @@ -46,7 +48,7 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) { if err != nil { if e, ok := status.FromError(err); ok { switch e.Code() { - case codes.Canceled: + case codes.Canceled, codes.Internal: stream, err = bs.restartBlockStream(ctx) if err != nil { log.WithError(err).Error("Could not restart stream") @@ -109,7 +111,7 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) { if err != nil { if e, ok := status.FromError(err); ok { switch e.Code() { - case codes.Canceled: + case codes.Canceled, codes.Internal: stream, err = bs.restartIndexedAttestationStream(ctx) if err != nil { log.WithError(err).Error("Could not restart stream") @@ -179,6 +181,16 @@ func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.B select { case <-ticker.C: log.Info("Context closed, attempting to restart attestation stream") + conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...) + if err != nil { + log.Debug("Failed to dial beacon node") + continue + } + log.Debugf("connection status %v", conn.GetState()) + if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle { + log.Debug("Beacon node is still down") + continue + } stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) if err != nil { continue @@ -199,6 +211,16 @@ func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_St select { case <-ticker.C: log.Info("Context closed, attempting to restart block stream") + conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...) + if err != nil { + log.Debug("Failed to dial beacon node") + continue + } + log.Debugf("connection status %v", conn.GetState()) + if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle { + log.Debug("Beacon node is still down") + continue + } stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) if err != nil { continue diff --git a/slasher/beaconclient/service.go b/slasher/beaconclient/service.go index c4eb55cf0..4e8cc8f01 100644 --- a/slasher/beaconclient/service.go +++ b/slasher/beaconclient/service.go @@ -59,6 +59,7 @@ type Service struct { collectedAttestationsBuffer chan []*ethpb.IndexedAttestation publicKeyCache *cache.PublicKeyCache genesisValidatorRoot []byte + beaconDialOptions []grpc.DialOption } // Config options for the beaconclient service. @@ -173,6 +174,7 @@ func (bs *Service) Start() { if err != nil { log.Fatalf("Could not dial endpoint: %s, %v", bs.provider, err) } + bs.beaconDialOptions = beaconOpts log.Info("Successfully started gRPC connection") bs.conn = conn bs.beaconClient = ethpb.NewBeaconChainClient(bs.conn)