prysm-pulse/beacon-chain/rpc/beacon/attestations.go
Raul Jordan 0c9e99e04a
Aggregate Attestations Before Streaming to Slasher (#5029)
* rem slasher proto
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* aggregate before streaming
* Merge branch 'agg-idx-atts' of github.com:prysmaticlabs/prysm into agg-idx-atts
* collect atts and increase buffer size
* fix test for func
* Merge refs/heads/master into agg-idx-atts
* Update beacon-chain/rpc/beacon/attestations.go
* Merge refs/heads/master into agg-idx-atts
* naming
* Merge branch 'agg-idx-atts' of github.com:prysmaticlabs/prysm into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* comment terence feedback
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Merge refs/heads/master into agg-idx-atts
* Fix tests
2020-03-08 21:39:54 +00:00

397 lines
15 KiB
Go

package beacon
import (
"context"
"sort"
"strconv"
"time"
ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/pagination"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// sortableAttestations is a slice of attestations that satisfies sort.Interface,
// ordering its elements by ascending attestation data slot.
type sortableAttestations []*ethpb.Attestation

// Len reports how many attestations the slice holds.
func (atts sortableAttestations) Len() int {
	return len(atts)
}

// Swap exchanges the attestations stored at positions i and j.
func (atts sortableAttestations) Swap(i, j int) {
	atts[j], atts[i] = atts[i], atts[j]
}

// Less reports whether the attestation at position i has a strictly lower
// data slot than the attestation at position j.
func (atts sortableAttestations) Less(i, j int) bool {
	slotI, slotJ := atts[i].Data.Slot, atts[j].Data.Slot
	return slotI < slotJ
}
// ListAttestations retrieves attestations matching exactly one filter: the
// genesis flag, a head block root, a source epoch or root, or a target epoch
// or root. Results are sorted by attestation data slot and then paginated
// using the request's page token and page size.
//
// The server returns an empty list when no attestations match the given
// filter criteria; this RPC does not return NOT_FOUND. A request whose page
// size exceeds the configured maximum, or that carries no filter, is rejected
// with InvalidArgument. Every other failure (database access, genesis root
// computation, pagination) is reported with an Internal status.
func (bs *Server) ListAttestations(
	ctx context.Context, req *ethpb.ListAttestationsRequest,
) (*ethpb.ListAttestationsResponse, error) {
	if int(req.PageSize) > flags.Get().MaxPageSize {
		return nil, status.Errorf(codes.InvalidArgument, "Requested page size %d can not be greater than max size %d",
			req.PageSize, flags.Get().MaxPageSize)
	}
	var atts []*ethpb.Attestation
	var err error
	switch q := req.QueryFilter.(type) {
	case *ethpb.ListAttestationsRequest_Genesis:
		// Distinct error names are used here so the function-level err is not
		// shadowed inside this case.
		genBlk, blkErr := bs.BeaconDB.GenesisBlock(ctx)
		if blkErr != nil {
			return nil, status.Errorf(codes.Internal, "Could not retrieve genesis block: %v", blkErr)
		}
		if genBlk == nil {
			return nil, status.Error(codes.Internal, "Could not find genesis block")
		}
		genesisRoot, rootErr := ssz.HashTreeRoot(genBlk.Block)
		if rootErr != nil {
			// Wrap in a status error so clients receive Internal rather than
			// the Unknown code gRPC assigns to plain errors.
			return nil, status.Errorf(codes.Internal, "Could not compute genesis block root: %v", rootErr)
		}
		atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(genesisRoot[:]))
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not fetch genesis attestations: %v", err)
		}
	case *ethpb.ListAttestationsRequest_HeadBlockRoot:
		atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(q.HeadBlockRoot))
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
		}
	case *ethpb.ListAttestationsRequest_SourceEpoch:
		atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetSourceEpoch(q.SourceEpoch))
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
		}
	case *ethpb.ListAttestationsRequest_SourceRoot:
		atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetSourceRoot(q.SourceRoot))
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
		}
	case *ethpb.ListAttestationsRequest_TargetEpoch:
		atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(q.TargetEpoch))
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
		}
	case *ethpb.ListAttestationsRequest_TargetRoot:
		atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetRoot(q.TargetRoot))
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
		}
	default:
		return nil, status.Error(codes.InvalidArgument, "Must specify a filter criteria for fetching attestations")
	}

	// Order the attestations by data slot before slicing out the page.
	sort.Sort(sortableAttestations(atts))
	numAttestations := len(atts)

	// With nothing to return, answer with an explicit empty response rather
	// than attempting to paginate zero items, which would produce an error.
	if numAttestations == 0 {
		return &ethpb.ListAttestationsResponse{
			Attestations:  make([]*ethpb.Attestation, 0),
			TotalSize:     int32(0),
			NextPageToken: strconv.Itoa(0),
		}, nil
	}

	start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), numAttestations)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
	}
	return &ethpb.ListAttestationsResponse{
		Attestations:  atts[start:end],
		TotalSize:     int32(numAttestations),
		NextPageToken: nextPageToken,
	}, nil
}
// ListIndexedAttestations retrieves attestations by target epoch and returns
// them in indexed form, sorted by data slot. Either a target epoch filter or
// the genesis epoch filter (equivalent to target epoch 0) must be supplied;
// a request without one is rejected with InvalidArgument.
//
// Each attestation is converted using the committees retrieved for the
// requested epoch. Attestations that cannot be mapped to a committee are left
// out of the result: those whose slot lies outside the requested epoch, whose
// slot has no committee entry, or whose committee index is out of range.
// TotalSize and pagination therefore refer to the converted attestations only.
//
// The server returns an empty list when no attestations match the given
// filter criteria; this RPC does not return NOT_FOUND.
func (bs *Server) ListIndexedAttestations(
	ctx context.Context, req *ethpb.ListIndexedAttestationsRequest,
) (*ethpb.ListIndexedAttestationsResponse, error) {
	// Both supported filters reduce to "attestations targeting this epoch".
	var epoch uint64
	switch q := req.QueryFilter.(type) {
	case *ethpb.ListIndexedAttestationsRequest_TargetEpoch:
		epoch = q.TargetEpoch
	case *ethpb.ListIndexedAttestationsRequest_GenesisEpoch:
		epoch = 0
	default:
		return nil, status.Error(codes.InvalidArgument, "Must specify a filter criteria for fetching attestations")
	}
	atts, err := bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(epoch))
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
	}

	// Order the attestations by data slot before converting and paginating.
	sort.Sort(sortableAttestations(atts))

	// With nothing to return, answer with an explicit empty response rather
	// than attempting to paginate zero items, which would produce an error.
	if len(atts) == 0 {
		return &ethpb.ListIndexedAttestationsResponse{
			IndexedAttestations: make([]*ethpb.IndexedAttestation, 0),
			TotalSize:           int32(0),
			NextPageToken:       strconv.Itoa(0),
		}, nil
	}

	committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(ctx, epoch)
	if err != nil {
		return nil, status.Errorf(
			codes.Internal,
			"Could not retrieve committees for epoch %d: %v",
			epoch,
			err,
		)
	}

	// Convert every attestation into indexed form using the epoch's
	// committees. The result is appended to, not pre-filled, so skipped
	// attestations never leave nil entries in the response.
	indexedAtts := make([]*ethpb.IndexedAttestation, 0, len(atts))
	startSlot := helpers.StartSlot(epoch)
	// endSlot is exclusive: it is the first slot of the following epoch.
	endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
	for _, att := range atts {
		// Committees are keyed by slot and only cover the requested epoch,
		// so attestations from any other slot cannot be converted.
		if att.Data.Slot < startSlot || att.Data.Slot >= endSlot {
			continue
		}
		committeesForSlot, ok := committeesBySlot[att.Data.Slot]
		if !ok {
			continue
		}
		// Guard the committee lookup so a bad index cannot panic the handler.
		if att.Data.CommitteeIndex >= uint64(len(committeesForSlot.Committees)) {
			continue
		}
		committee := committeesForSlot.Committees[att.Data.CommitteeIndex]
		idxAtt, convErr := attestationutil.ConvertToIndexed(ctx, att, committee.ValidatorIndices)
		if convErr != nil {
			return nil, status.Errorf(
				codes.Internal,
				"Could not convert attestation with slot %d to indexed form: %v",
				att.Data.Slot,
				convErr,
			)
		}
		indexedAtts = append(indexedAtts, idxAtt)
	}

	// Every attestation may have been filtered out above; avoid paginating
	// an empty list for the same reason as before.
	if len(indexedAtts) == 0 {
		return &ethpb.ListIndexedAttestationsResponse{
			IndexedAttestations: make([]*ethpb.IndexedAttestation, 0),
			TotalSize:           int32(0),
			NextPageToken:       strconv.Itoa(0),
		}, nil
	}

	start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), len(indexedAtts))
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
	}
	return &ethpb.ListIndexedAttestationsResponse{
		IndexedAttestations: indexedAtts[start:end],
		TotalSize:           int32(len(indexedAtts)),
		NextPageToken:       nextPageToken,
	}, nil
}
// StreamAttestations forwards attestations to the client over a gRPC stream.
// It subscribes to the operation feed and, for every UnaggregatedAttReceived
// event that carries a non-nil attestation, sends that attestation on the
// stream. Events of any other type, events with an unexpected payload type,
// and nil attestations are skipped without ending the stream.
//
// The call returns an Unavailable status when a send fails, and a Canceled
// status once either the server context or the stream context is done.
func (bs *Server) StreamAttestations(
	_ *ptypes.Empty, stream ethpb.BeaconChain_StreamAttestationsServer,
) error {
	events := make(chan *feed.Event, 1)
	subscription := bs.AttestationNotifier.OperationFeed().Subscribe(events)
	defer subscription.Unsubscribe()

	for {
		select {
		case evt := <-events:
			// Only unaggregated attestation events are of interest.
			if evt.Type != operation.UnaggregatedAttReceived {
				continue
			}
			received, isAttData := evt.Data.(*operation.UnAggregatedAttReceivedData)
			if !isAttData {
				// Got bad data over the feed; ignore it.
				continue
			}
			if received.Attestation == nil {
				// A single nil attestation should not terminate the stream.
				continue
			}
			sendErr := stream.Send(received.Attestation)
			if sendErr != nil {
				return status.Errorf(codes.Unavailable, "Could not send over stream: %v", sendErr)
			}
		case <-bs.Ctx.Done():
			return status.Error(codes.Canceled, "Context canceled")
		case <-stream.Context().Done():
			return status.Error(codes.Canceled, "Context canceled")
		}
	}
}
// StreamIndexedAttestations streams indexed attestations to the client.
//
// The handler subscribes to the operation feed and hands every non-nil
// attestation from an UnaggregatedAttReceived event to
// bs.ReceivedAttestationsBuffer. A collectReceivedAttestations goroutine,
// started with the stream's context, consumes that buffer. Batches arriving
// on bs.CollectedAttestationsBuffer are aggregated, converted to indexed form
// using the committees of the current head epoch, and sent on the stream.
//
// Aggregated attestations that cannot be mapped to a committee of the head
// epoch are skipped: those whose slot is outside the epoch, whose slot has no
// committee entry, or whose committee index is out of range.
//
// The call returns Internal when aggregation, committee retrieval or
// conversion fails, Unavailable when a send fails, and Canceled once either
// the server context or the stream context is done.
func (bs *Server) StreamIndexedAttestations(
	_ *ptypes.Empty, stream ethpb.BeaconChain_StreamIndexedAttestationsServer,
) error {
	attestationsChannel := make(chan *feed.Event, 1)
	attSub := bs.AttestationNotifier.OperationFeed().Subscribe(attestationsChannel)
	defer attSub.Unsubscribe()
	go bs.collectReceivedAttestations(stream.Context())
	for {
		select {
		case event := <-attestationsChannel:
			if event.Type != operation.UnaggregatedAttReceived {
				continue
			}
			data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
			if !ok {
				// Got bad data over the feed; ignore it.
				continue
			}
			if data.Attestation == nil {
				// One nil attestation shouldn't stop the stream.
				continue
			}
			// Hand the attestation to the collector, but stay responsive to
			// cancellation so a full buffer cannot pin this handler forever.
			select {
			case bs.ReceivedAttestationsBuffer <- data.Attestation:
			case <-bs.Ctx.Done():
				return status.Error(codes.Canceled, "Context canceled")
			case <-stream.Context().Done():
				return status.Error(codes.Canceled, "Context canceled")
			}
		case atts := <-bs.CollectedAttestationsBuffer:
			// We aggregate the received attestations.
			aggAtts, err := helpers.AggregateAttestations(atts)
			if err != nil {
				return status.Errorf(
					codes.Internal,
					"Could not aggregate attestations: %v",
					err,
				)
			}
			epoch := helpers.SlotToEpoch(bs.HeadFetcher.HeadSlot())
			committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(stream.Context(), epoch)
			if err != nil {
				return status.Errorf(
					codes.Internal,
					"Could not retrieve committees for epoch %d: %v",
					epoch,
					err,
				)
			}
			// We use the retrieved committees for the epoch to convert all
			// attestations into indexed form.
			startSlot := helpers.StartSlot(epoch)
			// endSlot is exclusive: it is the first slot of the following epoch.
			endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
			for _, att := range aggAtts {
				// Committees are keyed by slot and only cover the head epoch,
				// so attestations from any other slot cannot be converted.
				if att.Data.Slot < startSlot || att.Data.Slot >= endSlot {
					continue
				}
				committeesForSlot, ok := committeesBySlot[att.Data.Slot]
				if !ok {
					continue
				}
				// Guard the committee lookup so a bad index cannot panic the
				// handler; this also covers a nil or empty committee list.
				if att.Data.CommitteeIndex >= uint64(len(committeesForSlot.Committees)) {
					continue
				}
				committee := committeesForSlot.Committees[att.Data.CommitteeIndex]
				idxAtt, convErr := attestationutil.ConvertToIndexed(stream.Context(), att, committee.ValidatorIndices)
				if convErr != nil {
					return status.Errorf(
						codes.Internal,
						"Could not convert attestation with slot %d to indexed form: %v",
						att.Data.Slot,
						convErr,
					)
				}
				if err := stream.Send(idxAtt); err != nil {
					return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
				}
			}
		case <-bs.Ctx.Done():
			return status.Error(codes.Canceled, "Context canceled")
		case <-stream.Context().Done():
			return status.Error(codes.Canceled, "Context canceled")
		}
	}
}
// collectReceivedAttestations groups attestations read from
// bs.ReceivedAttestationsBuffer by the hash tree root of their data. Every
// half slot it flushes each group as one batch onto
// bs.CollectedAttestationsBuffer. Attestations whose data cannot be hashed
// are logged and dropped.
//
// The loop runs until ctx or the server context is done. Flushed groups are
// removed from the internal map so it does not grow with every new
// attestation data root, and the ticker is stopped on exit.
//
// TODO(#5031): Instead of doing aggregation here, leverage the aggregation
// already being done by the attestation pool in the operations service.
func (bs *Server) collectReceivedAttestations(ctx context.Context) {
	attsByRoot := make(map[[32]byte][]*ethpb.Attestation)
	// Scale to a duration before halving so odd slot lengths are not
	// truncated and a one-second slot does not yield a zero interval.
	halfASlot := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 2
	ticker := time.NewTicker(halfASlot)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for root, atts := range attsByRoot {
				// Deleting the current key while ranging over a map is safe
				// in Go, and keeps stale roots from accumulating.
				delete(attsByRoot, root)
				if len(atts) == 0 {
					continue
				}
				// Do not block forever on the flush: if the consumer has
				// gone away, exit instead of leaking this goroutine.
				select {
				case bs.CollectedAttestationsBuffer <- atts:
				case <-ctx.Done():
					return
				case <-bs.Ctx.Done():
					return
				}
			}
		case att := <-bs.ReceivedAttestationsBuffer:
			attDataRoot, err := ssz.HashTreeRoot(att.Data)
			if err != nil {
				logrus.Errorf("Could not hash tree root data: %v", err)
				continue
			}
			attsByRoot[attDataRoot] = append(attsByRoot[attDataRoot], att)
		case <-ctx.Done():
			return
		case <-bs.Ctx.Done():
			return
		}
	}
}
// AttestationPool returns a page of the aggregated attestations currently
// held by the server's attestation pool.
//
// The pool holds attestations that have been seen but not yet processed.
// Pool attestations eventually expire as the slot advances, so an attestation
// missing from this response does not imply that it was included in a block;
// it may simply have expired. Refer to the ethereum 2.0 specification for
// details on how attestations are processed and when they stop being valid:
// https://github.com/ethereum/eth2.0-specs/blob/dev/specs/core/0_beacon-chain.md#attestations
//
// A page size above the configured maximum is rejected with InvalidArgument.
// An empty pool yields an empty list with a total size of zero.
func (bs *Server) AttestationPool(
	ctx context.Context, req *ethpb.AttestationPoolRequest,
) (*ethpb.AttestationPoolResponse, error) {
	requestedSize := int(req.PageSize)
	if requestedSize > flags.Get().MaxPageSize {
		return nil, status.Errorf(
			codes.InvalidArgument,
			"Requested page size %d can not be greater than max size %d",
			req.PageSize,
			flags.Get().MaxPageSize,
		)
	}

	pooled := bs.AttestationsPool.AggregatedAttestations()
	total := len(pooled)

	// Nothing pooled: reply with an explicit empty page instead of paginating.
	if total == 0 {
		emptyResp := &ethpb.AttestationPoolResponse{
			Attestations:  []*ethpb.Attestation{},
			TotalSize:     0,
			NextPageToken: strconv.Itoa(0),
		}
		return emptyResp, nil
	}

	first, last, nextToken, pageErr := pagination.StartAndEndPage(req.PageToken, requestedSize, total)
	if pageErr != nil {
		return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", pageErr)
	}

	resp := &ethpb.AttestationPoolResponse{
		Attestations:  pooled[first:last],
		TotalSize:     int32(total),
		NextPageToken: nextToken,
	}
	return resp, nil
}