Use BlockProcessed event in Beacon API (#12625)

* Use `BlockProcessed` event in Beacon API

* log error

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Radosław Kapka 2023-07-20 19:26:34 +02:00 committed by GitHub
parent 2217b45e16
commit 43378ae8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 84 additions and 96 deletions

View File

@ -107,6 +107,12 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
return err
}
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(blockRoot)
if err != nil {
log.WithError(err).Error("Could not check if block is optimistic")
optimistic = true
}
// Send notification of the processed block to the state feed.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
@ -115,6 +121,7 @@ func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOn
BlockRoot: blockRoot,
SignedBlock: signed,
Verified: true,
Optimistic: optimistic,
},
})

View File

@ -168,6 +168,11 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Rea
if err != nil {
return err
}
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(blkRoots[i])
if err != nil {
log.WithError(err).Error("Could not check if block is optimistic")
optimistic = true
}
// Send notification of the processed block to the state feed.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
@ -176,6 +181,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Rea
BlockRoot: blkRoots[i],
SignedBlock: blockCopy,
Verified: true,
Optimistic: optimistic,
},
})

View File

@ -39,6 +39,8 @@ type BlockProcessedData struct {
SignedBlock interfaces.ReadOnlySignedBeaconBlock
// Verified is true if the block's BLS contents have been verified.
Verified bool
// Optimistic is true if the block is optimistic.
Optimistic bool
}
// ChainStartedData is the data sent with ChainStarted events.

View File

@ -11,7 +11,6 @@ go_library(
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
@ -42,7 +41,6 @@ go_test(
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",

View File

@ -6,7 +6,6 @@ import (
gwpb "github.com/grpc-ecosystem/grpc-gateway/v2/proto/gateway"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
@ -80,26 +79,18 @@ func (s *Server) StreamEvents(
}
// Subscribe to event feeds from information received in the beacon node runtime.
blockChan := make(chan *feed.Event, 1)
blockSub := s.BlockNotifier.BlockFeed().Subscribe(blockChan)
opsChan := make(chan *feed.Event, 1)
opsSub := s.OperationNotifier.OperationFeed().Subscribe(opsChan)
stateChan := make(chan *feed.Event, 1)
stateSub := s.StateNotifier.StateFeed().Subscribe(stateChan)
defer blockSub.Unsubscribe()
defer opsSub.Unsubscribe()
defer stateSub.Unsubscribe()
// Handle each event received and context cancelation.
for {
select {
case event := <-blockChan:
if err := handleBlockEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle block event: %v", err)
}
case event := <-opsChan:
if err := handleBlockOperationEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err)
@ -116,37 +107,6 @@ func (s *Server) StreamEvents(
}
}
func handleBlockEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
switch event.Type {
case blockfeed.ReceivedBlock:
if _, ok := requestedTopics[BlockTopic]; !ok {
return nil
}
blkData, ok := event.Data.(*blockfeed.ReceivedBlockData)
if !ok {
return nil
}
v1Data, err := migration.BlockIfaceToV1BlockHeader(blkData.SignedBlock)
if err != nil {
return err
}
item, err := v1Data.Message.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "could not hash tree root block")
}
eventBlock := &ethpb.EventBlock{
Slot: v1Data.Message.Slot,
Block: item[:],
ExecutionOptimistic: blkData.IsOptimistic,
}
return streamData(stream, BlockTopic, eventBlock)
default:
return nil
}
}
func handleBlockOperationEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
@ -252,6 +212,28 @@ func (s *Server) handleStateEvents(
return nil
}
return streamData(stream, ChainReorgTopic, reorg)
case statefeed.BlockProcessed:
if _, ok := requestedTopics[BlockTopic]; !ok {
return nil
}
blkData, ok := event.Data.(*statefeed.BlockProcessedData)
if !ok {
return nil
}
v1Data, err := migration.BlockIfaceToV1BlockHeader(blkData.SignedBlock)
if err != nil {
return err
}
item, err := v1Data.Message.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "could not hash tree root block")
}
eventBlock := &ethpb.EventBlock{
Slot: blkData.Slot,
Block: item[:],
ExecutionOptimistic: blkData.Optimistic,
}
return streamData(stream, BlockTopic, eventBlock)
default:
return nil
}

View File

