diff --git a/beacon-chain/rpc/beacon/blocks.go b/beacon-chain/rpc/beacon/blocks.go index 53ba07a2a..a32dd6801 100644 --- a/beacon-chain/rpc/beacon/blocks.go +++ b/beacon-chain/rpc/beacon/blocks.go @@ -192,7 +192,9 @@ func (bs *Server) StreamChainHead(_ *ptypes.Empty, stream ethpb.BeaconChain_Stre if err != nil { return status.Errorf(codes.Internal, "Could not retrieve chain head: %v", err) } - return stream.Send(res) + if err := stream.Send(res); err != nil { + return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) + } } case <-stateSub.Err(): return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine") diff --git a/beacon-chain/rpc/beacon/blocks_test.go b/beacon-chain/rpc/beacon/blocks_test.go index a79400b86..9d0faa3f1 100644 --- a/beacon-chain/rpc/beacon/blocks_test.go +++ b/beacon-chain/rpc/beacon/blocks_test.go @@ -449,8 +449,9 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { hRoot, _ := ssz.SigningRoot(b) chainService := &mock.ChainService{} + ctx := context.Background() server := &Server{ - Ctx: context.Background(), + Ctx: ctx, HeadFetcher: &mock.ChainService{Block: b, State: s}, BeaconDB: db, StateNotifier: chainService.StateNotifier(), @@ -459,7 +460,6 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := mockRPC.NewMockBeaconChain_StreamChainHeadServer(ctrl) - mockStream.EXPECT().Context().Return(context.Background()) mockStream.EXPECT().Send( ðpb.ChainHead{ HeadSlot: b.Slot, @@ -475,12 +475,15 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { PreviousJustifiedEpoch: 3, PreviousJustifiedBlockRoot: pjRoot[:], }, - ).Return(nil) + ).Do(func(arg0 interface{}) { + exitRoutine <- true + }) + mockStream.EXPECT().Context().Return(ctx).AnyTimes() + go func(tt *testing.T) { if err := server.StreamChainHead(&ptypes.Empty{}, mockStream); err != nil { tt.Errorf("Could not call RPC method: %v", err) } - <-exitRoutine }(t) // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). @@ -490,5 +493,5 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { Data: &statefeed.BlockProcessedData{}, }) } - exitRoutine <- true + <-exitRoutine }