package rpc

import (
	"context"
	"errors"
	"fmt"
	"math/big"
	"time"

	ptypes "github.com/gogo/protobuf/types"
	"github.com/prysmaticlabs/prysm/beacon-chain/db"
	pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
	pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
	"github.com/prysmaticlabs/prysm/shared/params"
)

// BeaconServer defines a server implementation of the gRPC Beacon service,
// providing RPC endpoints for obtaining the canonical beacon chain head,
// fetching latest observed attestations, and more.
type BeaconServer struct {
	// beaconDB is queried for the canonical chain head and for pending deposits.
	beaconDB *db.BeaconDB
	// ctx is the context whose cancellation makes the streaming handlers return.
	ctx context.Context
	// powChainService reports whether the ChainStart log has occurred, exposes
	// the ChainStart feed, and provides the latest known PoW block number.
	powChainService powChainService
	// operationService exposes the feed of incoming attestations.
	operationService operationService
	// incomingAttestation is the channel LatestAttestation subscribes to the
	// incoming attestation feed and then relays to the RPC stream.
	incomingAttestation chan *pbp2p.Attestation
	// canonicalStateChan is not read by the handlers visible in this file;
	// presumably it carries canonical state updates — TODO confirm against callers.
	canonicalStateChan chan *pbp2p.BeaconState
	// chainStartChan is the channel WaitForChainStart subscribes to the
	// ChainStart feed; it delivers the chain start time.
	chainStartChan chan time.Time
}

// WaitForChainStart queries the logs of the Deposit Contract in order to verify the beacon chain
// has started its runtime and validators begin their responsibilities. If it has not, it then
// subscribes to an event stream triggered by the powchain service whenever the ChainStart log does
// occur in the Deposit Contract on ETH 1.0. It returns without sending anything when the
// subscription fails or the server context is closed.
func (bs *BeaconServer) WaitForChainStart(req *ptypes.Empty, stream pb.BeaconService_WaitForChainStartServer) error { ok, genesisTime, err := bs.powChainService.HasChainStartLogOccurred() if err != nil { return fmt.Errorf("could not determine if ChainStart log has occurred: %v", err) } if ok { res := &pb.ChainStartResponse{ Started: true, GenesisTime: genesisTime, } return stream.Send(res) } sub := bs.powChainService.ChainStartFeed().Subscribe(bs.chainStartChan) defer sub.Unsubscribe() for { select { case chainStartTime := <-bs.chainStartChan: log.Info("Sending ChainStart log and genesis time to connected validator clients") res := &pb.ChainStartResponse{ Started: true, GenesisTime: uint64(chainStartTime.Unix()), } return stream.Send(res) case <-sub.Err(): log.Debug("Subscriber closed, exiting goroutine") return nil case <-bs.ctx.Done(): log.Debug("RPC context closed, exiting goroutine") return nil } } } // CanonicalHead of the current beacon chain. This method is requested on-demand // by a validator when it is their time to propose or attest. func (bs *BeaconServer) CanonicalHead(ctx context.Context, req *ptypes.Empty) (*pbp2p.BeaconBlock, error) { block, err := bs.beaconDB.ChainHead() if err != nil { return nil, fmt.Errorf("could not get canonical head block: %v", err) } return block, nil } // LatestAttestation streams the latest processed attestations to the rpc clients. 
func (bs *BeaconServer) LatestAttestation(req *ptypes.Empty, stream pb.BeaconService_LatestAttestationServer) error { sub := bs.operationService.IncomingAttFeed().Subscribe(bs.incomingAttestation) defer sub.Unsubscribe() for { select { case attestation := <-bs.incomingAttestation: log.Info("Sending attestation to RPC clients") if err := stream.Send(attestation); err != nil { return err } case <-sub.Err(): log.Debug("Subscriber closed, exiting goroutine") return nil case <-bs.ctx.Done(): log.Debug("RPC context closed, exiting goroutine") return nil } } } // Eth1Data fetches the current ETH 1 data which should be used when voting via // block proposal. // TODO(1463): Implement this. func (bs *BeaconServer) Eth1Data(ctx context.Context, _ *ptypes.Empty) (*pb.Eth1DataResponse, error) { return &pb.Eth1DataResponse{}, nil } // PendingDeposits returns a list of pending deposits that are ready for // inclusion in the next beacon block. func (bs *BeaconServer) PendingDeposits(ctx context.Context, _ *ptypes.Empty) (*pb.PendingDepositsResponse, error) { bNum := bs.powChainService.LatestBlockNumber() if bNum == nil { return nil, errors.New("latest PoW block number is unknown") } // Only request deposits that have passed the ETH1 follow distance window. bNum = bNum.Sub(bNum, big.NewInt(int64(params.BeaconConfig().Eth1FollowDistance))) return &pb.PendingDepositsResponse{PendingDeposits: bs.beaconDB.PendingDeposits(ctx, bNum)}, nil }