prysm-pulse/beacon-chain/rpc/beacon_chain_server.go
Raul Jordan 305d0299dd
Revamp GetValidators to Retrieve Historical Validators By Epoch (#3563)
* archive participation begin implementation

* validator participation compute

* comments

* compute participation common func

* full test for archiving data

* gazelle

* complete tests

* gaz

* properly retrieve the validators

* revert weird change

* historical validator fetching

* resolves issues with current tests

* adding test for old epoch validators

* tests in
2019-09-23 18:00:38 -05:00

607 lines
23 KiB
Go

package rpc
import (
"context"
"fmt"
"sort"
"time"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/epoch"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/pagination"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// BeaconChainServer defines a server implementation of the gRPC Beacon Chain service,
// providing RPC endpoints to access data relevant to the Ethereum 2.0 phase 0
// beacon chain.
type BeaconChainServer struct {
beaconDB db.Database
ctx context.Context
chainStartFetcher powchain.ChainStartFetcher
headFetcher blockchain.HeadFetcher
finalizationFetcher blockchain.FinalizationFetcher
stateFeedListener blockchain.ChainFeeds
pool operations.Pool
incomingAttestation chan *ethpb.Attestation
canonicalStateChan chan *pbp2p.BeaconState
chainStartChan chan time.Time
}
// sortableAttestations implements the Sort interface to sort attestations
// by shard as the canonical sorting attribute.
type sortableAttestations []*ethpb.Attestation
func (s sortableAttestations) Len() int { return len(s) }
func (s sortableAttestations) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortableAttestations) Less(i, j int) bool {
return s[i].Data.Crosslink.Shard < s[j].Data.Crosslink.Shard
}
// ListAttestations retrieves attestations by block root, slot, or epoch.
// Attestations are sorted by crosslink shard by default.
//
// The server may return an empty list when no attestations match the given
// filter criteria. This RPC should not return NOT_FOUND. Only one filter
// criteria should be used.
func (bs *BeaconChainServer) ListAttestations(
ctx context.Context, req *ethpb.ListAttestationsRequest,
) (*ethpb.ListAttestationsResponse, error) {
if int(req.PageSize) > params.BeaconConfig().MaxPageSize {
return nil, status.Errorf(codes.InvalidArgument, "requested page size %d can not be greater than max size %d",
req.PageSize, params.BeaconConfig().MaxPageSize)
}
var atts []*ethpb.Attestation
var err error
switch q := req.QueryFilter.(type) {
case *ethpb.ListAttestationsRequest_HeadBlockRoot:
atts, err = bs.beaconDB.Attestations(ctx, filters.NewFilter().SetHeadBlockRoot(q.HeadBlockRoot))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
case *ethpb.ListAttestationsRequest_SourceEpoch:
atts, err = bs.beaconDB.Attestations(ctx, filters.NewFilter().SetSourceEpoch(q.SourceEpoch))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
case *ethpb.ListAttestationsRequest_SourceRoot:
atts, err = bs.beaconDB.Attestations(ctx, filters.NewFilter().SetSourceRoot(q.SourceRoot))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
case *ethpb.ListAttestationsRequest_TargetEpoch:
atts, err = bs.beaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(q.TargetEpoch))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
case *ethpb.ListAttestationsRequest_TargetRoot:
atts, err = bs.beaconDB.Attestations(ctx, filters.NewFilter().SetTargetRoot(q.TargetRoot))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
default:
atts, err = bs.beaconDB.Attestations(ctx, nil)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
}
// We sort attestations according to the Sortable interface.
sort.Sort(sortableAttestations(atts))
numAttestations := len(atts)
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), numAttestations)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not paginate attestations: %v", err)
}
return &ethpb.ListAttestationsResponse{
Attestations: atts[start:end],
TotalSize: int32(numAttestations),
NextPageToken: nextPageToken,
}, nil
}
// AttestationPool retrieves pending attestations.
//
// The server returns a list of attestations that have been seen but not
// yet processed. Pool attestations eventually expire as the slot
// advances, so an attestation missing from this request does not imply
// that it was included in a block. The attestation may have expired.
// Refer to the ethereum 2.0 specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/eth2.0-specs/blob/dev/specs/core/0_beacon-chain.md#attestations
func (bs *BeaconChainServer) AttestationPool(
ctx context.Context, _ *ptypes.Empty,
) (*ethpb.AttestationPoolResponse, error) {
atts, err := bs.pool.AttestationPoolNoVerify(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch attestations: %v", err)
}
return &ethpb.AttestationPoolResponse{
Attestations: atts,
}, nil
}
// ListBlocks retrieves blocks by root, slot, or epoch.
//
// The server may return multiple blocks in the case that a slot or epoch is
// provided as the filter criteria. The server may return an empty list when
// no blocks in their database match the filter criteria. This RPC should
// not return NOT_FOUND. Only one filter criteria should be used.
func (bs *BeaconChainServer) ListBlocks(
ctx context.Context, req *ethpb.ListBlocksRequest,
) (*ethpb.ListBlocksResponse, error) {
if int(req.PageSize) > params.BeaconConfig().MaxPageSize {
return nil, status.Errorf(codes.InvalidArgument, "requested page size %d can not be greater than max size %d",
req.PageSize, params.BeaconConfig().MaxPageSize)
}
switch q := req.QueryFilter.(type) {
case *ethpb.ListBlocksRequest_Epoch:
startSlot := q.Epoch * params.BeaconConfig().SlotsPerEpoch
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch - 1
blks, err := bs.beaconDB.Blocks(ctx, filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot))
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get blocks: %v", err)
}
numBlks := len(blks)
if numBlks == 0 {
return &ethpb.ListBlocksResponse{Blocks: make([]*ethpb.BeaconBlock, 0), TotalSize: 0}, nil
}
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), numBlks)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not paginate blocks: %v", err)
}
return &ethpb.ListBlocksResponse{
Blocks: blks[start:end],
TotalSize: int32(numBlks),
NextPageToken: nextPageToken,
}, nil
case *ethpb.ListBlocksRequest_Root:
blk, err := bs.beaconDB.Block(ctx, bytesutil.ToBytes32(q.Root))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve block: %v", err)
}
if blk == nil {
return &ethpb.ListBlocksResponse{Blocks: []*ethpb.BeaconBlock{}, TotalSize: 0}, nil
}
return &ethpb.ListBlocksResponse{
Blocks: []*ethpb.BeaconBlock{blk},
TotalSize: 1,
}, nil
case *ethpb.ListBlocksRequest_Slot:
blks, err := bs.beaconDB.Blocks(ctx, filters.NewFilter().SetStartSlot(q.Slot).SetEndSlot(q.Slot))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve blocks for slot %d: %v", q.Slot, err)
}
numBlks := len(blks)
if numBlks == 0 {
return &ethpb.ListBlocksResponse{Blocks: []*ethpb.BeaconBlock{}, TotalSize: 0}, nil
}
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), numBlks)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not paginate blocks: %v", err)
}
return &ethpb.ListBlocksResponse{
Blocks: blks[start:end],
TotalSize: int32(numBlks),
NextPageToken: nextPageToken,
}, nil
}
return nil, status.Errorf(codes.InvalidArgument, "must satisfy one of the filter requirement")
}
// GetChainHead retrieves information about the head of the beacon chain from
// the view of the beacon chain node.
//
// This includes the head block slot and root as well as information about
// the most recent finalized and justified slots.
func (bs *BeaconChainServer) GetChainHead(ctx context.Context, _ *ptypes.Empty) (*ethpb.ChainHead, error) {
finalizedCheckpoint := bs.headFetcher.HeadState().FinalizedCheckpoint
justifiedCheckpoint := bs.headFetcher.HeadState().CurrentJustifiedCheckpoint
prevJustifiedCheckpoint := bs.headFetcher.HeadState().PreviousJustifiedCheckpoint
return &ethpb.ChainHead{
BlockRoot: bs.headFetcher.HeadRoot(),
BlockSlot: bs.headFetcher.HeadSlot(),
FinalizedBlockRoot: finalizedCheckpoint.Root,
FinalizedSlot: finalizedCheckpoint.Epoch * params.BeaconConfig().SlotsPerEpoch,
JustifiedBlockRoot: justifiedCheckpoint.Root,
JustifiedSlot: justifiedCheckpoint.Epoch * params.BeaconConfig().SlotsPerEpoch,
PreviousJustifiedBlockRoot: prevJustifiedCheckpoint.Root,
PreviousJustifiedSlot: prevJustifiedCheckpoint.Epoch * params.BeaconConfig().SlotsPerEpoch,
}, nil
}
// ListValidatorBalances retrieves the validator balances for a given set of public keys.
// An optional Epoch parameter is provided to request historical validator balances from
// archived, persistent data.
func (bs *BeaconChainServer) ListValidatorBalances(
ctx context.Context,
req *ethpb.GetValidatorBalancesRequest) (*ethpb.ValidatorBalances, error) {
res := make([]*ethpb.ValidatorBalances_Balance, 0, len(req.PublicKeys)+len(req.Indices))
filtered := map[uint64]bool{} // track filtered validators to prevent duplication in the response.
var requestingGenesis bool
var epoch uint64
switch q := req.QueryFilter.(type) {
case *ethpb.GetValidatorBalancesRequest_Epoch:
epoch = q.Epoch
case *ethpb.GetValidatorBalancesRequest_Genesis:
requestingGenesis = q.Genesis
default:
}
var balances []uint64
var err error
headState := bs.headFetcher.HeadState()
validators := headState.Validators
if requestingGenesis {
balances, err = bs.beaconDB.ArchivedBalances(ctx, 0 /* genesis epoch */)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "could not retrieve balances for epoch %d", epoch)
}
} else if !requestingGenesis && epoch < helpers.CurrentEpoch(headState) {
balances, err = bs.beaconDB.ArchivedBalances(ctx, epoch)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "could not retrieve balances for epoch %d", epoch)
}
} else {
balances = headState.Balances
}
for _, pubKey := range req.PublicKeys {
// Skip empty public key
if len(pubKey) == 0 {
continue
}
index, ok, err := bs.beaconDB.ValidatorIndex(ctx, bytesutil.ToBytes48(pubKey))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve validator index: %v", err)
}
if !ok {
return nil, status.Errorf(codes.Internal, "could not find validator index for public key %#x not found", pubKey)
}
filtered[index] = true
if int(index) >= len(balances) {
return nil, status.Errorf(codes.OutOfRange, "validator index %d >= balance list %d",
index, len(balances))
}
res = append(res, &ethpb.ValidatorBalances_Balance{
PublicKey: pubKey,
Index: index,
Balance: balances[index],
})
}
for _, index := range req.Indices {
if int(index) >= len(balances) {
if epoch <= helpers.CurrentEpoch(headState) {
return nil, status.Errorf(codes.OutOfRange, "validator index %d does not exist in historical balances",
index)
}
return nil, status.Errorf(codes.OutOfRange, "validator index %d >= balance list %d",
index, len(balances))
}
if !filtered[index] {
res = append(res, &ethpb.ValidatorBalances_Balance{
PublicKey: validators[index].PublicKey,
Index: index,
Balance: balances[index],
})
}
}
return &ethpb.ValidatorBalances{Balances: res}, nil
}
// GetValidators retrieves the current list of active validators with an optional historical epoch flag to
// to retrieve validator set in time.
func (bs *BeaconChainServer) GetValidators(
ctx context.Context,
req *ethpb.GetValidatorsRequest,
) (*ethpb.Validators, error) {
if int(req.PageSize) > params.BeaconConfig().MaxPageSize {
return nil, status.Errorf(codes.InvalidArgument, "requested page size %d can not be greater than max size %d",
req.PageSize, params.BeaconConfig().MaxPageSize)
}
headState := bs.headFetcher.HeadState()
requestedEpoch := helpers.CurrentEpoch(headState)
switch q := req.QueryFilter.(type) {
case *ethpb.GetValidatorsRequest_Genesis:
if q.Genesis {
requestedEpoch = 0
}
case *ethpb.GetValidatorsRequest_Epoch:
requestedEpoch = q.Epoch
}
finalizedEpoch := bs.finalizationFetcher.FinalizedCheckpt().Epoch
validators := headState.Validators
if requestedEpoch < finalizedEpoch {
stopIdx := len(validators)
for idx, val := range validators {
// The first time we see a validator with an activation epoch > the requested epoch,
// we know this validator is from the future relative to what the request wants.
if val.ActivationEpoch > requestedEpoch {
stopIdx = idx
break
}
}
validators = validators[:stopIdx]
}
validatorCount := len(validators)
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), validatorCount)
if err != nil {
return nil, err
}
return &ethpb.Validators{
Validators: validators[start:end],
TotalSize: int32(validatorCount),
NextPageToken: nextPageToken,
}, nil
}
// GetValidatorActiveSetChanges retrieves the active set changes for a given epoch.
//
// This data includes any activations, voluntary exits, and involuntary
// ejections.
func (bs *BeaconChainServer) GetValidatorActiveSetChanges(
ctx context.Context, req *ethpb.GetValidatorActiveSetChangesRequest,
) (*ethpb.ActiveSetChanges, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
// GetValidatorQueue retrieves the current validator queue information.
func (bs *BeaconChainServer) GetValidatorQueue(
ctx context.Context, _ *ptypes.Empty,
) (*ethpb.ValidatorQueue, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
// ListValidatorAssignments retrieves the validator assignments for a given epoch,
// optional validator indices or public keys may be included to filter validator assignments.
func (bs *BeaconChainServer) ListValidatorAssignments(
ctx context.Context, req *ethpb.ListValidatorAssignmentsRequest,
) (*ethpb.ValidatorAssignments, error) {
if int(req.PageSize) > params.BeaconConfig().MaxPageSize {
return nil, status.Errorf(codes.InvalidArgument, "requested page size %d can not be greater than max size %d",
req.PageSize, params.BeaconConfig().MaxPageSize)
}
var res []*ethpb.ValidatorAssignments_CommitteeAssignment
headState := bs.headFetcher.HeadState()
filtered := map[uint64]bool{} // track filtered validators to prevent duplication in the response.
filteredIndices := make([]uint64, 0)
requestedEpoch := helpers.CurrentEpoch(headState)
switch q := req.QueryFilter.(type) {
case *ethpb.ListValidatorAssignmentsRequest_Genesis:
if q.Genesis {
requestedEpoch = 0
}
case *ethpb.ListValidatorAssignmentsRequest_Epoch:
requestedEpoch = q.Epoch
}
// Filter out assignments by public keys.
for _, pubKey := range req.PublicKeys {
index, ok, err := bs.beaconDB.ValidatorIndex(ctx, bytesutil.ToBytes48(pubKey))
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve validator index: %v", err)
}
if !ok {
return nil, status.Errorf(codes.NotFound, "could not find validator index for public key %#x not found", pubKey)
}
filtered[index] = true
filteredIndices = append(filteredIndices, index)
}
// Filter out assignments by validator indices.
for _, index := range req.Indices {
if !filtered[index] {
filteredIndices = append(filteredIndices, index)
}
}
activeIndices, err := helpers.ActiveValidatorIndices(headState, requestedEpoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve active validator indices: %v", err)
}
if len(filteredIndices) == 0 {
// If no filter was specified, return assignments from active validator indices with pagination.
filteredIndices = activeIndices
}
start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), len(filteredIndices))
if err != nil {
return nil, err
}
shouldFetchFromArchive := requestedEpoch < bs.finalizationFetcher.FinalizedCheckpt().Epoch
for _, index := range filteredIndices[start:end] {
if int(index) >= len(headState.Validators) {
return nil, status.Errorf(codes.InvalidArgument, "validator index %d >= validator count %d",
index, len(headState.Validators))
}
var committee []uint64
var shard uint64
var slot uint64
var isProposer bool
if shouldFetchFromArchive {
archivedInfo, err := bs.beaconDB.ArchivedCommitteeInfo(ctx, requestedEpoch)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"could not retrieve archived committee info for epoch %d",
requestedEpoch,
)
}
committee, shard, slot, err = bs.archivedValidatorCommittee(requestedEpoch, index, archivedInfo, activeIndices)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve assignment for validator %d: %v", index, err)
}
isProposer = archivedInfo.ProposerIndex == index
} else {
committee, shard, slot, isProposer, err = helpers.CommitteeAssignment(headState, requestedEpoch, index)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not retrieve assignment for validator %d: %v", index, err)
}
}
res = append(res, &ethpb.ValidatorAssignments_CommitteeAssignment{
CrosslinkCommittees: committee,
Shard: shard,
Slot: slot,
Proposer: isProposer,
PublicKey: headState.Validators[index].PublicKey,
})
}
return &ethpb.ValidatorAssignments{
Epoch: requestedEpoch,
Assignments: res,
NextPageToken: nextPageToken,
TotalSize: int32(len(filteredIndices)),
}, nil
}
// Computes validator assignments for an epoch and validator index using archived committee
// information and a set of active validators.
func (bs *BeaconChainServer) archivedValidatorCommittee(
epoch uint64,
validatorIndex uint64,
archivedInfo *ethpb.ArchivedCommitteeInfo,
activeIndices []uint64,
) ([]uint64, uint64, uint64, error) {
startSlot := helpers.StartSlot(epoch)
committeeCount := archivedInfo.CommitteeCount
committeesPerSlot := committeeCount / params.BeaconConfig().SlotsPerEpoch
epochStartShard := archivedInfo.StartShard
seed := bytesutil.ToBytes32(archivedInfo.Seed)
shardCount := params.BeaconConfig().ShardCount
for slot := startSlot; slot < startSlot+params.BeaconConfig().SlotsPerEpoch; slot++ {
offset := committeesPerSlot * (slot % params.BeaconConfig().SlotsPerEpoch)
slotStartShard := (epochStartShard + offset) % params.BeaconConfig().ShardCount
for i := uint64(0); i < committeesPerSlot; i++ {
shard := (slotStartShard + i) % params.BeaconConfig().ShardCount
currentShard := (shard + shardCount - epochStartShard) % shardCount
committee, err := helpers.ComputeCommittee(activeIndices, seed, currentShard, committeeCount)
if err != nil {
return nil, 0, 0, errors.Wrap(err, "could not compute committee")
}
for _, index := range committee {
if validatorIndex == index {
return committee, shard, slot, nil
}
}
}
}
return nil, 0, 0, fmt.Errorf("could not find committee for validator index %d", validatorIndex)
}
// GetValidatorParticipation retrieves the validator participation information for a given epoch,
// it returns the information about validator's participation rate in voting on the proof of stake
// rules based on their balance compared to the total active validator balance.
func (bs *BeaconChainServer) GetValidatorParticipation(
ctx context.Context, req *ethpb.GetValidatorParticipationRequest,
) (*ethpb.ValidatorParticipationResponse, error) {
headState := bs.headFetcher.HeadState()
currentEpoch := helpers.SlotToEpoch(headState.Slot)
var requestedEpoch uint64
var isGenesis bool
switch q := req.QueryFilter.(type) {
case *ethpb.GetValidatorParticipationRequest_Genesis:
isGenesis = q.Genesis
case *ethpb.GetValidatorParticipationRequest_Epoch:
requestedEpoch = q.Epoch
default:
requestedEpoch = currentEpoch
}
if requestedEpoch > helpers.SlotToEpoch(headState.Slot) {
return nil, status.Errorf(
codes.FailedPrecondition,
"cannot request data from an epoch in the future: req.Epoch %d, currentEpoch %d", requestedEpoch, currentEpoch,
)
}
// If the request is from genesis or another past epoch, we look into our archived
// data to find it and return it if it exists.
if isGenesis {
participation, err := bs.beaconDB.ArchivedValidatorParticipation(ctx, 0)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch archived participation: %v", err)
}
if participation == nil {
return nil, status.Error(codes.NotFound, "could not find archival data for epoch 0")
}
return &ethpb.ValidatorParticipationResponse{
Epoch: 0,
Finalized: true,
Participation: participation,
}, nil
} else if requestedEpoch < helpers.SlotToEpoch(headState.Slot) {
participation, err := bs.beaconDB.ArchivedValidatorParticipation(ctx, requestedEpoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not fetch archived participation: %v", err)
}
if participation == nil {
return nil, status.Errorf(codes.NotFound, "could not find archival data for epoch %d", requestedEpoch)
}
finalizedEpoch := bs.finalizationFetcher.FinalizedCheckpt().Epoch
// If the epoch we requested is <= the finalized epoch, we consider it finalized as well.
finalized := requestedEpoch <= finalizedEpoch
return &ethpb.ValidatorParticipationResponse{
Epoch: requestedEpoch,
Finalized: finalized,
Participation: participation,
}, nil
}
// Else if the request is for the current epoch, we compute validator participation
// right away and return the result based on the head state.
participation, err := epoch.ComputeValidatorParticipation(headState)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not compute participation: %v", err)
}
return &ethpb.ValidatorParticipationResponse{
Epoch: currentEpoch,
Finalized: false, // The current epoch can never be finalized.
Participation: participation,
}, nil
}