Allow comma-separated event topics (#10052)

This commit is contained in:
Radosław Kapka 2022-01-07 18:17:29 +01:00 committed by GitHub
parent 8ee3019954
commit 3bce9df382
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 4 deletions

View File

@ -1,6 +1,8 @@
package events
import (
"strings"
gwpb "github.com/grpc-ecosystem/grpc-gateway/v2/proto/gateway"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
@ -54,11 +56,14 @@ func (s *Server) StreamEvents(
}
// Check if the topics in the request are valid.
requestedTopics := make(map[string]bool)
for _, topic := range req.Topics {
if _, ok := casesHandled[topic]; !ok {
return status.Errorf(codes.InvalidArgument, "Topic %s not allowed for event subscriptions", topic)
for _, rawTopic := range req.Topics {
splitTopic := strings.Split(rawTopic, ",")
for _, topic := range splitTopic {
if _, ok := casesHandled[topic]; !ok {
return status.Errorf(codes.InvalidArgument, "Topic %s not allowed for event subscriptions", topic)
}
requestedTopics[topic] = true
}
requestedTopics[topic] = true
}
// Subscribe to event feeds from information received in the beacon node runtime.

View File

@ -331,6 +331,65 @@ func TestStreamEvents_StateEvents(t *testing.T) {
})
}
func TestStreamEvents_CommaSeparatedTopics(t *testing.T) {
ctx := context.Background()
srv, ctrl, mockStream := setupServer(ctx, t)
defer ctrl.Finish()
wantedHead := &ethpb.EventHead{
Slot: 8,
Block: make([]byte, 32),
State: make([]byte, 32),
EpochTransition: true,
PreviousDutyDependentRoot: make([]byte, 32),
CurrentDutyDependentRoot: make([]byte, 32),
}
headGenericResponse, err := anypb.New(wantedHead)
require.NoError(t, err)
wantedHeadMessage := &gateway.EventSource{
Event: HeadTopic,
Data: headGenericResponse,
}
assertFeedSendAndReceive(ctx, &assertFeedArgs{
t: t,
srv: srv,
topics: []string{HeadTopic + "," + FinalizedCheckpointTopic},
stream: mockStream,
shouldReceive: wantedHeadMessage,
itemToSend: &feed.Event{
Type: statefeed.NewHead,
Data: wantedHead,
},
feed: srv.StateNotifier.StateFeed(),
})
wantedCheckpoint := &ethpb.EventFinalizedCheckpoint{
Block: make([]byte, 32),
State: make([]byte, 32),
Epoch: 8,
}
checkpointGenericResponse, err := anypb.New(wantedCheckpoint)
require.NoError(t, err)
wantedCheckpointMessage := &gateway.EventSource{
Event: FinalizedCheckpointTopic,
Data: checkpointGenericResponse,
}
assertFeedSendAndReceive(ctx, &assertFeedArgs{
t: t,
srv: srv,
topics: []string{HeadTopic + "," + FinalizedCheckpointTopic},
stream: mockStream,
shouldReceive: wantedCheckpointMessage,
itemToSend: &feed.Event{
Type: statefeed.FinalizedCheckpoint,
Data: wantedCheckpoint,
},
feed: srv.StateNotifier.StateFeed(),
})
}
func setupServer(ctx context.Context, t testing.TB) (*Server, *gomock.Controller, *mock.MockEvents_StreamEventsServer) {
srv := &Server{
BlockNotifier: &mockChain.MockBlockNotifier{},