mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-03 16:37:39 +00:00
Refactor high complexity StreamBlocksAltair (#10726)
* Refactor high complexity StreamBlocksAltair * catch nil data Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
parent
39893bbe30
commit
c603d120d7
@ -27,81 +27,12 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
|
||||
select {
|
||||
case blockEvent := <-blocksChannel:
|
||||
if req.VerifiedOnly {
|
||||
if blockEvent.Type == statefeed.BlockProcessed {
|
||||
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
|
||||
if !ok || data == nil {
|
||||
continue
|
||||
}
|
||||
b := ðpb.StreamBlocksResponse{}
|
||||
switch data.SignedBlock.Version() {
|
||||
case version.Phase0:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock")
|
||||
continue
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
|
||||
case version.Altair:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair")
|
||||
continue
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
|
||||
case version.Bellatrix:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix")
|
||||
continue
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk}
|
||||
}
|
||||
|
||||
if err := stream.Send(b); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
||||
}
|
||||
if err := sendVerifiedBlocks(stream, blockEvent); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if blockEvent.Type == blockfeed.ReceivedBlock {
|
||||
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
|
||||
if !ok {
|
||||
// Got bad data over the stream.
|
||||
continue
|
||||
}
|
||||
if data.SignedBlock == nil {
|
||||
// One nil block shouldn't stop the stream.
|
||||
continue
|
||||
}
|
||||
headState, err := vs.HeadFetcher.HeadState(vs.Ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not get head state")
|
||||
continue
|
||||
}
|
||||
signed := data.SignedBlock
|
||||
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
|
||||
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not verify block signature")
|
||||
continue
|
||||
}
|
||||
b := ðpb.StreamBlocksResponse{}
|
||||
switch data.SignedBlock.Version() {
|
||||
case version.Phase0:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting *ethpb.SignedBeaconBlock")
|
||||
continue
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
|
||||
case version.Altair:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting *v2.SignedBeaconBlockAltair")
|
||||
continue
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
|
||||
}
|
||||
if err := stream.Send(b); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
||||
}
|
||||
if err := vs.sendBlocks(stream, blockEvent); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case <-blockSub.Err():
|
||||
@ -113,3 +44,86 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sendVerifiedBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error {
|
||||
if blockEvent.Type != statefeed.BlockProcessed {
|
||||
return nil
|
||||
}
|
||||
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
|
||||
if !ok || data == nil {
|
||||
return nil
|
||||
}
|
||||
b := ðpb.StreamBlocksResponse{}
|
||||
switch data.SignedBlock.Version() {
|
||||
case version.Phase0:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock")
|
||||
return nil
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
|
||||
case version.Altair:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair")
|
||||
return nil
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
|
||||
case version.Bellatrix:
|
||||
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix)
|
||||
if !ok {
|
||||
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix")
|
||||
return nil
|
||||
}
|
||||
b.Block = ðpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk}
|
||||
}
|
||||
|
||||
if err := stream.Send(b); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *Server) sendBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error {
|
||||
if blockEvent.Type != blockfeed.ReceivedBlock {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
|
||||
if !ok || data == nil {
|
||||
// Got bad data over the stream.
|
||||
return nil
|
||||
}
|
||||
if data.SignedBlock == nil {
|
||||
// One nil block shouldn't stop the stream.
|
||||
return nil
|
||||
}
|
||||
log := log.WithField("blockSlot", data.SignedBlock.Block().Slot())
|
||||
headState, err := vs.HeadFetcher.HeadState(vs.Ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get head state")
|
||||
return nil
|
||||
}
|
||||
signed := data.SignedBlock
|
||||
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
|
||||
log.WithError(err).Error("Could not verify block signature")
|
||||
return nil
|
||||
}
|
||||
b := ðpb.StreamBlocksResponse{}
|
||||
switch p := data.SignedBlock.Proto().(type) {
|
||||
case *ethpb.SignedBeaconBlock:
|
||||
b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: p}
|
||||
case *ethpb.SignedBeaconBlockAltair:
|
||||
b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: p}
|
||||
case *ethpb.SignedBeaconBlockBellatrix:
|
||||
b.Block = ðpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: p}
|
||||
default:
|
||||
log.Errorf("Unknown block type %T", p)
|
||||
}
|
||||
if err := stream.Send(b); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user