prysm-pulse/beacon-chain/rpc/beacon_server.go
Preston Van Loon 4add403335
Validator Proposer Rewrite (#1462)
* WIP - with TODOs

* interface w/ test

* basic test for broadcast

* Add computeStateRoot funciton

* remove custody challenge

* resolve TODO lint issues

* more TODOs

* revert new line in types.proto

* broadcaster comment

* one of several failure condition tests

* Add test cases

* handle compute state error test case

* fix config in validator helpers

* fix tests too

* fix conflict

* partial PR feedback

* remove p2p

* gazelle

* package comment

* Better godoc
2019-02-05 08:47:25 -05:00

109 lines
3.7 KiB
Go

package rpc
import (
"context"
"fmt"
"time"
ptypes "github.com/gogo/protobuf/types"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
)
// BeaconServer defines a server implementation of the gRPC Beacon service,
// providing RPC endpoints for obtaining the canonical beacon chain head,
// fetching latest observed attestations, and more.
type BeaconServer struct {
beaconDB *db.BeaconDB
ctx context.Context
powChainService powChainService
operationService operationService
incomingAttestation chan *pbp2p.Attestation
canonicalStateChan chan *pbp2p.BeaconState
chainStartChan chan time.Time
}
// WaitForChainStart queries the logs of the Deposit Contract in order to verify the beacon chain
// has started its runtime and validators begin their responsibilities. If it has not, it then
// subscribes to an event stream triggered by the powchain service whenever the ChainStart log does
// occur in the Deposit Contract on ETH 1.0.
func (bs *BeaconServer) WaitForChainStart(req *ptypes.Empty, stream pb.BeaconService_WaitForChainStartServer) error {
ok, genesisTime, err := bs.powChainService.HasChainStartLogOccurred()
if err != nil {
return fmt.Errorf("could not determine if ChainStart log has occurred: %v", err)
}
if ok {
res := &pb.ChainStartResponse{
Started: true,
GenesisTime: genesisTime,
}
return stream.Send(res)
}
sub := bs.powChainService.ChainStartFeed().Subscribe(bs.chainStartChan)
defer sub.Unsubscribe()
for {
select {
case chainStartTime := <-bs.chainStartChan:
log.Info("Sending ChainStart log and genesis time to connected validator clients")
res := &pb.ChainStartResponse{
Started: true,
GenesisTime: uint64(chainStartTime.Unix()),
}
return stream.Send(res)
case <-sub.Err():
log.Debug("Subscriber closed, exiting goroutine")
return nil
case <-bs.ctx.Done():
log.Debug("RPC context closed, exiting goroutine")
return nil
}
}
}
// CanonicalHead of the current beacon chain. This method is requested on-demand
// by a validator when it is their time to propose or attest.
func (bs *BeaconServer) CanonicalHead(ctx context.Context, req *ptypes.Empty) (*pbp2p.BeaconBlock, error) {
block, err := bs.beaconDB.ChainHead()
if err != nil {
return nil, fmt.Errorf("could not get canonical head block: %v", err)
}
return block, nil
}
// LatestAttestation streams the latest processed attestations to the rpc clients.
func (bs *BeaconServer) LatestAttestation(req *ptypes.Empty, stream pb.BeaconService_LatestAttestationServer) error {
sub := bs.operationService.IncomingAttFeed().Subscribe(bs.incomingAttestation)
defer sub.Unsubscribe()
for {
select {
case attestation := <-bs.incomingAttestation:
log.Info("Sending attestation to RPC clients")
if err := stream.Send(attestation); err != nil {
return err
}
case <-sub.Err():
log.Debug("Subscriber closed, exiting goroutine")
return nil
case <-bs.ctx.Done():
log.Debug("RPC context closed, exiting goroutine")
return nil
}
}
}
// Eth1Data fetches the current ETH 1 data which should be used when voting via
// block proposal.
// TODO(1463): Implement this.
func (bs *BeaconServer) Eth1Data(ctx context.Context, _ *ptypes.Empty) (*pb.Eth1DataResponse, error) {
return &pb.Eth1DataResponse{}, nil
}
// PendingDeposits returns a list of pending deposits that are ready for
// inclusion in the next beacon block.
// TODO(1464): Implement this.
func (bs *BeaconServer) PendingDeposits(ctx context.Context, _ *ptypes.Empty) (*pb.PendingDepositsResponse, error) {
return &pb.PendingDepositsResponse{PendingDeposits: []*pbp2p.Deposit{}}, nil
}