fixing SSE payloads (#12154)

* fixing typo on casing

* WIP get missed slot

* missed in commit

* reverting changes onfeedback

* reverting bazel

* using the current or head slots for events

* fixing linting

* use emitSlot

* fixing time and preRando change

* updating based on feedback

* fixing linting

* clarifying variable

* removing useless if statement

* fixing function to use the current slot+1

* updating based on feedback

* fixing unit tests

* missed dependency injection

* fixing linting
This commit is contained in:
james-prysm 2023-03-17 14:25:36 -05:00 committed by GitHub
parent d17996f8b0
commit 67595d576c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 45 additions and 11 deletions

View File

@ -671,9 +671,13 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
for {
select {
case <-ticker.C():
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.MissedSlot,
})
if err := s.fillMissingBlockPayloadId(ctx); err != nil {
log.WithError(err).Error("Could not fill missing payload ID")
}
case <-ctx.Done():
log.Debug("Context closed, exiting routine")
return

View File

@ -26,6 +26,8 @@ const (
FinalizedCheckpoint
// NewHead of the chain event.
NewHead
// MissedSlot is sent when we need to notify users that a slot was missed.
MissedSlot
)
// BlockProcessedData is the data sent with BlockProcessed events.

View File

@ -1166,8 +1166,8 @@ type EventPayloadAttributeStreamV1Json struct {
}
type EventPayloadAttributeStreamV2Json struct {
Version string `json:"version"`
Data *EventPayloadAttributeV2Json
Version string `json:"version"`
Data *EventPayloadAttributeV2Json `json:"data"`
}
type EventPayloadAttributeV1Json struct {

View File

@ -16,6 +16,7 @@ go_library(
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/service:go_default_library",
"//proto/eth/v1:go_default_library",

View File

@ -11,6 +11,7 @@ import (
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
enginev1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1"
ethpbservice "github.com/prysmaticlabs/prysm/v4/proto/eth/service"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
@ -225,6 +226,14 @@ func (s *Server) handleStateEvents(
return nil
}
return nil
case statefeed.MissedSlot:
if _, ok := requestedTopics[PayloadAttributesTopic]; ok {
if err := s.streamPayloadAttributes(stream); err != nil {
log.WithError(err).Error("Unable to obtain stream payload attributes")
}
return nil
}
return nil
case statefeed.FinalizedCheckpoint:
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok {
return nil
@ -250,8 +259,14 @@ func (s *Server) handleStateEvents(
// streamPayloadAttributes on new head event.
// This event stream is intended to be used by builders and relays.
// parent_ fields are based on state at N_{current_slot}, while the rest of fields are based on state of N_{current_slot + 1}
func (s *Server) streamPayloadAttributes(stream ethpbservice.Events_StreamEventsServer) error {
headState, err := s.HeadFetcher.HeadStateReadOnly(s.Ctx)
st, err := s.HeadFetcher.HeadState(s.Ctx)
if err != nil {
return err
}
// advance the headstate
headState, err := transition.ProcessSlotsIfPossible(s.Ctx, st, s.ChainInfoFetcher.CurrentSlot()+1)
if err != nil {
return err
}
@ -281,12 +296,17 @@ func (s *Server) streamPayloadAttributes(stream ethpbservice.Events_StreamEvents
return err
}
proposerIndex, err := helpers.BeaconProposerIndex(s.Ctx, headState)
if err != nil {
return err
}
switch headState.Version() {
case version.Bellatrix:
return streamData(stream, PayloadAttributesTopic, &ethpb.EventPayloadAttributeV1{
Version: version.String(headState.Version()),
Data: &ethpb.EventPayloadAttributeV1_BasePayloadAttribute{
ProposerIndex: headBlock.Block().ProposerIndex(),
ProposerIndex: proposerIndex,
ProposalSlot: headState.Slot(),
ParentBlockNumber: headPayload.BlockNumber(),
ParentBlockRoot: headRoot,
@ -306,7 +326,7 @@ func (s *Server) streamPayloadAttributes(stream ethpbservice.Events_StreamEvents
return streamData(stream, PayloadAttributesTopic, &ethpb.EventPayloadAttributeV2{
Version: version.String(headState.Version()),
Data: &ethpb.EventPayloadAttributeV2_BasePayloadAttribute{
ProposerIndex: headBlock.Block().ProposerIndex(),
ProposerIndex: proposerIndex,
ProposalSlot: headState.Slot(),
ParentBlockNumber: headPayload.BlockNumber(),
ParentBlockRoot: headRoot,

View File

@ -337,7 +337,7 @@ func TestStreamEvents_StateEvents(t *testing.T) {
var scBits [fieldparams.SyncAggregateSyncCommitteeBytesLength]byte
blk := &eth.SignedBeaconBlockBellatrix{
Block: &eth.BeaconBlockBellatrix{
ProposerIndex: 1,
ProposerIndex: 0,
Slot: 1,
ParentRoot: parentRoot[:],
StateRoot: genesis.Block.StateRoot,
@ -365,13 +365,15 @@ func TestStreamEvents_StateEvents(t *testing.T) {
require.NoError(t, err)
srv, ctrl, mockStream := setupServer(ctx, t)
defer ctrl.Finish()
srv.HeadFetcher = &mockChain.ChainService{
fetcher := &mockChain.ChainService{
Genesis: time.Now(),
State: beaconState,
Block: signedBlk,
Root: make([]byte, 32),
ValidatorsRoot: [32]byte{},
}
srv.HeadFetcher = fetcher
srv.ChainInfoFetcher = fetcher
prevRando, err := helpers.RandaoMix(beaconState, prysmtime.CurrentEpoch(beaconState))
require.NoError(t, err)
@ -379,7 +381,7 @@ func TestStreamEvents_StateEvents(t *testing.T) {
wantedPayload := &ethpb.EventPayloadAttributeV1{
Version: version.String(version.Bellatrix),
Data: &ethpb.EventPayloadAttributeV1_BasePayloadAttribute{
ProposerIndex: 1,
ProposerIndex: 0,
ProposalSlot: 2,
ParentBlockNumber: 1,
ParentBlockRoot: make([]byte, 32),
@ -441,7 +443,7 @@ func TestStreamEvents_StateEvents(t *testing.T) {
var scBits [fieldparams.SyncAggregateSyncCommitteeBytesLength]byte
blk := &eth.SignedBeaconBlockCapella{
Block: &eth.BeaconBlockCapella{
ProposerIndex: 1,
ProposerIndex: 0,
Slot: 1,
ParentRoot: parentRoot[:],
StateRoot: genesis.Block.StateRoot,
@ -470,7 +472,7 @@ func TestStreamEvents_StateEvents(t *testing.T) {
require.NoError(t, err)
srv, ctrl, mockStream := setupServer(ctx, t)
defer ctrl.Finish()
srv.HeadFetcher = &mockChain.ChainService{
fetcher := &mockChain.ChainService{
Genesis: time.Now(),
State: beaconState,
Block: signedBlk,
@ -478,13 +480,16 @@ func TestStreamEvents_StateEvents(t *testing.T) {
ValidatorsRoot: [32]byte{},
}
srv.HeadFetcher = fetcher
srv.ChainInfoFetcher = fetcher
prevRando, err := helpers.RandaoMix(beaconState, prysmtime.CurrentEpoch(beaconState))
require.NoError(t, err)
wantedPayload := &ethpb.EventPayloadAttributeV2{
Version: version.String(version.Capella),
Data: &ethpb.EventPayloadAttributeV2_BasePayloadAttribute{
ProposerIndex: 1,
ProposerIndex: 0,
ProposalSlot: 2,
ParentBlockNumber: 1,
ParentBlockRoot: make([]byte, 32),

View File

@ -20,4 +20,5 @@ type Server struct {
BlockNotifier blockfeed.Notifier
OperationNotifier opfeed.Notifier
HeadFetcher blockchain.HeadFetcher
ChainInfoFetcher blockchain.ChainInfoFetcher
}

View File

@ -336,6 +336,7 @@ func (s *Service) Start() {
BlockNotifier: s.cfg.BlockNotifier,
OperationNotifier: s.cfg.OperationNotifier,
HeadFetcher: s.cfg.HeadFetcher,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
})
if s.cfg.EnableDebugRPCEndpoints {
log.Info("Enabled debug gRPC endpoints")