prysm-pulse/beacon-chain/rpc/beacon_server.go
2019-08-01 19:27:38 -07:00

224 lines
7.1 KiB
Go

package rpc
import (
"context"
"fmt"
"time"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/trieutil"
)
// BeaconServer defines a server implementation of the gRPC Beacon service,
// providing RPC endpoints for obtaining the canonical beacon chain head,
// fetching latest observed attestations, and more.
type BeaconServer struct {
beaconDB *db.BeaconDB
ctx context.Context
powChainService powChainService
chainService chainService
targetsFetcher blockchain.TargetsFetcher
operationService operationService
incomingAttestation chan *ethpb.Attestation
canonicalStateChan chan *pbp2p.BeaconState
chainStartChan chan time.Time
}
// WaitForChainStart queries the logs of the Deposit Contract in order to verify the beacon chain
// has started its runtime and validators begin their responsibilities. If it has not, it then
// subscribes to an event stream triggered by the powchain service whenever the ChainStart log does
// occur in the Deposit Contract on ETH 1.0.
func (bs *BeaconServer) WaitForChainStart(req *ptypes.Empty, stream pb.BeaconService_WaitForChainStartServer) error {
ok := bs.powChainService.HasChainStarted()
if ok {
genesisTime := bs.powChainService.ETH2GenesisTime()
res := &pb.ChainStartResponse{
Started: true,
GenesisTime: genesisTime,
}
return stream.Send(res)
}
sub := bs.chainService.StateInitializedFeed().Subscribe(bs.chainStartChan)
defer sub.Unsubscribe()
for {
select {
case chainStartTime := <-bs.chainStartChan:
log.Info("Sending ChainStart log and genesis time to connected validator clients")
res := &pb.ChainStartResponse{
Started: true,
GenesisTime: uint64(chainStartTime.Unix()),
}
return stream.Send(res)
case <-sub.Err():
return errors.New("subscriber closed, exiting goroutine")
case <-bs.ctx.Done():
return errors.New("rpc context closed, exiting goroutine")
}
}
}
// CanonicalHead of the current beacon chain. This method is requested on-demand
// by a validator when it is their time to propose or attest.
func (bs *BeaconServer) CanonicalHead(ctx context.Context, req *ptypes.Empty) (*ethpb.BeaconBlock, error) {
block, err := bs.beaconDB.ChainHead()
if err != nil {
return nil, errors.Wrap(err, "could not get canonical head block")
}
return block, nil
}
// BlockTree returns the current tree of saved blocks and their votes starting from the justified state.
func (bs *BeaconServer) BlockTree(ctx context.Context, _ *ptypes.Empty) (*pb.BlockTreeResponse, error) {
justifiedState, err := bs.beaconDB.JustifiedState()
if err != nil {
return nil, errors.Wrap(err, "could not retrieve justified state")
}
attestationTargets, err := bs.targetsFetcher.AttestationTargets(justifiedState)
if err != nil {
return nil, errors.Wrap(err, "could not retrieve attestation target")
}
justifiedBlock, err := bs.beaconDB.JustifiedBlock()
if err != nil {
return nil, err
}
highestSlot := bs.beaconDB.HighestBlockSlot()
fullBlockTree := []*ethpb.BeaconBlock{}
for i := justifiedBlock.Slot + 1; i < highestSlot; i++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}
nextLayer, err := bs.beaconDB.BlocksBySlot(ctx, i)
if err != nil {
return nil, err
}
fullBlockTree = append(fullBlockTree, nextLayer...)
}
tree := []*pb.BlockTreeResponse_TreeNode{}
for _, kid := range fullBlockTree {
if ctx.Err() != nil {
return nil, ctx.Err()
}
participatedVotes, err := blockchain.VoteCount(kid, justifiedState, attestationTargets, bs.beaconDB)
if err != nil {
return nil, err
}
blockRoot, err := ssz.SigningRoot(kid)
if err != nil {
return nil, err
}
tree = append(tree, &pb.BlockTreeResponse_TreeNode{
BlockRoot: blockRoot[:],
Block: kid,
ParticipatedVotes: uint64(participatedVotes),
})
}
return &pb.BlockTreeResponse{
Tree: tree,
}, nil
}
// BlockTreeBySlots returns the current tree of saved blocks and their
// votes starting from the justified state.
func (bs *BeaconServer) BlockTreeBySlots(ctx context.Context, req *pb.TreeBlockSlotRequest) (*pb.BlockTreeResponse, error) {
justifiedState, err := bs.beaconDB.JustifiedState()
if err != nil {
return nil, errors.Wrap(err, "could not retrieve justified state")
}
attestationTargets, err := bs.targetsFetcher.AttestationTargets(justifiedState)
if err != nil {
return nil, errors.Wrap(err, "could not retrieve attestation target")
}
justifiedBlock, err := bs.beaconDB.JustifiedBlock()
if err != nil {
return nil, err
}
if req == nil {
return nil, errors.New("argument 'TreeBlockSlotRequest' cannot be nil")
}
if !(req.SlotFrom <= req.SlotTo) {
return nil, fmt.Errorf("upper limit (%d) of slot range cannot be lower than the lower limit (%d)", req.SlotTo, req.SlotFrom)
}
highestSlot := bs.beaconDB.HighestBlockSlot()
fullBlockTree := []*ethpb.BeaconBlock{}
for i := justifiedBlock.Slot + 1; i < highestSlot; i++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if i >= req.SlotFrom && i <= req.SlotTo {
nextLayer, err := bs.beaconDB.BlocksBySlot(ctx, i)
if err != nil {
return nil, err
}
if nextLayer != nil {
fullBlockTree = append(fullBlockTree, nextLayer...)
}
}
if i > req.SlotTo {
break
}
}
tree := []*pb.BlockTreeResponse_TreeNode{}
for _, kid := range fullBlockTree {
if ctx.Err() != nil {
return nil, ctx.Err()
}
participatedVotes, err := blockchain.VoteCount(kid, justifiedState, attestationTargets, bs.beaconDB)
if err != nil {
return nil, err
}
blockRoot, err := ssz.SigningRoot(kid)
if err != nil {
return nil, err
}
hState, err := bs.beaconDB.HistoricalStateFromSlot(ctx, kid.Slot, blockRoot)
if err != nil {
return nil, err
}
if kid.Slot >= req.SlotFrom && kid.Slot <= req.SlotTo {
activeValidatorIndices, err := helpers.ActiveValidatorIndices(hState, helpers.CurrentEpoch(hState))
if err != nil {
return nil, err
}
totalVotes := helpers.TotalBalance(hState, activeValidatorIndices)
tree = append(tree, &pb.BlockTreeResponse_TreeNode{
BlockRoot: blockRoot[:],
Block: kid,
ParticipatedVotes: uint64(participatedVotes),
TotalVotes: uint64(totalVotes),
})
}
}
return &pb.BlockTreeResponse{
Tree: tree,
}, nil
}
func constructMerkleProof(trie *trieutil.MerkleTrie, index int, deposit *ethpb.Deposit) (*ethpb.Deposit, error) {
proof, err := trie.MerkleProof(index)
if err != nil {
return nil, fmt.Errorf(
"could not generate merkle proof for deposit at index %d: %v",
index,
err,
)
}
// For every deposit, we construct a Merkle proof using the powchain service's
// in-memory deposits trie, which is updated only once the state's LatestETH1Data
// property changes during a state transition after a voting period.
deposit.Proof = proof
return deposit, nil
}