diff --git a/beacon-chain/rpc/apimiddleware/custom_handlers.go b/beacon-chain/rpc/apimiddleware/custom_handlers.go index 90bfb29ee..3099b83ac 100644 --- a/beacon-chain/rpc/apimiddleware/custom_handlers.go +++ b/beacon-chain/rpc/apimiddleware/custom_handlers.go @@ -369,16 +369,30 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http case events.AttestationTopic: data = &AttestationJson{} - // Data received in the event does not fit the expected event stream output. + // Data received in the aggregated att event does not fit the expected event stream output. // We extract the underlying attestation from event data // and assign the attestation back to event data for further processing. - eventData := &AggregatedAttReceivedDataJson{} - if err := json.Unmarshal(msg.Data, eventData); err != nil { + aggEventData := &AggregatedAttReceivedDataJson{} + if err := json.Unmarshal(msg.Data, aggEventData); err != nil { return apimiddleware.InternalServerError(err) } - attData, err := json.Marshal(eventData.Aggregate) - if err != nil { - return apimiddleware.InternalServerError(err) + var attData []byte + var err error + // If true, then we have an unaggregated attestation + if aggEventData.Aggregate == nil { + unaggEventData := &UnaggregatedAttReceivedDataJson{} + if err := json.Unmarshal(msg.Data, unaggEventData); err != nil { + return apimiddleware.InternalServerError(err) + } + attData, err = json.Marshal(unaggEventData) + if err != nil { + return apimiddleware.InternalServerError(err) + } + } else { + attData, err = json.Marshal(aggEventData.Aggregate) + if err != nil { + return apimiddleware.InternalServerError(err) + } } msg.Data = attData case events.VoluntaryExitTopic: diff --git a/beacon-chain/rpc/apimiddleware/custom_handlers_test.go b/beacon-chain/rpc/apimiddleware/custom_handlers_test.go index c63940db5..531fdf625 100644 --- a/beacon-chain/rpc/apimiddleware/custom_handlers_test.go +++ b/beacon-chain/rpc/apimiddleware/custom_handlers_test.go @@ -270,6 +270,98 @@ func TestReceiveEvents(t *testing.T) { errJson := receiveEvents(ch, w, req) assert.Equal(t, true, errJson == nil) + + expectedEvent := `event: finalized_checkpoint +data: {"block":"0x666f6f","state":"0x666f6f","epoch":"1","execution_optimistic":false} + +` + assert.DeepEqual(t, expectedEvent, w.Body.String()) +} + +func TestReceiveEvents_AggregatedAtt(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan *sse.Event) + w := httptest.NewRecorder() + w.Body = &bytes.Buffer{} + req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{}) + req = req.WithContext(ctx) + + go func() { + base64Val := "Zm9v" + data := AggregatedAttReceivedDataJson{ + Aggregate: &AttestationJson{ + AggregationBits: base64Val, + Data: &AttestationDataJson{ + Slot: "1", + CommitteeIndex: "1", + BeaconBlockRoot: base64Val, + Source: nil, + Target: nil, + }, + Signature: base64Val, + }, + } + bData, err := json.Marshal(data) + require.NoError(t, err) + msg := &sse.Event{ + Data: bData, + Event: []byte(events.AttestationTopic), + } + ch <- msg + time.Sleep(time.Second) + cancel() + }() + + errJson := receiveEvents(ch, w, req) + assert.Equal(t, true, errJson == nil) + + expectedEvent := `event: attestation +data: {"aggregation_bits":"0x666f6f","data":{"slot":"1","index":"1","beacon_block_root":"0x666f6f","source":null,"target":null},"signature":"0x666f6f"} + +` + assert.DeepEqual(t, expectedEvent, w.Body.String()) +} + +func TestReceiveEvents_UnaggregatedAtt(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan *sse.Event) + w := httptest.NewRecorder() + w.Body = &bytes.Buffer{} + req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{}) + req = req.WithContext(ctx) + + go func() { + base64Val := "Zm9v" + data := UnaggregatedAttReceivedDataJson{ + AggregationBits: base64Val, + Data: &AttestationDataJson{ + Slot: "1", + CommitteeIndex: "1", + BeaconBlockRoot: base64Val, + Source: nil, + Target: nil, + }, + Signature: base64Val, + } + bData, err := json.Marshal(data) + require.NoError(t, err) + msg := &sse.Event{ + Data: bData, + Event: []byte(events.AttestationTopic), + } + ch <- msg + time.Sleep(time.Second) + cancel() + }() + + errJson := receiveEvents(ch, w, req) + assert.Equal(t, true, errJson == nil) + + expectedEvent := `event: attestation +data: {"aggregation_bits":"0x666f6f","data":{"slot":"1","index":"1","beacon_block_root":"0x666f6f","source":null,"target":null},"signature":"0x666f6f"} + +` + assert.DeepEqual(t, expectedEvent, w.Body.String()) } func TestReceiveEvents_EventNotSupported(t *testing.T) { diff --git a/beacon-chain/rpc/apimiddleware/structs.go b/beacon-chain/rpc/apimiddleware/structs.go index 7a7881800..05ed9fabc 100644 --- a/beacon-chain/rpc/apimiddleware/structs.go +++ b/beacon-chain/rpc/apimiddleware/structs.go @@ -879,6 +879,12 @@ type AggregatedAttReceivedDataJson struct { Aggregate *AttestationJson `json:"aggregate"` } +type UnaggregatedAttReceivedDataJson struct { + AggregationBits string `json:"aggregation_bits" hex:"true"` + Data *AttestationDataJson `json:"data"` + Signature string `json:"signature" hex:"true"` +} + type EventFinalizedCheckpointJson struct { Block string `json:"block" hex:"true"` State string `json:"state" hex:"true"`