From 30ed59e9c8d84a37f81fca741766984bdf10d314 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Tue, 17 Dec 2019 22:07:11 -0600 Subject: [PATCH] Make Sure ChainHeadStream Remains Open (#4282) * do not return from stream * fix test * Merge branch 'master' into no-stream-return * Merge refs/heads/master into no-stream-return --- beacon-chain/rpc/beacon/blocks.go | 4 +++- beacon-chain/rpc/beacon/blocks_test.go | 13 ++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/beacon-chain/rpc/beacon/blocks.go b/beacon-chain/rpc/beacon/blocks.go index 53ba07a2a..a32dd6801 100644 --- a/beacon-chain/rpc/beacon/blocks.go +++ b/beacon-chain/rpc/beacon/blocks.go @@ -192,7 +192,9 @@ func (bs *Server) StreamChainHead(_ *ptypes.Empty, stream ethpb.BeaconChain_Stre if err != nil { return status.Errorf(codes.Internal, "Could not retrieve chain head: %v", err) } - return stream.Send(res) + if err := stream.Send(res); err != nil { + return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) + } } case <-stateSub.Err(): return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine") diff --git a/beacon-chain/rpc/beacon/blocks_test.go b/beacon-chain/rpc/beacon/blocks_test.go index a79400b86..9d0faa3f1 100644 --- a/beacon-chain/rpc/beacon/blocks_test.go +++ b/beacon-chain/rpc/beacon/blocks_test.go @@ -449,8 +449,9 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { hRoot, _ := ssz.SigningRoot(b) chainService := &mock.ChainService{} + ctx := context.Background() server := &Server{ - Ctx: context.Background(), + Ctx: ctx, HeadFetcher: &mock.ChainService{Block: b, State: s}, BeaconDB: db, StateNotifier: chainService.StateNotifier(), @@ -459,7 +460,6 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := mockRPC.NewMockBeaconChain_StreamChainHeadServer(ctrl) - mockStream.EXPECT().Context().Return(context.Background()) mockStream.EXPECT().Send( ðpb.ChainHead{ HeadSlot: b.Slot, @@ -475,12 +475,15 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { PreviousJustifiedEpoch: 3, PreviousJustifiedBlockRoot: pjRoot[:], }, - ).Return(nil) + ).Do(func(arg0 interface{}) { + exitRoutine <- true + }) + mockStream.EXPECT().Context().Return(ctx).AnyTimes() + go func(tt *testing.T) { if err := server.StreamChainHead(&ptypes.Empty{}, mockStream); err != nil { tt.Errorf("Could not call RPC method: %v", err) } - <-exitRoutine }(t) // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). @@ -490,5 +493,5 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { Data: &statefeed.BlockProcessedData{}, }) } - exitRoutine <- true + <-exitRoutine }