From c603d120d7e518373c52ad442f35a6214a74ebbb Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Fri, 20 May 2022 12:17:55 -0400 Subject: [PATCH] Refactor high complexity StreamBlocksAltair (#10726) * Refactor high complexity StreamBlocksAltair * catch nil data Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- .../rpc/prysm/v1alpha1/validator/blocks.go | 160 ++++++++++-------- 1 file changed, 87 insertions(+), 73 deletions(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/blocks.go b/beacon-chain/rpc/prysm/v1alpha1/validator/blocks.go index 41874110c..aea592079 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/blocks.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/blocks.go @@ -27,81 +27,12 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp select { case blockEvent := <-blocksChannel: if req.VerifiedOnly { - if blockEvent.Type == statefeed.BlockProcessed { - data, ok := blockEvent.Data.(*statefeed.BlockProcessedData) - if !ok || data == nil { - continue - } - b := ðpb.StreamBlocksResponse{} - switch data.SignedBlock.Version() { - case version.Phase0: - phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock) - if !ok { - log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock") - continue - } - b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk} - case version.Altair: - phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair) - if !ok { - log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair") - continue - } - b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk} - case version.Bellatrix: - phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix) - if !ok { - log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix") - continue - } - b.Block = ðpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk} - } - - if err := stream.Send(b); err != nil { - return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) - } + if err := sendVerifiedBlocks(stream, blockEvent); err != nil { + return err } } else { - if blockEvent.Type == blockfeed.ReceivedBlock { - data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData) - if !ok { - // Got bad data over the stream. - continue - } - if data.SignedBlock == nil { - // One nil block shouldn't stop the stream. - continue - } - headState, err := vs.HeadFetcher.HeadState(vs.Ctx) - if err != nil { - log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not get head state") - continue - } - signed := data.SignedBlock - if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil { - log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not verify block signature") - continue - } - b := ðpb.StreamBlocksResponse{} - switch data.SignedBlock.Version() { - case version.Phase0: - phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock) - if !ok { - log.Warn("Mismatch between version and block type, was expecting *ethpb.SignedBeaconBlock") - continue - } - b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk} - case version.Altair: - phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair) - if !ok { - log.Warn("Mismatch between version and block type, was expecting *v2.SignedBeaconBlockAltair") - continue - } - b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk} - } - if err := stream.Send(b); err != nil { - return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) - } + if err := vs.sendBlocks(stream, blockEvent); err != nil { + return err } } case <-blockSub.Err(): @@ -113,3 +44,86 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp } } } + +func sendVerifiedBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error { + if blockEvent.Type != statefeed.BlockProcessed { + return nil + } + data, ok := blockEvent.Data.(*statefeed.BlockProcessedData) + if !ok || data == nil { + return nil + } + b := ðpb.StreamBlocksResponse{} + switch data.SignedBlock.Version() { + case version.Phase0: + phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock) + if !ok { + log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock") + return nil + } + b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk} + case version.Altair: + phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair) + if !ok { + log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair") + return nil + } + b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk} + case version.Bellatrix: + phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix) + if !ok { + log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix") + return nil + } + b.Block = ðpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk} + } + + if err := stream.Send(b); err != nil { + return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) + } + + return nil +} + +func (vs *Server) sendBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error { + if blockEvent.Type != blockfeed.ReceivedBlock { + return nil + } + + data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData) + if !ok || data == nil { + // Got bad data over the stream. + return nil + } + if data.SignedBlock == nil { + // One nil block shouldn't stop the stream. + return nil + } + log := log.WithField("blockSlot", data.SignedBlock.Block().Slot()) + headState, err := vs.HeadFetcher.HeadState(vs.Ctx) + if err != nil { + log.WithError(err).Error("Could not get head state") + return nil + } + signed := data.SignedBlock + if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil { + log.WithError(err).Error("Could not verify block signature") + return nil + } + b := ðpb.StreamBlocksResponse{} + switch p := data.SignedBlock.Proto().(type) { + case *ethpb.SignedBeaconBlock: + b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: p} + case *ethpb.SignedBeaconBlockAltair: + b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: p} + case *ethpb.SignedBeaconBlockBellatrix: + b.Block = ðpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: p} + default: + log.Errorf("Unknown block type %T", p) + } + if err := stream.Send(b); err != nil { + return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) + } + + return nil +}