Slasher redial (#6889)

* Restart streams on internal error
* Log at debug level instead of fatal on retry
* Merge branch 'master' of github.com:prysmaticlabs/prysm into slasher_redial
* goimports
* Fix connection status check
* Merge branch 'master' into slasher_redial
This commit is contained in:
Shay Zluf 2020-08-06 00:17:00 +03:00 committed by GitHub
parent 492944db01
commit 446bfca4f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 2 deletions

View File

@ -37,6 +37,7 @@ go_library(
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//connectivity:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],

View File

@ -13,7 +13,9 @@ import (
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
)
@ -46,7 +48,7 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) {
if err != nil {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Canceled:
case codes.Canceled, codes.Internal:
stream, err = bs.restartBlockStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
@ -109,7 +111,7 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) {
if err != nil {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Canceled:
case codes.Canceled, codes.Internal:
stream, err = bs.restartIndexedAttestationStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
@ -179,6 +181,16 @@ func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.B
select {
case <-ticker.C:
log.Info("Context closed, attempting to restart attestation stream")
conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...)
if err != nil {
log.Debug("Failed to dial beacon node")
continue
}
log.Debugf("connection status %v", conn.GetState())
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle {
log.Debug("Beacon node is still down")
continue
}
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
continue
@ -199,6 +211,16 @@ func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_St
select {
case <-ticker.C:
log.Info("Context closed, attempting to restart block stream")
conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...)
if err != nil {
log.Debug("Failed to dial beacon node")
continue
}
log.Debugf("connection status %v", conn.GetState())
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle {
log.Debug("Beacon node is still down")
continue
}
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
if err != nil {
continue

View File

@ -59,6 +59,7 @@ type Service struct {
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
publicKeyCache *cache.PublicKeyCache
genesisValidatorRoot []byte
beaconDialOptions []grpc.DialOption
}
// Config options for the beaconclient service.
@ -173,6 +174,7 @@ func (bs *Service) Start() {
if err != nil {
log.Fatalf("Could not dial endpoint: %s, %v", bs.provider, err)
}
bs.beaconDialOptions = beaconOpts
log.Info("Successfully started gRPC connection")
bs.conn = conn
bs.beaconClient = ethpb.NewBeaconChainClient(bs.conn)