package validator

import (
	"context"

	ptypes "github.com/gogo/protobuf/types"
	ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
	"github.com/prysmaticlabs/go-ssz"
	"github.com/prysmaticlabs/prysm/beacon-chain/cache"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
	stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
	"github.com/prysmaticlabs/prysm/shared/bls"
	"github.com/prysmaticlabs/prysm/shared/bytesutil"
	"github.com/prysmaticlabs/prysm/shared/featureconfig"
	"github.com/prysmaticlabs/prysm/shared/params"
	"go.opencensus.io/trace"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// msgInvalidAttestationRequest is the InvalidArgument message returned to a
// caller whose attestation data request fails the current/previous epoch check.
const msgInvalidAttestationRequest = "Attestation request must be within current or previous epoch"

// GetAttestationData requests that the beacon node produce an attestation data object,
// which the validator acting as an attester will then sign.
func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) { ctx, span := trace.StartSpan(ctx, "AttesterServer.RequestAttestation") defer span.End() span.AddAttributes( trace.Int64Attribute("slot", int64(req.Slot)), trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex)), ) if vs.SyncChecker.Syncing() { return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond") } currentEpoch := helpers.SlotToEpoch(vs.GenesisTimeFetcher.CurrentSlot()) if currentEpoch > 0 && currentEpoch-1 != helpers.SlotToEpoch(req.Slot) && currentEpoch != helpers.SlotToEpoch(req.Slot) { return nil, status.Error(codes.InvalidArgument, msgInvalidAttestationRequest) } res, err := vs.AttestationCache.Get(ctx, req) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve data from attestation cache: %v", err) } if res != nil { return res, nil } if err := vs.AttestationCache.MarkInProgress(req); err != nil { if err == cache.ErrAlreadyInProgress { res, err := vs.AttestationCache.Get(ctx, req) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve data from attestation cache: %v", err) } if res == nil { return nil, status.Error(codes.DataLoss, "A request was in progress and resolved to nil") } return res, nil } return nil, status.Errorf(codes.Internal, "Could not mark attestation as in-progress: %v", err) } defer func() { if err := vs.AttestationCache.MarkNotInProgress(req); err != nil { log.WithError(err).Error("Failed to mark cache not in progress") } }() headState, err := vs.HeadFetcher.HeadState(ctx) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve head state: %v", err) } headRoot, err := vs.HeadFetcher.HeadRoot(ctx) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err) } // In the case that we receive an attestation request after a newer state/block has been // processed, we 
walk up the chain until state.Slot <= req.Slot to prevent producing an // attestation that violates processing constraints. fetchState := vs.BeaconDB.State if !featureconfig.Get().DisableNewStateMgmt { fetchState = vs.StateGen.StateByRoot } for headState.Slot() > req.Slot { if ctx.Err() != nil { return nil, status.Errorf(codes.Aborted, ctx.Err().Error()) } parent := headState.ParentRoot() headRoot = parent[:] headState, err = fetchState(ctx, parent) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } if headState == nil { return nil, status.Error(codes.Internal, "Failed to lookup parent state from head.") } } if helpers.CurrentEpoch(headState) < helpers.SlotToEpoch(req.Slot) { headState, err = state.ProcessSlots(ctx, headState, helpers.StartSlot(helpers.SlotToEpoch(req.Slot))) if err != nil { return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err) } } targetEpoch := helpers.CurrentEpoch(headState) epochStartSlot := helpers.StartSlot(targetEpoch) targetRoot := make([]byte, 32) if epochStartSlot == headState.Slot() { targetRoot = headRoot[:] } else { targetRoot, err = helpers.BlockRootAtSlot(headState, epochStartSlot) if err != nil { return nil, status.Errorf(codes.Internal, "Could not get target block for slot %d: %v", epochStartSlot, err) } if bytesutil.ToBytes32(targetRoot) == params.BeaconConfig().ZeroHash { targetRoot = headRoot } } res = ðpb.AttestationData{ Slot: req.Slot, CommitteeIndex: req.CommitteeIndex, BeaconBlockRoot: headRoot[:], Source: headState.CurrentJustifiedCheckpoint(), Target: ðpb.Checkpoint{ Epoch: targetEpoch, Root: targetRoot, }, } if err := vs.AttestationCache.Put(ctx, req, res); err != nil { return nil, status.Errorf(codes.Internal, "Could not store attestation data in cache: %v", err) } return res, nil } // ProposeAttestation is a function called by an attester to vote // on a block via an attestation object as defined in the Ethereum Serenity specification. 
func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation) (*ethpb.AttestResponse, error) { if _, err := bls.SignatureFromBytes(att.Signature); err != nil { return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature") } root, err := ssz.HashTreeRoot(att.Data) if err != nil { return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err) } // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. vs.OperationNotifier.OperationFeed().Send(&feed.Event{ Type: operation.UnaggregatedAttReceived, Data: &operation.UnAggregatedAttReceivedData{ Attestation: att, }, }) // Broadcast the new attestation to the network. if err := vs.P2P.Broadcast(ctx, att); err != nil { return nil, status.Errorf(codes.Internal, "Could not broadcast attestation: %v", err) } go func() { ctx = trace.NewContext(context.Background(), trace.FromContext(ctx)) attCopy := stateTrie.CopyAttestation(att) if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil { log.WithError(err).Error("Could not handle attestation in operations service") return } }() return ðpb.AttestResponse{ AttestationDataRoot: root[:], }, nil } // SubscribeCommitteeSubnets subscribes to the committee ID subnet given subscribe request. func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.CommitteeSubnetsSubscribeRequest) (*ptypes.Empty, error) { if len(req.Slots) != len(req.CommitteeIds) && len(req.CommitteeIds) != len(req.IsAggregator) { return nil, status.Error(codes.InvalidArgument, "request fields are not the same length") } for i := 0; i < len(req.Slots); i++ { cache.CommitteeIDs.AddAttesterCommiteeID(req.Slots[i], req.CommitteeIds[i]) if req.IsAggregator[i] { cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slots[i], req.CommitteeIds[i]) } } return &ptypes.Empty{}, nil }