mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 04:47:18 +00:00
Handle unaggregated attestation event (#11558)
This commit is contained in:
parent
df694aad71
commit
98b9c9e6c9
@ -369,16 +369,30 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http
|
||||
case events.AttestationTopic:
|
||||
data = &AttestationJson{}
|
||||
|
||||
// Data received in the event does not fit the expected event stream output.
|
||||
// Data received in the aggregated att event does not fit the expected event stream output.
|
||||
// We extract the underlying attestation from event data
|
||||
// and assign the attestation back to event data for further processing.
|
||||
eventData := &AggregatedAttReceivedDataJson{}
|
||||
if err := json.Unmarshal(msg.Data, eventData); err != nil {
|
||||
aggEventData := &AggregatedAttReceivedDataJson{}
|
||||
if err := json.Unmarshal(msg.Data, aggEventData); err != nil {
|
||||
return apimiddleware.InternalServerError(err)
|
||||
}
|
||||
attData, err := json.Marshal(eventData.Aggregate)
|
||||
if err != nil {
|
||||
return apimiddleware.InternalServerError(err)
|
||||
var attData []byte
|
||||
var err error
|
||||
// If true, then we have an unaggregated attestation
|
||||
if aggEventData.Aggregate == nil {
|
||||
unaggEventData := &UnaggregatedAttReceivedDataJson{}
|
||||
if err := json.Unmarshal(msg.Data, unaggEventData); err != nil {
|
||||
return apimiddleware.InternalServerError(err)
|
||||
}
|
||||
attData, err = json.Marshal(unaggEventData)
|
||||
if err != nil {
|
||||
return apimiddleware.InternalServerError(err)
|
||||
}
|
||||
} else {
|
||||
attData, err = json.Marshal(aggEventData.Aggregate)
|
||||
if err != nil {
|
||||
return apimiddleware.InternalServerError(err)
|
||||
}
|
||||
}
|
||||
msg.Data = attData
|
||||
case events.VoluntaryExitTopic:
|
||||
|
@ -270,6 +270,98 @@ func TestReceiveEvents(t *testing.T) {
|
||||
|
||||
errJson := receiveEvents(ch, w, req)
|
||||
assert.Equal(t, true, errJson == nil)
|
||||
|
||||
expectedEvent := `event: finalized_checkpoint
|
||||
data: {"block":"0x666f6f","state":"0x666f6f","epoch":"1","execution_optimistic":false}
|
||||
|
||||
`
|
||||
assert.DeepEqual(t, expectedEvent, w.Body.String())
|
||||
}
|
||||
|
||||
func TestReceiveEvents_AggregatedAtt(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ch := make(chan *sse.Event)
|
||||
w := httptest.NewRecorder()
|
||||
w.Body = &bytes.Buffer{}
|
||||
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
go func() {
|
||||
base64Val := "Zm9v"
|
||||
data := AggregatedAttReceivedDataJson{
|
||||
Aggregate: &AttestationJson{
|
||||
AggregationBits: base64Val,
|
||||
Data: &AttestationDataJson{
|
||||
Slot: "1",
|
||||
CommitteeIndex: "1",
|
||||
BeaconBlockRoot: base64Val,
|
||||
Source: nil,
|
||||
Target: nil,
|
||||
},
|
||||
Signature: base64Val,
|
||||
},
|
||||
}
|
||||
bData, err := json.Marshal(data)
|
||||
require.NoError(t, err)
|
||||
msg := &sse.Event{
|
||||
Data: bData,
|
||||
Event: []byte(events.AttestationTopic),
|
||||
}
|
||||
ch <- msg
|
||||
time.Sleep(time.Second)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
errJson := receiveEvents(ch, w, req)
|
||||
assert.Equal(t, true, errJson == nil)
|
||||
|
||||
expectedEvent := `event: attestation
|
||||
data: {"aggregation_bits":"0x666f6f","data":{"slot":"1","index":"1","beacon_block_root":"0x666f6f","source":null,"target":null},"signature":"0x666f6f"}
|
||||
|
||||
`
|
||||
assert.DeepEqual(t, expectedEvent, w.Body.String())
|
||||
}
|
||||
|
||||
func TestReceiveEvents_UnaggregatedAtt(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ch := make(chan *sse.Event)
|
||||
w := httptest.NewRecorder()
|
||||
w.Body = &bytes.Buffer{}
|
||||
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
go func() {
|
||||
base64Val := "Zm9v"
|
||||
data := UnaggregatedAttReceivedDataJson{
|
||||
AggregationBits: base64Val,
|
||||
Data: &AttestationDataJson{
|
||||
Slot: "1",
|
||||
CommitteeIndex: "1",
|
||||
BeaconBlockRoot: base64Val,
|
||||
Source: nil,
|
||||
Target: nil,
|
||||
},
|
||||
Signature: base64Val,
|
||||
}
|
||||
bData, err := json.Marshal(data)
|
||||
require.NoError(t, err)
|
||||
msg := &sse.Event{
|
||||
Data: bData,
|
||||
Event: []byte(events.AttestationTopic),
|
||||
}
|
||||
ch <- msg
|
||||
time.Sleep(time.Second)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
errJson := receiveEvents(ch, w, req)
|
||||
assert.Equal(t, true, errJson == nil)
|
||||
|
||||
expectedEvent := `event: attestation
|
||||
data: {"aggregation_bits":"0x666f6f","data":{"slot":"1","index":"1","beacon_block_root":"0x666f6f","source":null,"target":null},"signature":"0x666f6f"}
|
||||
|
||||
`
|
||||
assert.DeepEqual(t, expectedEvent, w.Body.String())
|
||||
}
|
||||
|
||||
func TestReceiveEvents_EventNotSupported(t *testing.T) {
|
||||
|
@ -879,6 +879,12 @@ type AggregatedAttReceivedDataJson struct {
|
||||
Aggregate *AttestationJson `json:"aggregate"`
|
||||
}
|
||||
|
||||
type UnaggregatedAttReceivedDataJson struct {
|
||||
AggregationBits string `json:"aggregation_bits" hex:"true"`
|
||||
Data *AttestationDataJson `json:"data"`
|
||||
Signature string `json:"signature" hex:"true"`
|
||||
}
|
||||
|
||||
type EventFinalizedCheckpointJson struct {
|
||||
Block string `json:"block" hex:"true"`
|
||||
State string `json:"state" hex:"true"`
|
||||
|
Loading…
Reference in New Issue
Block a user