package validator import ( "context" "errors" "fmt" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/crypto/bls" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/time/slots" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" ) // GetAttestationData requests that the beacon node produce an attestation data object, // which the validator acting as an attester will then sign. func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) { ctx, span := trace.StartSpan(ctx, "AttesterServer.RequestAttestation") defer span.End() span.AddAttributes( trace.Int64Attribute("slot", int64(req.Slot)), trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex)), ) if vs.SyncChecker.Syncing() { return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond") } // An optimistic validator MUST NOT participate in attestation. (i.e., sign across the DOMAIN_BEACON_ATTESTER, DOMAIN_SELECTION_PROOF or DOMAIN_AGGREGATE_AND_PROOF domains). if err := vs.optimisticStatus(ctx); err != nil { return nil, err } if err := helpers.ValidateAttestationTime(req.Slot, vs.TimeFetcher.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("invalid request: %v", err)) } res, err := vs.AttestationCache.Get(ctx, req) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve data from attestation cache: %v", err) } if res != nil { res.CommitteeIndex = req.CommitteeIndex return res, nil } if err := vs.AttestationCache.MarkInProgress(req); err != nil { if errors.Is(err, cache.ErrAlreadyInProgress) { res, err := vs.AttestationCache.Get(ctx, req) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve data from attestation cache: %v", err) } if res == nil { return nil, status.Error(codes.DataLoss, "A request was in progress and resolved to nil") } res.CommitteeIndex = req.CommitteeIndex return res, nil } return nil, status.Errorf(codes.Internal, "Could not mark attestation as in-progress: %v", err) } defer func() { if err := vs.AttestationCache.MarkNotInProgress(req); err != nil { log.WithError(err).Error("Could not mark cache not in progress") } }() headState, err := vs.HeadFetcher.HeadState(ctx) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve head state: %v", err) } headRoot, err := vs.HeadFetcher.HeadRoot(ctx) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err) } // In the case that we receive an attestation request after a newer state/block has been processed. if headState.Slot() > req.Slot { headRoot, err = helpers.BlockRootAtSlot(headState, req.Slot) if err != nil { return nil, status.Errorf(codes.Internal, "Could not get historical head root: %v", err) } headState, err = vs.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(headRoot)) if err != nil { return nil, status.Errorf(codes.Internal, "Could not get historical head state: %v", err) } } if headState == nil || headState.IsNil() { return nil, status.Error(codes.Internal, "Could not lookup parent state from head.") } if time.CurrentEpoch(headState) < slots.ToEpoch(req.Slot) { headState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, headState, headRoot, req.Slot) if err != nil { return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err) } } targetEpoch := time.CurrentEpoch(headState) epochStartSlot, err := slots.EpochStart(targetEpoch) if err != nil { return nil, err } var targetRoot []byte if epochStartSlot == headState.Slot() { targetRoot = headRoot } else { targetRoot, err = helpers.BlockRootAtSlot(headState, epochStartSlot) if err != nil { return nil, status.Errorf(codes.Internal, "Could not get target block for slot %d: %v", epochStartSlot, err) } if bytesutil.ToBytes32(targetRoot) == params.BeaconConfig().ZeroHash { targetRoot = headRoot } } res = ðpb.AttestationData{ Slot: req.Slot, CommitteeIndex: req.CommitteeIndex, BeaconBlockRoot: headRoot, Source: headState.CurrentJustifiedCheckpoint(), Target: ðpb.Checkpoint{ Epoch: targetEpoch, Root: targetRoot, }, } if err := vs.AttestationCache.Put(ctx, req, res); err != nil { return nil, status.Errorf(codes.Internal, "Could not store attestation data in cache: %v", err) } return res, nil } // ProposeAttestation is a function called by an attester to vote // on a block via an attestation object as defined in the Ethereum Serenity specification. func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation) (*ethpb.AttestResponse, error) { ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation") defer span.End() if _, err := bls.SignatureFromBytes(att.Signature); err != nil { return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature") } root, err := att.Data.HashTreeRoot() if err != nil { return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err) } // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. vs.OperationNotifier.OperationFeed().Send(&feed.Event{ Type: operation.UnaggregatedAttReceived, Data: &operation.UnAggregatedAttReceivedData{ Attestation: att, }, }) // Determine subnet to broadcast attestation to wantedEpoch := slots.ToEpoch(att.Data.Slot) vals, err := vs.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { return nil, err } subnet := helpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot) // Broadcast the new attestation to the network. if err := vs.P2P.BroadcastAttestation(ctx, subnet, att); err != nil { return nil, status.Errorf(codes.Internal, "Could not broadcast attestation: %v", err) } go func() { ctx = trace.NewContext(context.Background(), trace.FromContext(ctx)) attCopy := ethpb.CopyAttestation(att) if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil { log.WithError(err).Error("Could not handle attestation in operations service") return } }() return ðpb.AttestResponse{ AttestationDataRoot: root[:], }, nil } // SubscribeCommitteeSubnets subscribes to the committee ID subnet given subscribe request. func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { ctx, span := trace.StartSpan(ctx, "AttesterServer.SubscribeCommitteeSubnets") defer span.End() if len(req.Slots) != len(req.CommitteeIds) || len(req.CommitteeIds) != len(req.IsAggregator) { return nil, status.Error(codes.InvalidArgument, "request fields are not the same length") } if len(req.Slots) == 0 { return nil, status.Error(codes.InvalidArgument, "no attester slots provided") } fetchValsLen := func(slot primitives.Slot) (uint64, error) { wantedEpoch := slots.ToEpoch(slot) vals, err := vs.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { return 0, err } return uint64(len(vals)), nil } // Request the head validator indices of epoch represented by the first requested // slot. currValsLen, err := fetchValsLen(req.Slots[0]) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve head validator length: %v", err) } currEpoch := slots.ToEpoch(req.Slots[0]) for i := 0; i < len(req.Slots); i++ { // If epoch has changed, re-request active validators length if currEpoch != slots.ToEpoch(req.Slots[i]) { currValsLen, err = fetchValsLen(req.Slots[i]) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve head validator length: %v", err) } currEpoch = slots.ToEpoch(req.Slots[i]) } subnet := helpers.ComputeSubnetFromCommitteeAndSlot(currValsLen, req.CommitteeIds[i], req.Slots[i]) cache.SubnetIDs.AddAttesterSubnetID(req.Slots[i], subnet) if req.IsAggregator[i] { cache.SubnetIDs.AddAggregatorSubnetID(req.Slots[i], subnet) } } return &emptypb.Empty{}, nil }