mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-07 18:21:20 +00:00
c6acf0a28c
* no more head fetchre * no need for head fetcher * nil checks
404 lines
15 KiB
Go
404 lines
15 KiB
Go
package beacon
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"strconv"
|
|
"time"
|
|
|
|
ptypes "github.com/gogo/protobuf/types"
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/go-ssz"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
|
|
"github.com/prysmaticlabs/prysm/shared/attestationutil"
|
|
"github.com/prysmaticlabs/prysm/shared/pagination"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// sortableAttestations implements the Sort interface to sort attestations
|
|
// by slot as the canonical sorting attribute.
|
|
type sortableAttestations []*ethpb.Attestation
|
|
|
|
func (s sortableAttestations) Len() int { return len(s) }
|
|
func (s sortableAttestations) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
func (s sortableAttestations) Less(i, j int) bool {
|
|
return s[i].Data.Slot < s[j].Data.Slot
|
|
}
|
|
|
|
// ListAttestations retrieves attestations by block root, slot, or epoch.
|
|
// Attestations are sorted by data slot by default.
|
|
//
|
|
// The server may return an empty list when no attestations match the given
|
|
// filter criteria. This RPC should not return NOT_FOUND. Only one filter
|
|
// criteria should be used.
|
|
func (bs *Server) ListAttestations(
|
|
ctx context.Context, req *ethpb.ListAttestationsRequest,
|
|
) (*ethpb.ListAttestationsResponse, error) {
|
|
if int(req.PageSize) > flags.Get().MaxPageSize {
|
|
return nil, status.Errorf(codes.InvalidArgument, "Requested page size %d can not be greater than max size %d",
|
|
req.PageSize, flags.Get().MaxPageSize)
|
|
}
|
|
var atts []*ethpb.Attestation
|
|
var err error
|
|
switch q := req.QueryFilter.(type) {
|
|
case *ethpb.ListAttestationsRequest_Genesis:
|
|
genBlk, err := bs.BeaconDB.GenesisBlock(ctx)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not genesis block: %v", err)
|
|
}
|
|
if genBlk == nil {
|
|
return nil, status.Error(codes.Internal, "Could not find genesis block")
|
|
}
|
|
genesisRoot, err := ssz.HashTreeRoot(genBlk.Block)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(genesisRoot[:]))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch genesis attestations: %v", err)
|
|
}
|
|
case *ethpb.ListAttestationsRequest_HeadBlockRoot:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(q.HeadBlockRoot))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
case *ethpb.ListAttestationsRequest_SourceEpoch:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetSourceEpoch(q.SourceEpoch))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
case *ethpb.ListAttestationsRequest_SourceRoot:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetSourceRoot(q.SourceRoot))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
case *ethpb.ListAttestationsRequest_TargetEpoch:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(q.TargetEpoch))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
case *ethpb.ListAttestationsRequest_TargetRoot:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetRoot(q.TargetRoot))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
default:
|
|
return nil, status.Error(codes.InvalidArgument, "Must specify a filter criteria for fetching attestations")
|
|
}
|
|
// We sort attestations according to the Sortable interface.
|
|
sort.Sort(sortableAttestations(atts))
|
|
numAttestations := len(atts)
|
|
|
|
// If there are no attestations, we simply return a response specifying this.
|
|
// Otherwise, attempting to paginate 0 attestations below would result in an error.
|
|
if numAttestations == 0 {
|
|
return ðpb.ListAttestationsResponse{
|
|
Attestations: make([]*ethpb.Attestation, 0),
|
|
TotalSize: int32(0),
|
|
NextPageToken: strconv.Itoa(0),
|
|
}, nil
|
|
}
|
|
|
|
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), numAttestations)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
|
|
}
|
|
return ðpb.ListAttestationsResponse{
|
|
Attestations: atts[start:end],
|
|
TotalSize: int32(numAttestations),
|
|
NextPageToken: nextPageToken,
|
|
}, nil
|
|
}
|
|
|
|
// ListIndexedAttestations retrieves indexed attestations by target epoch.
|
|
// IndexedAttestationsForEpoch are sorted by data slot by default. Either a target epoch filter
|
|
// or a boolean filter specifying a request for genesis epoch attestations may be used.
|
|
//
|
|
// The server may return an empty list when no attestations match the given
|
|
// filter criteria. This RPC should not return NOT_FOUND. Only one filter
|
|
// criteria should be used.
|
|
func (bs *Server) ListIndexedAttestations(
|
|
ctx context.Context, req *ethpb.ListIndexedAttestationsRequest,
|
|
) (*ethpb.ListIndexedAttestationsResponse, error) {
|
|
atts := make([]*ethpb.Attestation, 0)
|
|
var err error
|
|
epoch := helpers.SlotToEpoch(bs.GenesisTimeFetcher.CurrentSlot())
|
|
switch q := req.QueryFilter.(type) {
|
|
case *ethpb.ListIndexedAttestationsRequest_TargetEpoch:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(q.TargetEpoch))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
epoch = q.TargetEpoch
|
|
case *ethpb.ListIndexedAttestationsRequest_GenesisEpoch:
|
|
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(0))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
|
|
}
|
|
epoch = 0
|
|
default:
|
|
return nil, status.Error(codes.InvalidArgument, "Must specify a filter criteria for fetching attestations")
|
|
}
|
|
// We sort attestations according to the Sortable interface.
|
|
sort.Sort(sortableAttestations(atts))
|
|
numAttestations := len(atts)
|
|
|
|
// If there are no attestations, we simply return a response specifying this.
|
|
// Otherwise, attempting to paginate 0 attestations below would result in an error.
|
|
if numAttestations == 0 {
|
|
return ðpb.ListIndexedAttestationsResponse{
|
|
IndexedAttestations: make([]*ethpb.IndexedAttestation, 0),
|
|
TotalSize: int32(0),
|
|
NextPageToken: strconv.Itoa(0),
|
|
}, nil
|
|
}
|
|
|
|
committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(ctx, epoch)
|
|
if err != nil {
|
|
return nil, status.Errorf(
|
|
codes.Internal,
|
|
"Could not retrieve committees for epoch %d: %v",
|
|
epoch,
|
|
err,
|
|
)
|
|
}
|
|
|
|
// We use the retrieved committees for the epoch to convert all attestations
|
|
// into indexed form effectively.
|
|
indexedAtts := make([]*ethpb.IndexedAttestation, numAttestations, numAttestations)
|
|
startSlot := helpers.StartSlot(epoch)
|
|
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
|
|
for i := 0; i < len(indexedAtts); i++ {
|
|
att := atts[i]
|
|
// Out of range check, the attestation slot cannot be greater
|
|
// the last slot of the requested epoch or smaller than its start slot
|
|
// given committees are accessed as a map of slot -> commitees list, where there are
|
|
// SLOTS_PER_EPOCH keys in the map.
|
|
if att.Data.Slot < startSlot || att.Data.Slot > endSlot {
|
|
continue
|
|
}
|
|
committee := committeesBySlot[att.Data.Slot].Committees[att.Data.CommitteeIndex]
|
|
idxAtt, err := attestationutil.ConvertToIndexed(ctx, atts[i], committee.ValidatorIndices)
|
|
if err != nil {
|
|
return nil, status.Errorf(
|
|
codes.Internal,
|
|
"Could not convert attestation with slot %d to indexed form: %v",
|
|
att.Data.Slot,
|
|
err,
|
|
)
|
|
}
|
|
indexedAtts[i] = idxAtt
|
|
}
|
|
|
|
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), len(indexedAtts))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
|
|
}
|
|
return ðpb.ListIndexedAttestationsResponse{
|
|
IndexedAttestations: indexedAtts[start:end],
|
|
TotalSize: int32(len(indexedAtts)),
|
|
NextPageToken: nextPageToken,
|
|
}, nil
|
|
}
|
|
|
|
// StreamAttestations to clients at the end of every slot. This method retrieves the
|
|
// aggregated attestations currently in the pool at the start of a slot and sends
|
|
// them over a gRPC stream.
|
|
func (bs *Server) StreamAttestations(
|
|
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamAttestationsServer,
|
|
) error {
|
|
attestationsChannel := make(chan *feed.Event, 1)
|
|
attSub := bs.AttestationNotifier.OperationFeed().Subscribe(attestationsChannel)
|
|
defer attSub.Unsubscribe()
|
|
for {
|
|
select {
|
|
case event := <-attestationsChannel:
|
|
if event.Type == operation.UnaggregatedAttReceived {
|
|
data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
|
|
if !ok {
|
|
// Got bad data over the stream.
|
|
continue
|
|
}
|
|
if data.Attestation == nil {
|
|
// One nil attestation shouldn't stop the stream.
|
|
continue
|
|
}
|
|
if err := stream.Send(data.Attestation); err != nil {
|
|
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
|
}
|
|
}
|
|
case <-bs.Ctx.Done():
|
|
return status.Error(codes.Canceled, "Context canceled")
|
|
case <-stream.Context().Done():
|
|
return status.Error(codes.Canceled, "Context canceled")
|
|
}
|
|
}
|
|
}
|
|
|
|
// StreamIndexedAttestations to clients at the end of every slot. This method retrieves the
|
|
// aggregated attestations currently in the pool, converts them into indexed form, and
|
|
// sends them over a gRPC stream.
|
|
func (bs *Server) StreamIndexedAttestations(
|
|
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamIndexedAttestationsServer,
|
|
) error {
|
|
attestationsChannel := make(chan *feed.Event, 1)
|
|
attSub := bs.AttestationNotifier.OperationFeed().Subscribe(attestationsChannel)
|
|
defer attSub.Unsubscribe()
|
|
go bs.collectReceivedAttestations(stream.Context())
|
|
for {
|
|
select {
|
|
case event := <-attestationsChannel:
|
|
if event.Type == operation.UnaggregatedAttReceived {
|
|
data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
|
|
if !ok {
|
|
// Got bad data over the stream.
|
|
continue
|
|
}
|
|
if data.Attestation == nil {
|
|
// One nil attestation shouldn't stop the stream.
|
|
continue
|
|
}
|
|
bs.ReceivedAttestationsBuffer <- data.Attestation
|
|
}
|
|
case atts := <-bs.CollectedAttestationsBuffer:
|
|
// We aggregate the received attestations.
|
|
aggAtts, err := helpers.AggregateAttestations(atts)
|
|
if err != nil {
|
|
return status.Errorf(
|
|
codes.Internal,
|
|
"Could not aggregate attestations: %v",
|
|
err,
|
|
)
|
|
}
|
|
if len(aggAtts) == 0 {
|
|
continue
|
|
}
|
|
// All attestations we receive have the same target epoch given they
|
|
// have the same data root, so we just use the target epoch from
|
|
// the first one to determine committees for converting into indexed
|
|
// form.
|
|
epoch := aggAtts[0].Data.Target.Epoch
|
|
committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(stream.Context(), epoch)
|
|
if err != nil {
|
|
return status.Errorf(
|
|
codes.Internal,
|
|
"Could not retrieve committees for epoch %d: %v",
|
|
epoch,
|
|
err,
|
|
)
|
|
}
|
|
// We use the retrieved committees for the epoch to convert all attestations
|
|
// into indexed form effectively.
|
|
startSlot := helpers.StartSlot(epoch)
|
|
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
|
|
for _, att := range aggAtts {
|
|
// Out of range check, the attestation slot cannot be greater
|
|
// the last slot of the requested epoch or smaller than its start slot
|
|
// given committees are accessed as a map of slot -> commitees list, where there are
|
|
// SLOTS_PER_EPOCH keys in the map.
|
|
if att.Data.Slot < startSlot || att.Data.Slot > endSlot {
|
|
continue
|
|
}
|
|
committeesForSlot, ok := committeesBySlot[att.Data.Slot]
|
|
if !ok || committeesForSlot.Committees == nil {
|
|
continue
|
|
}
|
|
committee := committeesForSlot.Committees[att.Data.CommitteeIndex]
|
|
idxAtt, err := attestationutil.ConvertToIndexed(stream.Context(), att, committee.ValidatorIndices)
|
|
if err != nil {
|
|
return status.Errorf(
|
|
codes.Internal,
|
|
"Could not convert attestation with slot %d to indexed form: %v",
|
|
att.Data.Slot,
|
|
err,
|
|
)
|
|
}
|
|
if err := stream.Send(idxAtt); err != nil {
|
|
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
|
|
}
|
|
}
|
|
case <-bs.Ctx.Done():
|
|
return status.Error(codes.Canceled, "Context canceled")
|
|
case <-stream.Context().Done():
|
|
return status.Error(codes.Canceled, "Context canceled")
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO(#5031): Instead of doing aggregation here, leverage the aggregation
|
|
// already being done by the attestation pool in the operations service.
|
|
func (bs *Server) collectReceivedAttestations(ctx context.Context) {
|
|
attsByRoot := make(map[[32]byte][]*ethpb.Attestation)
|
|
halfASlot := time.Duration(params.BeaconConfig().SecondsPerSlot / 2)
|
|
ticker := time.NewTicker(time.Second * halfASlot)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
for root, atts := range attsByRoot {
|
|
if len(atts) > 0 {
|
|
bs.CollectedAttestationsBuffer <- atts
|
|
attsByRoot[root] = make([]*ethpb.Attestation, 0)
|
|
}
|
|
}
|
|
case att := <-bs.ReceivedAttestationsBuffer:
|
|
attDataRoot, err := ssz.HashTreeRoot(att.Data)
|
|
if err != nil {
|
|
logrus.Errorf("Could not hash tree root data: %v", err)
|
|
continue
|
|
}
|
|
attsByRoot[attDataRoot] = append(attsByRoot[attDataRoot], att)
|
|
case <-ctx.Done():
|
|
return
|
|
case <-bs.Ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// AttestationPool retrieves pending attestations.
|
|
//
|
|
// The server returns a list of attestations that have been seen but not
|
|
// yet processed. Pool attestations eventually expire as the slot
|
|
// advances, so an attestation missing from this request does not imply
|
|
// that it was included in a block. The attestation may have expired.
|
|
// Refer to the ethereum 2.0 specification for more details on how
|
|
// attestations are processed and when they are no longer valid.
|
|
// https://github.com/ethereum/eth2.0-specs/blob/dev/specs/core/0_beacon-chain.md#attestations
|
|
func (bs *Server) AttestationPool(
|
|
ctx context.Context, req *ethpb.AttestationPoolRequest,
|
|
) (*ethpb.AttestationPoolResponse, error) {
|
|
if int(req.PageSize) > flags.Get().MaxPageSize {
|
|
return nil, status.Errorf(
|
|
codes.InvalidArgument,
|
|
"Requested page size %d can not be greater than max size %d",
|
|
req.PageSize,
|
|
flags.Get().MaxPageSize,
|
|
)
|
|
}
|
|
atts := bs.AttestationsPool.AggregatedAttestations()
|
|
numAtts := len(atts)
|
|
if numAtts == 0 {
|
|
return ðpb.AttestationPoolResponse{
|
|
Attestations: make([]*ethpb.Attestation, 0),
|
|
TotalSize: int32(0),
|
|
NextPageToken: strconv.Itoa(0),
|
|
}, nil
|
|
}
|
|
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), numAtts)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
|
|
}
|
|
return ðpb.AttestationPoolResponse{
|
|
Attestations: atts[start:end],
|
|
TotalSize: int32(numAtts),
|
|
NextPageToken: nextPageToken,
|
|
}, nil
|
|
}
|