prysm-pulse/validator/client/validator.go
shayzluf 0732012459 Validator-multiple key (#2069)
* first version - broken

* working proto changes

* resolve review remarks

* fix goimport issues

* fix service issues

* first logic version-broken

* first running version - no new tests

* fix validator client test

* add wait group to goroutines

* remove unused var in function call

* fix review remarks and tests

* merge master changes and fix conflicts

* gazzele fix

* fix prestonvanloon requested changes

* merge and some of terenc3t remarks addressed

* _,pk bug fix in log

* fix account file name suffix and filter not active validator out

* merge with master and fix missing parameters

* run over all public keys in hasvalidators

* add test for error when not all the validators have an index in the db and hasvalidators is called

* fix runner tests fail due to timing issues

* goimports

* smaller sleep time in proposer tests

* fix UpdateAssignments loging

* fix goimports

* added && false commented TestUpdateAssignments_DoesNothingWhenNotEpochStartAndAlreadyExistingAssignments

* hasvalidators without missing publickeys list

* fix some of prestone review remarks

* fixes for prestone comments

* review changes applied

* expect context call in TestWaitForActivation_ValidatorOriginallyExists

* changed hasvalidators to return true if one validator exists

* fix init problem to getkeys

* hasvalidators requires all validators to be in db

* validator attest assignments update

* fix ap var name

* Change name to hasallvalidators

* fix tests

* update script, fix any vs all validator calls

* fix wait for activation

* filter validator

* reuse the reply block

* fix imports

* Remove dup

* better lookup of active validators

* better filter active validators, still need to fix committee assignment tests

* lint

* use activated keys

* fix for postchainstart

* fix logging

* move state transitions

* hasanyvalidator and hasallvalidators

* fix tests with updatechainhead missing

* add tests

* fix TestCommitteeAssignment_OK

* fix test

* fix validator tests

* fix TestCommitteeAssignment_multipleKeys_OK and TestWaitForActivation_ValidatorOriginallyExists

* fix goimports

* removed unused param from assignment

* change string(pk) to hex.EncodeString(pk) fix change requests

* add inactive validator status to assignments

* fix logging mess due to multi validator setup

* set no assignment to debug level

* log assignments every epoch

* logging fixes

* fixed runtime by using the right assignments

* correct activation request

* fix the validator panic

* correct assignment

* fix test fail and waitforactivation

* performance log issue fix

* fix goimports

* add log message with truncated pk for attest

* add truncated pk to attest and propose logs

* Add comment to script, change 9 to 8

* Update assignment log

* Add comment, report number of assignments

* Use WithError, add validator as field, merge block proposal log

* Update validator_propose.go

* fix

* use entry.String()

* fix fmt
2019-04-18 12:23:38 -05:00

227 lines
7.6 KiB
Go

// Package client represents the functionality to act as a validator.
package client
import (
"context"
"encoding/hex"
"fmt"
"io"
"time"
ptypes "github.com/gogo/protobuf/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/keystore"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// validator holds the state and RPC clients the validator client uses to
// follow the beacon chain and look up the duties of its public keys.
type validator struct {
// genesisTime is the chain start time in unix seconds, taken from the
// ChainStart response received in WaitForChainStart.
genesisTime uint64
// ticker emits slot numbers; it is created in WaitForChainStart once the
// genesis time is known.
ticker *slotutil.SlotTicker
// assignments is the most recent CommitteeAssignment response. It is reset
// to nil when an assignment request fails.
assignments *pb.CommitteeAssignmentResponse
// proposerClient is not used in this part of the file.
// NOTE(review): presumably used by the block proposal code — confirm.
proposerClient pb.ProposerServiceClient
// validatorClient serves the WaitForActivation and CommitteeAssignment calls.
validatorClient pb.ValidatorServiceClient
// beaconClient serves the WaitForChainStart and CanonicalHead calls.
beaconClient pb.BeaconServiceClient
// attesterClient is not used in this part of the file.
// NOTE(review): presumably used by the attestation code — confirm.
attesterClient pb.AttesterServiceClient
// keys maps a string identifier to a keystore key.
// NOTE(review): the key format is not visible here — confirm against the caller.
keys map[string]*keystore.Key
// pubkeys are the public keys sent in activation and assignment requests.
pubkeys [][]byte
// prevBalance is not read or written in this part of the file.
// NOTE(review): presumably used for balance/performance logging — confirm.
prevBalance uint64
}
// Done cleans up the validator by shutting down its slot ticker.
//
// The ticker is only created at the end of WaitForChainStart, so Done is a
// no-op when it is called before the chain start has been observed (for
// example from a deferred cleanup after an early failure) instead of
// dereferencing a nil ticker.
func (v *validator) Done() {
	if v.ticker == nil {
		return
	}
	v.ticker.Done()
}
// WaitForChainStart checks whether the beacon node has started its runtime. That is,
// it calls to the beacon node which then verifies the ETH1.0 deposit contract logs to check
// for the ChainStart log to have been emitted. If so, it starts a ticker based on the ChainStart
// unix timestamp which will be used to keep track of time within the validator client.
//
// It blocks until a single ChainStart message is received from the stream.
// An error is returned when the stream cannot be set up, when the context is
// canceled, when receiving fails, or when the stream is closed (io.EOF)
// before any ChainStart message arrived. In the last case no genesis time is
// known, so no slot ticker is started.
func (v *validator) WaitForChainStart(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "validator.WaitForChainStart")
	defer span.End()

	// First, check if the beacon chain has started.
	stream, err := v.beaconClient.WaitForChainStart(ctx, &ptypes.Empty{})
	if err != nil {
		return fmt.Errorf("could not setup beacon chain ChainStart streaming client: %v", err)
	}

	// Exactly one message is needed, so a single Recv suffices; no loop is required.
	log.Info("Waiting for beacon chain start log from the ETH 1.0 deposit contract...")
	chainStartRes, err := stream.Recv()
	// A stream closed without a ChainStart message leaves the genesis time
	// unknown. Starting a ticker from the zero value would count slots from
	// the unix epoch, so report the closure instead of continuing.
	if err == io.EOF {
		return fmt.Errorf("beacon chain ChainStart stream closed before ChainStart was received: %v", err)
	}
	// If context is canceled we stop waiting.
	if ctx.Err() == context.Canceled {
		return fmt.Errorf("context has been canceled so shutting down the loop: %v", ctx.Err())
	}
	if err != nil {
		return fmt.Errorf("could not receive ChainStart from stream: %v", err)
	}
	v.genesisTime = chainStartRes.GenesisTime

	// Once the ChainStart log is received, we update the genesis time of the validator client
	// and begin a slot ticker used to track the current slot the beacon node is in.
	genesis := time.Unix(int64(v.genesisTime), 0)
	v.ticker = slotutil.GetSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
	log.Infof("Beacon chain initialized at unix time: %v", genesis)
	return nil
}
// WaitForActivation asks the beacon node which of the validator's public keys
// are in the active validator set and blocks until the activation stream
// reports at least one activated key or is closed. Every activated key is
// logged before returning.
//
// An error is returned when the stream cannot be set up, when the context is
// canceled, or when receiving from the stream fails.
func (v *validator) WaitForActivation(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
	defer span.End()

	stream, err := v.validatorClient.WaitForActivation(ctx, &pb.ValidatorActivationRequest{
		PublicKeys: v.pubkeys,
	})
	if err != nil {
		return fmt.Errorf("could not setup validator WaitForActivation streaming client: %v", err)
	}

	var activatedKeys [][]byte
recvLoop:
	for {
		log.Info("Waiting for validator to be activated in the beacon chain")
		msg, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// The stream was closed; stop waiting.
			break recvLoop
		case ctx.Err() == context.Canceled:
			// The context was canceled; stop waiting.
			return fmt.Errorf("context has been canceled so shutting down the loop: %v", ctx.Err())
		case recvErr != nil:
			return fmt.Errorf("could not receive validator activation from stream: %v", recvErr)
		case len(msg.ActivatedPublicKeys) > 0:
			activatedKeys = msg.ActivatedPublicKeys
			break recvLoop
		}
	}

	for _, key := range activatedKeys {
		fields := logrus.Fields{
			"public key": fmt.Sprintf("%#x", key),
		}
		log.WithFields(fields).Info("Validator activated")
	}
	return nil
}
// CanonicalHeadSlot queries the beacon node over RPC for the canonical head
// block and returns its slot. When the RPC fails, the configured genesis slot
// is returned together with the error.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (uint64, error) {
	ctx, span := trace.StartSpan(ctx, "validator.CanonicalHeadSlot")
	defer span.End()

	empty := &ptypes.Empty{}
	canonical, rpcErr := v.beaconClient.CanonicalHead(ctx, empty)
	if rpcErr != nil {
		genesisSlot := params.BeaconConfig().GenesisSlot
		return genesisSlot, rpcErr
	}
	return canonical.Slot, nil
}
// NextSlot returns the slot ticker's receive-only channel, on which the next
// slot number is emitted at the start time of that slot.
func (v *validator) NextSlot() <-chan uint64 {
	slots := v.ticker.C()
	return slots
}
// SlotDeadline returns the deadline for the given slot, which is the start
// time of the slot that follows it, measured from the genesis time.
func (v *validator) SlotDeadline(slot uint64) time.Time {
	cfg := params.BeaconConfig()
	// Seconds between genesis and the start of slot+1.
	secondsSinceGenesis := (slot + 1 - cfg.GenesisSlot) * cfg.SecondsPerSlot
	offset := time.Duration(secondsSinceGenesis) * time.Second
	genesis := time.Unix(int64(v.genesisTime), 0 /*ns*/)
	return genesis.Add(offset)
}
// UpdateAssignments requests the committee assignments of all the validator's
// public keys from the beacon node and stores the response on the validator.
//
// On RPC failure the stored assignments are cleared and the error is returned.
// The individual assignments are logged only when slot is the first slot of an
// epoch; a summary with the number of assignments is logged on every
// successful update.
func (v *validator) UpdateAssignments(ctx context.Context, slot uint64) error {
	// Testing run time for fetching every slot. This is not meant for production!
	// https://github.com/prysmaticlabs/prysm/issues/2167
	// The trailing "&& false" deliberately disables this short-circuit, so
	// assignments are currently re-fetched on every slot.
	if slot%params.BeaconConfig().SlotsPerEpoch != 0 && v.assignments != nil && false {
		// Do nothing if not epoch start AND assignments already exist.
		return nil
	}

	ctx, span := trace.StartSpan(ctx, "validator.UpdateAssignments")
	defer span.End()

	req := &pb.CommitteeAssignmentsRequest{
		EpochStart: slot,
		PublicKeys: v.pubkeys,
	}
	resp, err := v.validatorClient.CommitteeAssignment(ctx, req)
	if err != nil {
		v.assignments = nil // Clear assignments so we know to retry the request.
		return err
	}
	v.assignments = resp

	// Only log the full assignments output on epoch start to be less verbose.
	if slot%params.BeaconConfig().SlotsPerEpoch == 0 {
		genesisSlot := params.BeaconConfig().GenesisSlot
		for _, assignment := range v.assignments.Assignment {
			if assignment == nil {
				// Nothing to report for an empty entry.
				continue
			}
			// Log at most the first 12 hex characters of the public key.
			// Shorter keys are logged in full rather than sliced out of range.
			assignmentKey := hex.EncodeToString(assignment.PublicKey)
			if len(assignmentKey) > 12 {
				assignmentKey = assignmentKey[:12]
			}
			lFields := logrus.Fields{
				"validator": assignmentKey,
				"status":    assignment.Status,
			}
			if assignment.Status != pb.ValidatorStatus_ACTIVE {
				// Validators that are not active only report their status.
				log.WithFields(lFields).Info("New assignment")
				continue
			}
			// Slots are logged relative to the genesis slot. The attester
			// slot is always logged, since a proposer also attests at its slot.
			lFields["attesterSlot"] = assignment.Slot - genesisSlot
			lFields["proposerSlot"] = "Not proposing"
			lFields["shard"] = assignment.Shard
			if assignment.IsProposer {
				lFields["proposerSlot"] = assignment.Slot - genesisSlot
			}
			log.WithFields(lFields).Info("New assignment")
		}
	}

	log.WithFields(logrus.Fields{
		"assignments": len(v.assignments.Assignment),
	}).Info("Updated validator assignments")
	return nil
}
// RolesAt returns the role of each validator at the given slot, keyed by the
// hex-encoded public key.
//
// A validator assigned to the slot is reported as PROPOSER when it proposes
// and as ATTESTER otherwise; a validator assigned to a different slot is
// reported as UNKNOWN. When no assignments are stored (for example after a
// failed UpdateAssignments), every configured public key is reported as
// UNKNOWN. The returned map is never nil.
func (v *validator) RolesAt(slot uint64) map[string]pb.ValidatorRole {
	rolesAt := make(map[string]pb.ValidatorRole)

	// Assignments are cleared when fetching them fails; report every known
	// key as UNKNOWN instead of dereferencing the missing response.
	if v.assignments == nil {
		for _, pk := range v.pubkeys {
			rolesAt[hex.EncodeToString(pk)] = pb.ValidatorRole_UNKNOWN
		}
		return rolesAt
	}

	for _, assignment := range v.assignments.Assignment {
		if assignment == nil {
			// An empty entry carries no public key to report a role for.
			continue
		}
		role := pb.ValidatorRole_UNKNOWN
		if assignment.Slot == slot {
			// Note: A proposer also attests to the slot.
			if assignment.IsProposer {
				role = pb.ValidatorRole_PROPOSER
			} else {
				role = pb.ValidatorRole_ATTESTER
			}
		}
		rolesAt[hex.EncodeToString(assignment.PublicKey)] = role
	}
	return rolesAt
}