@ -13,7 +13,6 @@ import (
mockChain "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
b "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
@ -51,55 +50,6 @@ func TestStreamEvents_Preconditions(t *testing.T) {
})
}
func TestStreamEvents_BlockEvents(t *testing.T) {
t.Run(BlockTopic, func(t *testing.T) {
ctx := context.Background()
srv, ctrl, mockStream := setupServer(ctx, t)
defer ctrl.Finish()
blk := util.HydrateSignedBeaconBlock(&eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: 8,
},
})
bodyRoot, err := blk.Block.Body.HashTreeRoot()
require.NoError(t, err)
wantedHeader := util.HydrateBeaconHeader(&eth.BeaconBlockHeader{
Slot: 8,
BodyRoot: bodyRoot[:],
})
wantedBlockRoot, err := wantedHeader.HashTreeRoot()
require.NoError(t, err)
genericResponse, err := anypb.New(&ethpb.EventBlock{
Slot: 8,
Block: wantedBlockRoot[:],
ExecutionOptimistic: true,
})
require.NoError(t, err)
wantedMessage := &gateway.EventSource{
Event: BlockTopic,
Data: genericResponse,
}
wsb, err := blocks.NewSignedBeaconBlock(blk)
require.NoError(t, err)
assertFeedSendAndReceive(ctx, &assertFeedArgs{
t: t,
srv: srv,
topics: []string{BlockTopic},
stream: mockStream,
shouldReceive: wantedMessage,
itemToSend: &feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{
SignedBlock: wsb,
IsOptimistic: true,
},
},
feed: srv.BlockNotifier.BlockFeed(),
})
})
}
func TestStreamEvents_OperationsEvents(t *testing.T) {
t.Run("attestation_unaggregated", func(t *testing.T) {
ctx := context.Background()
@ -588,6 +538,53 @@ func TestStreamEvents_StateEvents(t *testing.T) {
feed: srv.StateNotifier.StateFeed(),
})
})
t.Run(BlockTopic, func(t *testing.T) {
ctx := context.Background()
srv, ctrl, mockStream := setupServer(ctx, t)
defer ctrl.Finish()
blk := util.HydrateSignedBeaconBlock(&eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: 8,
},
})
bodyRoot, err := blk.Block.Body.HashTreeRoot()
require.NoError(t, err)
wantedHeader := util.HydrateBeaconHeader(&eth.BeaconBlockHeader{
Slot: 8,
BodyRoot: bodyRoot[:],
})
wantedBlockRoot, err := wantedHeader.HashTreeRoot()
require.NoError(t, err)
genericResponse, err := anypb.New(&ethpb.EventBlock{
Slot: 8,
Block: wantedBlockRoot[:],
ExecutionOptimistic: true,
})
require.NoError(t, err)
wantedMessage := &gateway.EventSource{
Event: BlockTopic,
Data: genericResponse,
}
wsb, err := blocks.NewSignedBeaconBlock(blk)
require.NoError(t, err)
assertFeedSendAndReceive(ctx, &assertFeedArgs{
t: t,
srv: srv,
topics: []string{BlockTopic},
stream: mockStream,
shouldReceive: wantedMessage,
itemToSend: &feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: 8,
SignedBlock: wsb,
Optimistic: true,
},
},
feed: srv.StateNotifier.StateFeed(),
})
})
}
func TestStreamEvents_CommaSeparatedTopics(t *testing.T) {
@ -651,7 +648,6 @@ func TestStreamEvents_CommaSeparatedTopics(t *testing.T) {
func setupServer(ctx context.Context, t testing.TB) (*Server, *gomock.Controller, *mock.MockEvents_StreamEventsServer) {
srv := &Server{
BlockNotifier: &mockChain.MockBlockNotifier{},
StateNotifier: &mockChain.MockStateNotifier{},
OperationNotifier: &mockChain.MockOperationNotifier{},
Ctx: ctx,

View File

@ -7,7 +7,6 @@ import (
"context"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
)
@ -17,7 +16,6 @@ import (
type Server struct {
Ctx context.Context
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
OperationNotifier opfeed.Notifier
HeadFetcher blockchain.HeadFetcher
ChainInfoFetcher blockchain.ChainInfoFetcher

View File

@ -391,7 +391,6 @@ func (s *Service) Start() {
ethpbservice.RegisterEventsServer(s.grpcServer, &events.Server{
Ctx: s.ctx,
StateNotifier: s.cfg.StateNotifier,
BlockNotifier: s.cfg.BlockNotifier,
OperationNotifier: s.cfg.OperationNotifier,
HeadFetcher: s.cfg.HeadFetcher,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,