Validator Client WaitForActivation Implementation (#1898)

* create wait for activation stream

* wait for activation server stream open

* complete server side logic

* formatting

* first test passing

* context closed test

* lint

* fix build failure

* imports

* implemented wait for activation on client side

* all tests pass for validator client

* weird spacing
This commit is contained in:
Raul Jordan 2019-03-05 18:09:41 -06:00 committed by GitHub
parent b6ad702f3f
commit 44edbd4fbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 153 additions and 9 deletions

View File

@ -35,8 +35,9 @@ func (fv *fakeValidator) WaitForChainStart(_ context.Context) error {
return nil
}
func (fv *fakeValidator) WaitForActivation(_ context.Context) {
func (fv *fakeValidator) WaitForActivation(_ context.Context) error {
fv.WaitForActivationCalled = true
return nil
}
func (fv *fakeValidator) NextSlot() <-chan uint64 {

View File

@ -13,7 +13,7 @@ import (
type Validator interface {
Done()
WaitForChainStart(ctx context.Context) error
WaitForActivation(ctx context.Context)
WaitForActivation(ctx context.Context) error
NextSlot() <-chan uint64
UpdateAssignments(ctx context.Context, slot uint64) error
RoleAt(slot uint64) pb.ValidatorRole
@ -36,7 +36,9 @@ func run(ctx context.Context, v Validator) {
if err := v.WaitForChainStart(ctx); err != nil {
log.Fatalf("Could not determine if beacon chain started: %v", err)
}
v.WaitForActivation(ctx)
if err := v.WaitForActivation(ctx); err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
if err := v.UpdateAssignments(ctx, params.BeaconConfig().GenesisSlot); err != nil {
log.WithField("error", err).Error("Failed to update assignments")
}

View File

@ -81,14 +81,38 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
// WaitForActivation checks whether the validator pubkey is in the active
// validator set. If not, this operation will block until an activation message is
// received.
//
// WIP - not done.
func (v *validator) WaitForActivation(ctx context.Context) {
func (v *validator) WaitForActivation(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()
// First, check if the validator has deposited into the Deposit Contract.
// If the validator has deposited, subscribe to a stream receiving the activation status.
// of the validator until a final ACTIVATED check if received, then this function can return.
req := &pb.ValidatorActivationRequest{
Pubkey: v.key.PublicKey.Marshal(),
}
stream, err := v.validatorClient.WaitForActivation(ctx, req)
if err != nil {
return fmt.Errorf("could not setup validator WaitForActivation streaming client: %v", err)
}
var validatorActivatedRecord *pbp2p.Validator
for {
log.Info("Waiting for validator to be activated in the beacon chain")
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if err == io.EOF {
break
}
// If context is canceled we stop the loop.
if ctx.Err() == context.Canceled {
return fmt.Errorf("context has been canceled so shutting down the loop: %v", ctx.Err())
}
if err != nil {
return fmt.Errorf("could not receive validator activation from stream: %v", err)
}
validatorActivatedRecord = res.Validator
break
}
log.WithFields(logrus.Fields{
"activationEpoch": validatorActivatedRecord.ActivationEpoch - params.BeaconConfig().GenesisEpoch,
}).Info("Validator activated")
return nil
}
// NextSlot emits the next slot number at the start time of that slot.

View File

@ -10,10 +10,13 @@ import (
ptypes "github.com/gogo/protobuf/types"
"github.com/golang/mock/gomock"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/validator/internal"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func init() {
@ -133,6 +136,120 @@ func TestWaitForChainStart_ReceiveErrorFromStream(t *testing.T) {
}
}
func TestWaitActivation_ContextCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := internal.NewMockValidatorServiceClient(ctrl)
v := validator{
key: validatorKey,
validatorClient: client,
}
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&pb.ValidatorActivationRequest{
Pubkey: v.key.PublicKey.Marshal(),
},
).Return(clientStream, nil)
clientStream.EXPECT().Recv().Return(
&pb.ValidatorActivationResponse{
Validator: &pbp2p.Validator{
ActivationEpoch: params.BeaconConfig().GenesisEpoch,
},
},
nil,
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := v.WaitForActivation(ctx)
want := "context has been canceled"
if !strings.Contains(err.Error(), want) {
t.Errorf("Expected %v, received %v", want, err)
}
}
func TestWaitActivation_StreamSetupFails(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := internal.NewMockValidatorServiceClient(ctrl)
v := validator{
key: validatorKey,
validatorClient: client,
}
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&pb.ValidatorActivationRequest{
Pubkey: v.key.PublicKey.Marshal(),
},
).Return(clientStream, errors.New("failed stream"))
err := v.WaitForActivation(context.Background())
want := "could not setup validator WaitForActivation streaming client"
if !strings.Contains(err.Error(), want) {
t.Errorf("Expected %v, received %v", want, err)
}
}
func TestWaitActivation_ReceiveErrorFromStream(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := internal.NewMockValidatorServiceClient(ctrl)
v := validator{
key: validatorKey,
validatorClient: client,
}
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&pb.ValidatorActivationRequest{
Pubkey: v.key.PublicKey.Marshal(),
},
).Return(clientStream, nil)
clientStream.EXPECT().Recv().Return(
nil,
errors.New("fails"),
)
err := v.WaitForActivation(context.Background())
want := "could not receive validator activation from stream"
if !strings.Contains(err.Error(), want) {
t.Errorf("Expected %v, received %v", want, err)
}
}
func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := internal.NewMockValidatorServiceClient(ctrl)
v := validator{
key: validatorKey,
validatorClient: client,
}
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
&pb.ValidatorActivationRequest{
Pubkey: v.key.PublicKey.Marshal(),
},
).Return(clientStream, nil)
clientStream.EXPECT().Recv().Return(
&pb.ValidatorActivationResponse{
Validator: &pbp2p.Validator{
ActivationEpoch: params.BeaconConfig().GenesisEpoch,
},
},
nil,
)
if err := v.WaitForActivation(context.Background()); err != nil {
t.Errorf("Could not wait for activation: %v", err)
}
testutil.AssertLogsContain(t, hook, "Validator activated")
}
func TestUpdateAssignments_DoesNothingWhenNotEpochStartAndAlreadyExistingAssignments(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()