Make Sure ChainHeadStream Remains Open (#4282)

* do not return from stream
* fix test
* Merge branch 'master' into no-stream-return
* Merge refs/heads/master into no-stream-return
This commit is contained in:
Raul Jordan 2019-12-17 22:07:11 -06:00 committed by prylabs-bulldozer[bot]
parent 2e2d5199e8
commit 30ed59e9c8
2 changed files with 11 additions and 6 deletions

View File

@ -192,7 +192,9 @@ func (bs *Server) StreamChainHead(_ *ptypes.Empty, stream ethpb.BeaconChain_Stre
if err != nil {
return status.Errorf(codes.Internal, "Could not retrieve chain head: %v", err)
}
return stream.Send(res)
if err := stream.Send(res); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
case <-stateSub.Err():
return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine")

View File

@ -449,8 +449,9 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
hRoot, _ := ssz.SigningRoot(b)
chainService := &mock.ChainService{}
ctx := context.Background()
server := &Server{
Ctx: context.Background(),
Ctx: ctx,
HeadFetcher: &mock.ChainService{Block: b, State: s},
BeaconDB: db,
StateNotifier: chainService.StateNotifier(),
@ -459,7 +460,6 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mockRPC.NewMockBeaconChain_StreamChainHeadServer(ctrl)
mockStream.EXPECT().Context().Return(context.Background())
mockStream.EXPECT().Send(
&ethpb.ChainHead{
HeadSlot: b.Slot,
@ -475,12 +475,15 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
PreviousJustifiedEpoch: 3,
PreviousJustifiedBlockRoot: pjRoot[:],
},
).Return(nil)
).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
go func(tt *testing.T) {
if err := server.StreamChainHead(&ptypes.Empty{}, mockStream); err != nil {
tt.Errorf("Could not call RPC method: %v", err)
}
<-exitRoutine
}(t)
// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
@ -490,5 +493,5 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
Data: &statefeed.BlockProcessedData{},
})
}
exitRoutine <- true
<-exitRoutine
}