mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-27 05:38:55 +00:00
85653335f1
* Standardize params for BeaconChain and Validator projects * gofmt * various changes to bring up to standards * lint * linter 2, not sure why travis complains * revert service_test.go
300 lines
9.6 KiB
Go
300 lines
9.6 KiB
Go
package beacon
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/ptypes"
|
|
"github.com/golang/protobuf/ptypes/empty"
|
|
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/event"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/prysmaticlabs/prysm/shared/slotticker"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var log = logrus.WithField("prefix", "beacon")
|
|
|
|
type rpcClientService interface {
|
|
BeaconServiceClient() pb.BeaconServiceClient
|
|
}
|
|
|
|
// Service that interacts with a beacon node via RPC.
|
|
type Service struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
pubKey []byte
|
|
rpcClient rpcClientService
|
|
assignedSlot uint64
|
|
shardID uint64
|
|
role pb.ValidatorRole
|
|
attesterAssignmentFeed *event.Feed
|
|
proposerAssignmentFeed *event.Feed
|
|
processedAttestationFeed *event.Feed
|
|
genesisTimestamp time.Time
|
|
}
|
|
|
|
// NewBeaconValidator instantiates a service that interacts with a beacon node
|
|
// via gRPC requests.
|
|
func NewBeaconValidator(ctx context.Context, pubKey []byte, rpcClient rpcClientService) *Service {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &Service{
|
|
ctx: ctx,
|
|
pubKey: pubKey,
|
|
cancel: cancel,
|
|
rpcClient: rpcClient,
|
|
attesterAssignmentFeed: new(event.Feed),
|
|
proposerAssignmentFeed: new(event.Feed),
|
|
processedAttestationFeed: new(event.Feed),
|
|
}
|
|
}
|
|
|
|
// Start the main routine for a beacon client service.
|
|
func (s *Service) Start() {
|
|
log.Info("Starting service")
|
|
beaconServiceClient := s.rpcClient.BeaconServiceClient()
|
|
|
|
// First thing the validator does is request the current validator assignments
|
|
// for the current beacon node cycle as well as the genesis timestamp
|
|
// from the beacon node. From here, a validator can keep an internal
|
|
// ticker that starts at the current slot the beacon node is in. This current slot
|
|
// value is determined by taking the time differential between the genesis block
|
|
// time and the current system time.
|
|
//
|
|
// Note: this does not validate the current system time against a global
|
|
// NTP server, which will be important to do in production.
|
|
// currently in a cycle we are supposed to participate in.
|
|
if err := s.fetchCurrentAssignmentsAndGenesisTime(beaconServiceClient); err != nil {
|
|
log.Error(err)
|
|
return
|
|
}
|
|
|
|
// We kick off a routine that listens for stream of validator assignment coming from
|
|
// beacon node. This will update validator client on which slot, shard ID and what
|
|
// responsbility to perform.
|
|
go s.listenForAssignmentChange(beaconServiceClient)
|
|
|
|
slotTicker := slotticker.GetSlotTicker(s.genesisTimestamp, params.DemoValidatorConfig().SlotDuration)
|
|
go func() {
|
|
s.waitForAssignment(slotTicker.C(), beaconServiceClient)
|
|
slotTicker.Done()
|
|
}()
|
|
|
|
go s.listenForProcessedAttestations(beaconServiceClient)
|
|
}
|
|
|
|
// Stop the main loop.
|
|
func (s *Service) Stop() error {
|
|
defer s.cancel()
|
|
log.Info("Stopping service")
|
|
return nil
|
|
}
|
|
|
|
// fetchCurrentAssignmentsAndGenesisTime fetches both the genesis timestamp as well
|
|
// as the current assignments for the current cycle in the beacon node. This allows
|
|
// the validator to do the following:
|
|
//
|
|
// (1) determine if it should act as an attester/proposer, at what slot,
|
|
// and what shard
|
|
//
|
|
// (2) determine the seconds since genesis by using the latest crystallized
|
|
// state recalc, then determine how many seconds have passed between that time
|
|
// and the current system time.
|
|
//
|
|
// From this, the validator client can deduce what slot interval the beacon
|
|
// node is in and determine when exactly it is time to propose or attest.
|
|
func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceClient) error {
|
|
// Currently fetches assignments for all validators.
|
|
req := &pb.ValidatorAssignmentRequest{
|
|
AllValidators: true,
|
|
}
|
|
res, err := client.CurrentAssignmentsAndGenesisTime(s.ctx, req)
|
|
if err != nil {
|
|
// If this RPC request fails, the entire system should fatal as it is critical for
|
|
// the validator to begin this way.
|
|
return fmt.Errorf("could not fetch genesis time and latest canonical state from beacon node: %v", err)
|
|
}
|
|
|
|
// Determine what slot the beacon node is in by checking the number of seconds
|
|
// since the genesis block.
|
|
genesisTimestamp, err := ptypes.Timestamp(res.GetGenesisTimestamp())
|
|
if err != nil {
|
|
return fmt.Errorf("could not compute genesis timestamp: %v", err)
|
|
}
|
|
|
|
s.genesisTimestamp = genesisTimestamp
|
|
|
|
startSlot := s.startSlot()
|
|
if err := s.assignRole(res.Assignments, startSlot); err != nil {
|
|
return fmt.Errorf("unable to assign a role: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// listenForAssignmentChange listens for validator assignment changes via a RPC stream.
|
|
// when there's an assignment change, beacon service will update its shard ID, slot number and role.
|
|
func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) {
|
|
req := &pb.ValidatorAssignmentRequest{PublicKeys: []*pb.PublicKey{{PublicKey: s.pubKey}}}
|
|
stream, err := client.ValidatorAssignments(s.ctx, req)
|
|
if err != nil {
|
|
log.Errorf("failed to fetch validator assignments stream: %v", err)
|
|
return
|
|
}
|
|
for {
|
|
assignment, err := stream.Recv()
|
|
// If the stream is closed, we stop the loop.
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
// If context is canceled we stop the loop.
|
|
if s.ctx.Err() != nil {
|
|
log.Debugf("Context has been canceled so shutting down the loop: %v", s.ctx.Err())
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
log.Errorf("Could not receive latest validator assignment from stream: %v", err)
|
|
break
|
|
}
|
|
|
|
startSlot := s.startSlot()
|
|
if err := s.assignRole(assignment.Assignments, startSlot); err != nil {
|
|
log.Errorf("Could not assign a role for validator: %v", err)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// waitForAssignment waits till it's validator's role to attest or propose. Then it forwards
|
|
// the canonical block to the proposer service or attester service to process.
|
|
func (s *Service) waitForAssignment(ticker <-chan uint64, client pb.BeaconServiceClient) {
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
|
|
case slot := <-ticker:
|
|
log = log.WithField("slot", slot)
|
|
log.Infof("tick")
|
|
|
|
// Special case: skip responsibilities if assigned to the genesis block.
|
|
if s.assignedSlot != slot || s.assignedSlot == 0 {
|
|
continue
|
|
}
|
|
|
|
block, err := client.CanonicalHead(s.ctx, &empty.Empty{})
|
|
if err != nil {
|
|
log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err)
|
|
continue
|
|
}
|
|
|
|
if s.role == pb.ValidatorRole_ATTESTER {
|
|
log.Info("Assigned attestation slot number reached")
|
|
// We forward the latest canonical block to the attester service a feed.
|
|
s.attesterAssignmentFeed.Send(block)
|
|
} else if s.role == pb.ValidatorRole_PROPOSER {
|
|
log.Info("Assigned proposal slot number reached")
|
|
// We forward the latest canonical block to the proposer service a feed.
|
|
s.proposerAssignmentFeed.Send(block)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// listenForProcessedAttestations receives processed attestations from the
|
|
// the beacon node's RPC server via gRPC streams.
|
|
func (s *Service) listenForProcessedAttestations(client pb.BeaconServiceClient) {
|
|
stream, err := client.LatestAttestation(s.ctx, &empty.Empty{})
|
|
if err != nil {
|
|
log.Errorf("Could not setup beacon chain attestation streaming client: %v", err)
|
|
return
|
|
}
|
|
for {
|
|
attestation, err := stream.Recv()
|
|
|
|
// If the stream is closed, we stop the loop.
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
// If context is canceled we stop the loop.
|
|
if s.ctx.Err() != nil {
|
|
log.Debugf("Context has been canceled so shutting down the loop: %v", s.ctx.Err())
|
|
break
|
|
}
|
|
if err != nil {
|
|
log.Errorf("Could not receive latest attestation from stream: %v", err)
|
|
continue
|
|
}
|
|
|
|
log.WithField("slotNumber", attestation.GetSlot()).Info("Latest attestation slot number")
|
|
s.processedAttestationFeed.Send(attestation)
|
|
}
|
|
}
|
|
|
|
// startSlot returns the first slot of the given slot's cycle.
|
|
func (s *Service) startSlot() uint64 {
|
|
duration := params.DemoValidatorConfig().SlotDuration
|
|
cycleLength := params.DemoValidatorConfig().CycleLength
|
|
slot := slotticker.CurrentSlot(s.genesisTimestamp, duration, time.Since)
|
|
return slot - slot%cycleLength
|
|
}
|
|
|
|
func (s *Service) assignRole(assignments []*pb.Assignment, startSlot uint64) error {
|
|
var role pb.ValidatorRole
|
|
var assignedSlot uint64
|
|
var shardID uint64
|
|
for _, assign := range assignments {
|
|
if !bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) {
|
|
continue
|
|
}
|
|
|
|
role = assign.Role
|
|
assignedSlot = startSlot + assign.AssignedSlot
|
|
shardID = assign.ShardId
|
|
|
|
log.Infof("Validator shuffled. Pub key %#x assigned to shard ID %d for %v duty at slot %d",
|
|
s.pubKey,
|
|
shardID,
|
|
role,
|
|
assignedSlot)
|
|
|
|
break
|
|
}
|
|
|
|
if role == pb.ValidatorRole_UNKNOWN {
|
|
return fmt.Errorf("validator role was not assigned for key: %x", s.pubKey)
|
|
}
|
|
|
|
s.role = role
|
|
s.assignedSlot = assignedSlot
|
|
s.shardID = shardID
|
|
|
|
log = log.WithFields(logrus.Fields{
|
|
"role": role,
|
|
"assignedSlot": assignedSlot,
|
|
"shardID": shardID,
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// AttesterAssignmentFeed returns a feed that is written to whenever it is the validator's
|
|
// slot to perform attestations.
|
|
func (s *Service) AttesterAssignmentFeed() *event.Feed {
|
|
return s.attesterAssignmentFeed
|
|
}
|
|
|
|
// ProposerAssignmentFeed returns a feed that is written to whenever it is the validator's
|
|
// slot to proposer blocks.
|
|
func (s *Service) ProposerAssignmentFeed() *event.Feed {
|
|
return s.proposerAssignmentFeed
|
|
}
|
|
|
|
// ProcessedAttestationFeed returns a feed that is written to whenever an attestation
|
|
// is processed by a beacon node.
|
|
func (s *Service) ProcessedAttestationFeed() *event.Feed {
|
|
return s.processedAttestationFeed
|
|
}
|