mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-29 06:37:17 +00:00
11a1f681e0
* async packages * change pkg * build * working Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
109 lines
4.0 KiB
Go
109 lines
4.0 KiB
Go
package validator
|
|
|
|
import (
|
|
"github.com/prysmaticlabs/prysm/async/event"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
|
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
|
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
|
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/runtime/version"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// StreamBlocksAltair to clients every single time a block is received by the beacon node.
|
|
func (bs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer) error {
|
|
blocksChannel := make(chan *feed.Event, 1)
|
|
var blockSub event.Subscription
|
|
if req.VerifiedOnly {
|
|
blockSub = bs.StateNotifier.StateFeed().Subscribe(blocksChannel)
|
|
} else {
|
|
blockSub = bs.BlockNotifier.BlockFeed().Subscribe(blocksChannel)
|
|
}
|
|
defer blockSub.Unsubscribe()
|
|
|
|
for {
|
|
select {
|
|
case blockEvent := <-blocksChannel:
|
|
if req.VerifiedOnly {
|
|
if blockEvent.Type == statefeed.BlockProcessed {
|
|
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
|
|
if !ok || data == nil {
|
|
continue
|
|
}
|
|
b := ðpb.StreamBlocksResponse{}
|
|
switch data.SignedBlock.Version() {
|
|
case version.Phase0:
|
|
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
|
|
if !ok {
|
|
log.Warn("Mismatch between version and block type, was expecting *ethpb.SignedBeaconBlock")
|
|
continue
|
|
}
|
|
b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
|
|
case version.Altair:
|
|
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
|
|
if !ok {
|
|
log.Warn("Mismatch between version and block type, was expecting *v2.SignedBeaconBlockAltair")
|
|
continue
|
|
}
|
|
b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
|
|
}
|
|
|
|
if err := stream.Send(b); err != nil {
|
|
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
|
}
|
|
}
|
|
} else {
|
|
if blockEvent.Type == blockfeed.ReceivedBlock {
|
|
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
|
|
if !ok {
|
|
// Got bad data over the stream.
|
|
continue
|
|
}
|
|
if data.SignedBlock == nil {
|
|
// One nil block shouldn't stop the stream.
|
|
continue
|
|
}
|
|
headState, err := bs.HeadFetcher.HeadState(bs.Ctx)
|
|
if err != nil {
|
|
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not get head state")
|
|
continue
|
|
}
|
|
signed := data.SignedBlock
|
|
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
|
|
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not verify block signature")
|
|
continue
|
|
}
|
|
b := ðpb.StreamBlocksResponse{}
|
|
switch data.SignedBlock.Version() {
|
|
case version.Phase0:
|
|
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
|
|
if !ok {
|
|
log.Warn("Mismatch between version and block type, was expecting *ethpb.SignedBeaconBlock")
|
|
continue
|
|
}
|
|
b.Block = ðpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
|
|
case version.Altair:
|
|
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
|
|
if !ok {
|
|
log.Warn("Mismatch between version and block type, was expecting *v2.SignedBeaconBlockAltair")
|
|
continue
|
|
}
|
|
b.Block = ðpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
|
|
}
|
|
if err := stream.Send(b); err != nil {
|
|
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
|
}
|
|
}
|
|
}
|
|
case <-blockSub.Err():
|
|
return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine")
|
|
case <-bs.Ctx.Done():
|
|
return status.Error(codes.Canceled, "Context canceled")
|
|
case <-stream.Context().Done():
|
|
return status.Error(codes.Canceled, "Context canceled")
|
|
}
|
|
}
|
|
}
|