package validator

import (
	"context"

	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
	statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
	coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
	"github.com/prysmaticlabs/prysm/v4/config/params"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
	ethpbv1 "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	prysmTime "github.com/prysmaticlabs/prysm/v4/time"
	"github.com/prysmaticlabs/prysm/v4/time/slots"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
)

// GetDuties returns the duties assigned to a list of validators specified
// in the request object.
func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
	if vs.SyncChecker.Syncing() {
		return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
	}
	return vs.duties(ctx, req)
}

// StreamDuties returns the duties assigned to a list of validators specified
// in the request object via a server-side stream. The stream sends out new
// assignments on every epoch transition and whenever a chain re-org occurs.
func (vs *Server) StreamDuties(req *ethpb.DutiesRequest, stream ethpb.BeaconNodeValidator_StreamDutiesServer) error {
	if vs.SyncChecker.Syncing() {
		return status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
	}

	// If we are past the genesis time, set the current epoch to the number of
	// epochs since genesis; otherwise it defaults to 0.
	genesisTime := vs.TimeFetcher.GenesisTime()
	if genesisTime.IsZero() {
		return status.Error(codes.Unavailable, "genesis time is not set")
	}
	var currentEpoch primitives.Epoch
	if genesisTime.Before(prysmTime.Now()) {
		currentEpoch = slots.EpochsSinceGenesis(genesisTime)
	}
	req.Epoch = currentEpoch
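	// Compute and send an initial duties response for the current epoch before
	// entering the update loop below.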
	res, err := vs.duties(stream.Context(), req)
	if err != nil {
		return status.Errorf(codes.Internal, "Could not compute validator duties: %v", err)
	}
	if err := stream.Send(res); err != nil {
		return status.Errorf(codes.Internal, "Could not send response over stream: %v", err)
	}

	// Enter a loop that recomputes and resends duties on every epoch tick or chain reorg.
	stateChannel := make(chan *feed.Event, 1)
	stateSub := vs.StateNotifier.StateFeed().Subscribe(stateChannel)
	defer stateSub.Unsubscribe()

	secondsPerEpoch := params.BeaconConfig().SecondsPerSlot * uint64(params.BeaconConfig().SlotsPerEpoch)
	epochTicker := slots.NewSlotTicker(vs.TimeFetcher.GenesisTime(), secondsPerEpoch)
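	// Note: the ticker interval is one full epoch rather than one slot, so each
	// value received from its channel counts elapsed epochs since genesis, which
	// is why it is cast directly to primitives.Epoch below.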
	for {
		select {
		// Ticks every epoch to submit assignments to connected validator clients.
		case slot := <-epochTicker.C():
			req.Epoch = primitives.Epoch(slot)
			res, err := vs.duties(stream.Context(), req)
			if err != nil {
				return status.Errorf(codes.Internal, "Could not compute validator duties: %v", err)
			}
			if err := stream.Send(res); err != nil {
				return status.Errorf(codes.Internal, "Could not send response over stream: %v", err)
			}
		case ev := <-stateChannel:
			// If a reorg occurred, we recompute duties for the connected validator clients
			// and send another response over the server stream right away.
			currentEpoch = slots.EpochsSinceGenesis(genesisTime)
			if ev.Type == statefeed.Reorg {
				if _, ok := ev.Data.(*ethpbv1.EventChainReorg); !ok {
					return status.Errorf(codes.Internal, "Received incorrect data type over reorg feed: %v", ev.Data)
				}
				req.Epoch = currentEpoch
				res, err := vs.duties(stream.Context(), req)
				if err != nil {
					return status.Errorf(codes.Internal, "Could not compute validator duties: %v", err)
				}
				if err := stream.Send(res); err != nil {
					return status.Errorf(codes.Internal, "Could not send response over stream: %v", err)
				}
			}
		case <-stream.Context().Done():
			return status.Error(codes.Canceled, "Stream context canceled")
		case <-vs.Ctx.Done():
			return status.Error(codes.Canceled, "RPC context canceled")
		}
	}
}

// duties computes the validator duties for the requested epoch from the head
// state, for each validator public key / index in the request.
func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
	currentEpoch := slots.ToEpoch(vs.TimeFetcher.CurrentSlot())
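	// Duties can be computed at most one epoch ahead, since committee shuffling
	// beyond the next epoch is not yet determined.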
	if req.Epoch > currentEpoch+1 {
		return nil, status.Errorf(codes.Unavailable, "Request epoch %d cannot be greater than next epoch %d", req.Epoch, currentEpoch+1)
	}

	s, err := vs.HeadFetcher.HeadState(ctx)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not get head state: %v", err)
	}

	// Advance state with empty transitions up to the requested epoch start slot.
	epochStartSlot, err := slots.EpochStart(req.Epoch)
	if err != nil {
		return nil, err
	}
	if s.Slot() < epochStartSlot {
		headRoot, err := vs.HeadFetcher.HeadRoot(ctx)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err)
		}
		s, err = transition.ProcessSlotsUsingNextSlotCache(ctx, s, headRoot, epochStartSlot)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", epochStartSlot, err)
		}
	}
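	// CommitteeAssignments returns, for the requested epoch, each validator's
	// attester committee assignment plus a map of proposer index to assigned slots.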
	committeeAssignments, proposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, s, req.Epoch)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not compute committee assignments: %v", err)
	}
	// Query the next epoch assignments for committee subnet subscriptions.
	nextCommitteeAssignments, _, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not compute next committee assignments: %v", err)
	}

	validatorAssignments := make([]*ethpb.DutiesResponse_Duty, 0, len(req.PublicKeys))
	nextValidatorAssignments := make([]*ethpb.DutiesResponse_Duty, 0, len(req.PublicKeys))

	for _, pubKey := range req.PublicKeys {
		if ctx.Err() != nil {
			return nil, status.Errorf(codes.Aborted, "Could not continue fetching assignments: %v", ctx.Err())
		}
		assignment := &ethpb.DutiesResponse_Duty{
			PublicKey: pubKey,
		}
		nextAssignment := &ethpb.DutiesResponse_Duty{
			PublicKey: pubKey,
		}
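		// Resolve the validator's index from its public key; keys unknown to the
		// beacon state fall through to the deposit-based status lookup below.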
		idx, ok := s.ValidatorIndexByPubkey(bytesutil.ToBytes48(pubKey))
		if ok {
			st := assignmentStatus(s, idx)

			assignment.ValidatorIndex = idx
			assignment.Status = st
			assignment.ProposerSlots = proposerIndexToSlots[idx]

			// Proposer duties are only assigned for the requested epoch; the next
			// epoch's proposer shuffling is not yet determined, so there is no
			// proposer lookup for it.
			nextAssignment.ValidatorIndex = idx
			nextAssignment.Status = st

			ca, ok := committeeAssignments[idx]
			if ok {
				assignment.Committee = ca.Committee
				assignment.AttesterSlot = ca.AttesterSlot
				assignment.CommitteeIndex = ca.CommitteeIndex
			}
			// Save the next epoch assignments.
			ca, ok = nextCommitteeAssignments[idx]
			if ok {
				nextAssignment.Committee = ca.Committee
				nextAssignment.AttesterSlot = ca.AttesterSlot
				nextAssignment.CommitteeIndex = ca.CommitteeIndex
			}
		} else {
			// If the validator isn't in the beacon state, try finding their deposit
			// to determine their status. We don't need the lastActiveValidatorFn
			// because we don't use that part of the response here.
			vStatus, _ := vs.validatorStatus(ctx, s, pubKey, nil)
			assignment.Status = vStatus.Status
		}

		// Determine whether the validator is in the current or next period's sync
		// committee, for states at or past Altair. Note that ok here still refers
		// to whether the validator's index was found in the beacon state above.
		if ok && coreTime.HigherEqualThanAltairVersionAndEpoch(s, req.Epoch) {
			assignment.IsSyncCommittee, err = helpers.IsCurrentPeriodSyncCommittee(s, idx)
			if err != nil {
				return nil, status.Errorf(codes.Internal, "Could not determine current epoch sync committee: %v", err)
			}
			if assignment.IsSyncCommittee {
				if err := core.RegisterSyncSubnetCurrentPeriodProto(s, req.Epoch, pubKey, assignment.Status); err != nil {
					return nil, err
				}
			}
			nextAssignment.IsSyncCommittee = assignment.IsSyncCommittee

			// The next-epoch sync committee duty comes from the next sync committee
			// period only at the final epoch of the current period
			// (i.e. EPOCHS_PER_SYNC_COMMITTEE_PERIOD - 1); otherwise the next
			// epoch's sync committee duty is the same as the current epoch's.
			nextSlotToEpoch := slots.ToEpoch(s.Slot() + 1)
			currentEpoch := coreTime.CurrentEpoch(s)
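			// For example, with a 256-epoch sync committee period (the mainnet
			// EPOCHS_PER_SYNC_COMMITTEE_PERIOD), the condition below holds only
			// when the next slot begins epoch 256, 512, and so on.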
			if slots.SyncCommitteePeriod(nextSlotToEpoch) == slots.SyncCommitteePeriod(currentEpoch)+1 {
				nextAssignment.IsSyncCommittee, err = helpers.IsNextPeriodSyncCommittee(s, idx)
				if err != nil {
					return nil, status.Errorf(codes.Internal, "Could not determine next epoch sync committee: %v", err)
				}
				if nextAssignment.IsSyncCommittee {
					if err := core.RegisterSyncSubnetNextPeriodProto(s, req.Epoch, pubKey, nextAssignment.Status); err != nil {
						return nil, err
					}
				}
			}
		}

		validatorAssignments = append(validatorAssignments, assignment)
		nextValidatorAssignments = append(nextValidatorAssignments, nextAssignment)
	}
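	// Duties and CurrentEpochDuties carry the same payload; the former appears
	// to be retained for clients that still read the legacy field.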
	return &ethpb.DutiesResponse{
		Duties:             validatorAssignments,
		CurrentEpochDuties: validatorAssignments,
		NextEpochDuties:    nextValidatorAssignments,
	}, nil
}

// AssignValidatorToSubnet is a no-op kept for backward compatibility of the
// BeaconNodeValidator API; persistent subnet subscriptions are handled
// elsewhere (e.g. sync committee subnets are registered during duties
// computation above).
func (vs *Server) AssignValidatorToSubnet(_ context.Context, _ *ethpb.AssignValidatorToSubnetRequest) (*emptypb.Empty, error) {
	return &emptypb.Empty{}, nil
}