From 55f311eb732133dcd7c4d31ea7efc14da2631c60 Mon Sep 17 00:00:00 2001 From: Patrice Vignola Date: Wed, 18 Jan 2023 13:21:07 -0800 Subject: [PATCH] Add REST implementation for Validator's `SubscribeCommitteeSubnets` (#11804) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * WIP * Add REST implementation for Validator's SubscribeCommitteeSubnets * Remove redundant test * Initialize dutiesProvider * Remove duplicate import * Fix build break * Address PR comments * Remove file committed by mistake * Fix broken test Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com> Co-authored-by: RadosÅ‚aw Kapka --- hack/update-mockgen.sh | 1 + testing/mock/BUILD.bazel | 1 + testing/mock/validator_client_mock.go | 9 +- validator/client/beacon-api/BUILD.bazel | 5 + .../beacon-api/beacon_api_validator_client.go | 12 +- validator/client/beacon-api/duties.go | 47 +++ validator/client/beacon-api/duties_test.go | 133 ++++++++ validator/client/beacon-api/mock/BUILD.bazel | 2 + .../client/beacon-api/mock/duties_mock.go | 52 +++ .../beacon-api/subscribe_committee_subnets.go | 81 +++++ .../subscribe_committee_subnets_test.go | 295 ++++++++++++++++++ validator/client/grpc-api/BUILD.bazel | 1 + .../client/grpc-api/grpc_validator_client.go | 3 +- validator/client/iface/validator_client.go | 3 +- validator/client/validator.go | 18 +- validator/client/validator_test.go | 6 +- 16 files changed, 649 insertions(+), 20 deletions(-) create mode 100644 validator/client/beacon-api/duties.go create mode 100644 validator/client/beacon-api/duties_test.go create mode 100644 validator/client/beacon-api/mock/duties_mock.go create mode 100644 validator/client/beacon-api/subscribe_committee_subnets.go create mode 100644 validator/client/beacon-api/subscribe_committee_subnets_test.go diff --git a/hack/update-mockgen.sh b/hack/update-mockgen.sh index e7664acdf..734b9a959 100755 --- a/hack/update-mockgen.sh +++ b/hack/update-mockgen.sh @@ -73,6 +73,7 @@ gofmt -s -w "$mock_path/." beacon_api_mock_path="validator/client/beacon-api/mock" beacon_api_mocks=( "$beacon_api_mock_path/genesis_mock.go genesis.go" + "$beacon_api_mock_path/duties_mock.go duties.go" "$beacon_api_mock_path/json_rest_handler_mock.go json_rest_handler.go" "$beacon_api_mock_path/state_validators_mock.go state_validators.go" ) diff --git a/testing/mock/BUILD.bazel b/testing/mock/BUILD.bazel index 016581bcb..c9045a7a7 100644 --- a/testing/mock/BUILD.bazel +++ b/testing/mock/BUILD.bazel @@ -22,6 +22,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/testing/mock", visibility = ["//visibility:public"], deps = [ + "//consensus-types/primitives:go_default_library", "//proto/eth/service:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", diff --git a/testing/mock/validator_client_mock.go b/testing/mock/validator_client_mock.go index 36a3f94c3..6ed9600ba 100644 --- a/testing/mock/validator_client_mock.go +++ b/testing/mock/validator_client_mock.go @@ -9,6 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" emptypb "google.golang.org/protobuf/types/known/emptypb" ) @@ -352,18 +353,18 @@ func (mr *MockValidatorClientMockRecorder) SubmitValidatorRegistrations(arg0, ar } // SubscribeCommitteeSubnets mocks base method. -func (m *MockValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { +func (m *MockValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest, arg2 []types.ValidatorIndex) (*emptypb.Empty, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SubscribeCommitteeSubnets", arg0, arg1) + ret := m.ctrl.Call(m, "SubscribeCommitteeSubnets", arg0, arg1, arg2) ret0, _ := ret[0].(*emptypb.Empty) ret1, _ := ret[1].(error) return ret0, ret1 } // SubscribeCommitteeSubnets indicates an expected call of SubscribeCommitteeSubnets. -func (mr *MockValidatorClientMockRecorder) SubscribeCommitteeSubnets(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockValidatorClientMockRecorder) SubscribeCommitteeSubnets(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnets", reflect.TypeOf((*MockValidatorClient)(nil).SubscribeCommitteeSubnets), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnets", reflect.TypeOf((*MockValidatorClient)(nil).SubscribeCommitteeSubnets), arg0, arg1, arg2) } // ValidatorIndex mocks base method. diff --git a/validator/client/beacon-api/BUILD.bazel b/validator/client/beacon-api/BUILD.bazel index 83af1a21a..9eab32e36 100644 --- a/validator/client/beacon-api/BUILD.bazel +++ b/validator/client/beacon-api/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "beacon_block_proto_helpers.go", "domain_data.go", "doppelganger.go", + "duties.go", "genesis.go", "get_beacon_block.go", "index.go", @@ -25,6 +26,7 @@ go_library( "status.go", "submit_signed_aggregate_proof.go", "submit_signed_contribution_and_proof.go", + "subscribe_committee_subnets.go", "sync_committee.go", ], importpath = "github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api", @@ -62,6 +64,7 @@ go_test( "beacon_block_proto_helpers_test.go", "domain_data_test.go", "doppelganger_test.go", + "duties_test.go", "genesis_test.go", "get_beacon_block_altair_test.go", "get_beacon_block_bellatrix_test.go", @@ -85,6 +88,7 @@ go_test( "status_test.go", "submit_signed_aggregate_proof_test.go", "submit_signed_contribution_and_proof_test.go", + "subscribe_committee_subnets_test.go", "sync_committee_test.go", "wait_for_chain_start_test.go", ], @@ -100,6 +104,7 @@ go_test( "//proto/prysm/v1alpha1:go_default_library", "//testing/assert:go_default_library", "//testing/require:go_default_library", + "//time/slots:go_default_library", "//validator/client/beacon-api/mock:go_default_library", "//validator/client/beacon-api/test-helpers:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index de3645cd4..6d8f2fbee 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/validator/client/iface" @@ -15,6 +16,7 @@ import ( type beaconApiValidatorClient struct { genesisProvider genesisProvider + dutiesProvider dutiesProvider stateValidatorsProvider stateValidatorsProvider jsonRestHandler jsonRestHandler fallbackClient iface.ValidatorClient @@ -32,6 +34,7 @@ func NewBeaconApiValidatorClientWithFallback(host string, timeout time.Duration, return &beaconApiValidatorClient{ genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler}, + dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}, stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}, jsonRestHandler: jsonRestHandler, fallbackClient: fallbackClient, @@ -166,13 +169,8 @@ func (c *beaconApiValidatorClient) SubmitValidatorRegistrations(ctx context.Cont return new(empty.Empty), c.submitValidatorRegistrations(ctx, in.Messages) } -func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest) (*empty.Empty, error) { - if c.fallbackClient != nil { - return c.fallbackClient.SubscribeCommitteeSubnets(ctx, in) - } - - // TODO: Implement me - panic("beaconApiValidatorClient.SubscribeCommitteeSubnets is not implemented. To use a fallback client, create this validator with NewBeaconApiValidatorClientWithFallback instead.") +func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, validatorIndices []types.ValidatorIndex) (*empty.Empty, error) { + return new(empty.Empty), c.subscribeCommitteeSubnets(ctx, in, validatorIndices) } func (c *beaconApiValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) { diff --git a/validator/client/beacon-api/duties.go b/validator/client/beacon-api/duties.go new file mode 100644 index 000000000..c35b095b1 --- /dev/null +++ b/validator/client/beacon-api/duties.go @@ -0,0 +1,47 @@ +package beacon_api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strconv" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" +) + +type dutiesProvider interface { + GetAttesterDuties(ctx context.Context, epoch types.Epoch, validatorIndices []types.ValidatorIndex) ([]*apimiddleware.AttesterDutyJson, error) +} + +type beaconApiDutiesProvider struct { + jsonRestHandler jsonRestHandler +} + +func (c beaconApiDutiesProvider) GetAttesterDuties(ctx context.Context, epoch types.Epoch, validatorIndices []types.ValidatorIndex) ([]*apimiddleware.AttesterDutyJson, error) { + + jsonValidatorIndices := make([]string, len(validatorIndices)) + for index, validatorIndex := range validatorIndices { + jsonValidatorIndices[index] = strconv.FormatUint(uint64(validatorIndex), 10) + } + + validatorIndicesBytes, err := json.Marshal(jsonValidatorIndices) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal validator indices") + } + + attesterDuties := &apimiddleware.AttesterDutiesResponseJson{} + if _, err := c.jsonRestHandler.PostRestJson(ctx, fmt.Sprintf("/eth/v1/validator/duties/attester/%d", epoch), nil, bytes.NewBuffer(validatorIndicesBytes), attesterDuties); err != nil { + return nil, errors.Wrap(err, "failed to send POST data to REST endpoint") + } + + for index, attesterDuty := range attesterDuties.Data { + if attesterDuty == nil { + return nil, errors.Errorf("attester duty at index `%d` is nil", index) + } + } + + return attesterDuties.Data, nil +} diff --git a/validator/client/beacon-api/duties_test.go b/validator/client/beacon-api/duties_test.go new file mode 100644 index 000000000..d555aba3e --- /dev/null +++ b/validator/client/beacon-api/duties_test.go @@ -0,0 +1,133 @@ +package beacon_api + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/golang/mock/gomock" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v3/testing/assert" + "github.com/prysmaticlabs/prysm/v3/testing/require" + "github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api/mock" +) + +const getAttesterDutiesTestEndpoint = "/eth/v1/validator/duties/attester" + +func TestGetAttesterDuties_Valid(t *testing.T) { + stringValidatorIndices := []string{"2", "9"} + const epoch = types.Epoch(1) + + validatorIndicesBytes, err := json.Marshal(stringValidatorIndices) + require.NoError(t, err) + + expectedAttesterDuties := apimiddleware.AttesterDutiesResponseJson{ + Data: []*apimiddleware.AttesterDutyJson{ + { + Pubkey: hexutil.Encode([]byte{1}), + ValidatorIndex: "2", + CommitteeIndex: "3", + CommitteeLength: "4", + CommitteesAtSlot: "5", + ValidatorCommitteeIndex: "6", + Slot: "7", + }, + { + Pubkey: hexutil.Encode([]byte{8}), + ValidatorIndex: "9", + CommitteeIndex: "10", + CommitteeLength: "11", + CommitteesAtSlot: "12", + ValidatorCommitteeIndex: "13", + Slot: "14", + }, + }, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + validatorIndices := []types.ValidatorIndex{2, 9} + jsonRestHandler := mock.NewMockjsonRestHandler(ctrl) + jsonRestHandler.EXPECT().PostRestJson( + ctx, + fmt.Sprintf("%s/%d", getAttesterDutiesTestEndpoint, epoch), + nil, + bytes.NewBuffer(validatorIndicesBytes), + &apimiddleware.AttesterDutiesResponseJson{}, + ).Return( + nil, + nil, + ).SetArg( + 4, + expectedAttesterDuties, + ).Times(1) + + dutiesProvider := &beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler} + attesterDuties, err := dutiesProvider.GetAttesterDuties(ctx, epoch, validatorIndices) + require.NoError(t, err) + assert.DeepEqual(t, expectedAttesterDuties.Data, attesterDuties) +} + +func TestGetAttesterDuties_HttpError(t *testing.T) { + const epoch = types.Epoch(1) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + jsonRestHandler := mock.NewMockjsonRestHandler(ctrl) + jsonRestHandler.EXPECT().PostRestJson( + ctx, + fmt.Sprintf("%s/%d", getAttesterDutiesTestEndpoint, epoch), + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return( + nil, + errors.New("foo error"), + ).Times(1) + + dutiesProvider := &beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler} + _, err := dutiesProvider.GetAttesterDuties(ctx, epoch, nil) + assert.ErrorContains(t, "foo error", err) + assert.ErrorContains(t, "failed to send POST data to REST endpoint", err) +} + +func TestGetAttesterDuties_NilAttesterDuty(t *testing.T) { + const epoch = types.Epoch(1) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + jsonRestHandler := mock.NewMockjsonRestHandler(ctrl) + jsonRestHandler.EXPECT().PostRestJson( + ctx, + fmt.Sprintf("%s/%d", getAttesterDutiesTestEndpoint, epoch), + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return( + nil, + nil, + ).SetArg( + 4, + apimiddleware.AttesterDutiesResponseJson{ + Data: []*apimiddleware.AttesterDutyJson{nil}, + }, + ).Times(1) + + dutiesProvider := &beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler} + _, err := dutiesProvider.GetAttesterDuties(ctx, epoch, nil) + assert.ErrorContains(t, "attester duty at index `0` is nil", err) +} diff --git a/validator/client/beacon-api/mock/BUILD.bazel b/validator/client/beacon-api/mock/BUILD.bazel index aafe083dc..614cde870 100644 --- a/validator/client/beacon-api/mock/BUILD.bazel +++ b/validator/client/beacon-api/mock/BUILD.bazel @@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = [ + "duties_mock.go", "genesis_mock.go", "json_rest_handler_mock.go", "state_validators_mock.go", @@ -12,6 +13,7 @@ go_library( deps = [ "//api/gateway/apimiddleware:go_default_library", "//beacon-chain/rpc/apimiddleware:go_default_library", + "//consensus-types/primitives:go_default_library", "@com_github_golang_mock//gomock:go_default_library", ], ) diff --git a/validator/client/beacon-api/mock/duties_mock.go b/validator/client/beacon-api/mock/duties_mock.go new file mode 100644 index 000000000..992c6d6be --- /dev/null +++ b/validator/client/beacon-api/mock/duties_mock.go @@ -0,0 +1,52 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: validator/client/beacon-api/duties.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + apimiddleware "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" +) + +// MockdutiesProvider is a mock of dutiesProvider interface. +type MockdutiesProvider struct { + ctrl *gomock.Controller + recorder *MockdutiesProviderMockRecorder +} + +// MockdutiesProviderMockRecorder is the mock recorder for MockdutiesProvider. +type MockdutiesProviderMockRecorder struct { + mock *MockdutiesProvider +} + +// NewMockdutiesProvider creates a new mock instance. +func NewMockdutiesProvider(ctrl *gomock.Controller) *MockdutiesProvider { + mock := &MockdutiesProvider{ctrl: ctrl} + mock.recorder = &MockdutiesProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockdutiesProvider) EXPECT() *MockdutiesProviderMockRecorder { + return m.recorder +} + +// GetAttesterDuties mocks base method. +func (m *MockdutiesProvider) GetAttesterDuties(ctx context.Context, epoch types.Epoch, validatorIndices []types.ValidatorIndex) ([]*apimiddleware.AttesterDutyJson, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAttesterDuties", ctx, epoch, validatorIndices) + ret0, _ := ret[0].([]*apimiddleware.AttesterDutyJson) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAttesterDuties indicates an expected call of GetAttesterDuties. +func (mr *MockdutiesProviderMockRecorder) GetAttesterDuties(ctx, epoch, validatorIndices interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAttesterDuties", reflect.TypeOf((*MockdutiesProvider)(nil).GetAttesterDuties), ctx, epoch, validatorIndices) +} diff --git a/validator/client/beacon-api/subscribe_committee_subnets.go b/validator/client/beacon-api/subscribe_committee_subnets.go new file mode 100644 index 000000000..9baa6a584 --- /dev/null +++ b/validator/client/beacon-api/subscribe_committee_subnets.go @@ -0,0 +1,81 @@ +package beacon_api + +import ( + "bytes" + "context" + "encoding/json" + "strconv" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v3/time/slots" +) + +func (c beaconApiValidatorClient) subscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, validatorIndices []types.ValidatorIndex) error { + if in == nil { + return errors.New("committee subnets subscribe request is nil") + } + + if len(in.CommitteeIds) != len(in.Slots) || len(in.CommitteeIds) != len(in.IsAggregator) || len(in.CommitteeIds) != len(validatorIndices) { + return errors.New("arrays `in.CommitteeIds`, `in.Slots`, `in.IsAggregator` and `validatorIndices` don't have the same length") + } + + slotToCommitteesAtSlotMap := make(map[types.Slot]uint64) + jsonCommitteeSubscriptions := make([]*apimiddleware.BeaconCommitteeSubscribeJson, len(in.CommitteeIds)) + for index := range in.CommitteeIds { + subscribeSlot := in.Slots[index] + subscribeCommitteeId := in.CommitteeIds[index] + subscribeIsAggregator := in.IsAggregator[index] + subscribeValidatorIndex := validatorIndices[index] + + committeesAtSlot, foundSlot := slotToCommitteesAtSlotMap[subscribeSlot] + if !foundSlot { + // Lazily fetch the committeesAtSlot from the beacon node if they are not already in the map + epoch := slots.ToEpoch(subscribeSlot) + duties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, validatorIndices) + if err != nil { + return errors.Wrapf(err, "failed to get duties for epoch `%d`", epoch) + } + + for _, duty := range duties { + dutySlot, err := strconv.ParseUint(duty.Slot, 10, 64) + if err != nil { + return errors.Wrapf(err, "failed to parse slot `%s`", duty.Slot) + } + + committees, err := strconv.ParseUint(duty.CommitteesAtSlot, 10, 64) + if err != nil { + return errors.Wrapf(err, "failed to parse CommitteesAtSlot `%s`", duty.CommitteesAtSlot) + } + + slotToCommitteesAtSlotMap[types.Slot(dutySlot)] = committees + } + + // If the slot still isn't in the map, we either received bad data from the beacon node or the caller of this function gave us bad data + if committeesAtSlot, foundSlot = slotToCommitteesAtSlotMap[subscribeSlot]; !foundSlot { + return errors.Errorf("failed to get committees for slot `%d`", subscribeSlot) + } + } + + jsonCommitteeSubscriptions[index] = &apimiddleware.BeaconCommitteeSubscribeJson{ + CommitteeIndex: strconv.FormatUint(uint64(subscribeCommitteeId), 10), + CommitteesAtSlot: strconv.FormatUint(committeesAtSlot, 10), + Slot: strconv.FormatUint(uint64(subscribeSlot), 10), + IsAggregator: subscribeIsAggregator, + ValidatorIndex: strconv.FormatUint(uint64(subscribeValidatorIndex), 10), + } + } + + committeeSubscriptionsBytes, err := json.Marshal(jsonCommitteeSubscriptions) + if err != nil { + return errors.Wrap(err, "failed to marshal committees subscriptions") + } + + if _, err := c.jsonRestHandler.PostRestJson(ctx, "/eth/v1/validator/beacon_committee_subscriptions", nil, bytes.NewBuffer(committeeSubscriptionsBytes), nil); err != nil { + return errors.Wrap(err, "failed to send POST data to REST endpoint") + } + + return nil +} diff --git a/validator/client/beacon-api/subscribe_committee_subnets_test.go b/validator/client/beacon-api/subscribe_committee_subnets_test.go new file mode 100644 index 000000000..d45c8f9fe --- /dev/null +++ b/validator/client/beacon-api/subscribe_committee_subnets_test.go @@ -0,0 +1,295 @@ +package beacon_api + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "strconv" + "testing" + + "github.com/golang/mock/gomock" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v3/testing/assert" + "github.com/prysmaticlabs/prysm/v3/testing/require" + "github.com/prysmaticlabs/prysm/v3/time/slots" + "github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api/mock" +) + +const subscribeCommitteeSubnetsTestEndpoint = "/eth/v1/validator/beacon_committee_subscriptions" + +func TestSubscribeCommitteeSubnets_Valid(t *testing.T) { + subscribeSlots := []types.Slot{0, 1, 100} + validatorIndices := []types.ValidatorIndex{2, 3, 4} + committeesAtSlot := []uint64{5, 6, 7} + isAggregator := []bool{false, true, false} + committeeIndices := []types.CommitteeIndex{8, 9, 10} + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + jsonCommitteeSubscriptions := make([]*apimiddleware.BeaconCommitteeSubscribeJson, len(subscribeSlots)) + for index := range jsonCommitteeSubscriptions { + jsonCommitteeSubscriptions[index] = &apimiddleware.BeaconCommitteeSubscribeJson{ + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[index]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[index]), 10), + CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[index], 10), + Slot: strconv.FormatUint(uint64(subscribeSlots[index]), 10), + IsAggregator: isAggregator[index], + } + } + + committeeSubscriptionsBytes, err := json.Marshal(jsonCommitteeSubscriptions) + require.NoError(t, err) + + ctx := context.Background() + + jsonRestHandler := mock.NewMockjsonRestHandler(ctrl) + jsonRestHandler.EXPECT().PostRestJson( + ctx, + subscribeCommitteeSubnetsTestEndpoint, + nil, + bytes.NewBuffer(committeeSubscriptionsBytes), + nil, + ).Return( + nil, + nil, + ).Times(1) + + duties := make([]*apimiddleware.AttesterDutyJson, len(subscribeSlots)) + for index := range duties { + duties[index] = &apimiddleware.AttesterDutyJson{ + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[index]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[index]), 10), + CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[index], 10), + Slot: strconv.FormatUint(uint64(subscribeSlots[index]), 10), + } + } + + // Even though we have 3 distinct slots, the first 2 ones are in the same epoch so we should only send 2 requests to the beacon node + dutiesProvider := mock.NewMockdutiesProvider(ctrl) + dutiesProvider.EXPECT().GetAttesterDuties( + ctx, + slots.ToEpoch(subscribeSlots[0]), + validatorIndices, + ).Return( + []*apimiddleware.AttesterDutyJson{ + { + CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[0], 10), + Slot: strconv.FormatUint(uint64(subscribeSlots[0]), 10), + }, + { + CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[1], 10), + Slot: strconv.FormatUint(uint64(subscribeSlots[1]), 10), + }, + }, + nil, + ).Times(1) + + dutiesProvider.EXPECT().GetAttesterDuties( + ctx, + slots.ToEpoch(subscribeSlots[2]), + validatorIndices, + ).Return( + []*apimiddleware.AttesterDutyJson{ + { + CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[2], 10), + Slot: strconv.FormatUint(uint64(subscribeSlots[2]), 10), + }, + }, + nil, + ).Times(1) + + validatorClient := &beaconApiValidatorClient{ + jsonRestHandler: jsonRestHandler, + dutiesProvider: dutiesProvider, + } + err = validatorClient.subscribeCommitteeSubnets( + ctx, + ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: subscribeSlots, + CommitteeIds: committeeIndices, + IsAggregator: isAggregator, + }, + validatorIndices, + ) + require.NoError(t, err) +} + +func TestSubscribeCommitteeSubnets_Error(t *testing.T) { + const arraySizeMismatchErrorMessage = "arrays `in.CommitteeIds`, `in.Slots`, `in.IsAggregator` and `validatorIndices` don't have the same length" + + testCases := []struct { + name string + subscribeRequest *ethpb.CommitteeSubnetsSubscribeRequest + validatorIndices []types.ValidatorIndex + attesterDuty *apimiddleware.AttesterDutyJson + dutiesError error + expectGetDutiesQuery bool + expectSubscribeRestCall bool + expectedErrorMessage string + }{ + { + name: "nil subscribe request", + subscribeRequest: nil, + expectedErrorMessage: "committee subnets subscribe request is nil", + }, + { + name: "CommitteeIds size mismatch", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + CommitteeIds: []types.CommitteeIndex{1}, + Slots: []types.Slot{1, 2}, + IsAggregator: []bool{false, true}, + }, + validatorIndices: []types.ValidatorIndex{1, 2}, + expectedErrorMessage: arraySizeMismatchErrorMessage, + }, + { + name: "Slots size mismatch", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + CommitteeIds: []types.CommitteeIndex{1, 2}, + Slots: []types.Slot{1}, + IsAggregator: []bool{false, true}, + }, + validatorIndices: []types.ValidatorIndex{1, 2}, + expectedErrorMessage: arraySizeMismatchErrorMessage, + }, + { + name: "IsAggregator size mismatch", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + CommitteeIds: []types.CommitteeIndex{1, 2}, + Slots: []types.Slot{1, 2}, + IsAggregator: []bool{false}, + }, + validatorIndices: []types.ValidatorIndex{1, 2}, + expectedErrorMessage: arraySizeMismatchErrorMessage, + }, + { + name: "ValidatorIndices size mismatch", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + CommitteeIds: []types.CommitteeIndex{1, 2}, + Slots: []types.Slot{1, 2}, + IsAggregator: []bool{false, true}, + }, + validatorIndices: []types.ValidatorIndex{1}, + expectedErrorMessage: arraySizeMismatchErrorMessage, + }, + { + name: "bad duties query", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: []types.Slot{1}, + CommitteeIds: []types.CommitteeIndex{2}, + IsAggregator: []bool{false}, + }, + validatorIndices: []types.ValidatorIndex{3}, + dutiesError: errors.New("foo error"), + expectGetDutiesQuery: true, + expectedErrorMessage: "failed to get duties for epoch `0`: foo error", + }, + { + name: "bad duty slot", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: []types.Slot{1}, + CommitteeIds: []types.CommitteeIndex{2}, + IsAggregator: []bool{false}, + }, + validatorIndices: []types.ValidatorIndex{3}, + attesterDuty: &apimiddleware.AttesterDutyJson{ + Slot: "foo", + CommitteesAtSlot: "1", + }, + expectGetDutiesQuery: true, + expectedErrorMessage: "failed to parse slot `foo`", + }, + { + name: "bad duty committees at slot", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: []types.Slot{1}, + CommitteeIds: []types.CommitteeIndex{2}, + IsAggregator: []bool{false}, + }, + validatorIndices: []types.ValidatorIndex{3}, + attesterDuty: &apimiddleware.AttesterDutyJson{ + Slot: "1", + CommitteesAtSlot: "foo", + }, + expectGetDutiesQuery: true, + expectedErrorMessage: "failed to parse CommitteesAtSlot `foo`", + }, + { + name: "missing slot in duties", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: []types.Slot{1}, + CommitteeIds: []types.CommitteeIndex{2}, + IsAggregator: []bool{false}, + }, + validatorIndices: []types.ValidatorIndex{3}, + attesterDuty: &apimiddleware.AttesterDutyJson{ + Slot: "2", + CommitteesAtSlot: "3", + }, + expectGetDutiesQuery: true, + expectedErrorMessage: "failed to get committees for slot `1`", + }, + { + name: "bad POST request", + subscribeRequest: ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: []types.Slot{1}, + CommitteeIds: []types.CommitteeIndex{2}, + IsAggregator: []bool{false}, + }, + validatorIndices: []types.ValidatorIndex{3}, + attesterDuty: &apimiddleware.AttesterDutyJson{ + Slot: "1", + CommitteesAtSlot: "2", + }, + expectGetDutiesQuery: true, + expectSubscribeRestCall: true, + expectedErrorMessage: "failed to send POST data to REST endpoint: foo error", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + dutiesProvider := mock.NewMockdutiesProvider(ctrl) + if testCase.expectGetDutiesQuery { + dutiesProvider.EXPECT().GetAttesterDuties( + ctx, + gomock.Any(), + gomock.Any(), + ).Return( + []*apimiddleware.AttesterDutyJson{testCase.attesterDuty}, + testCase.dutiesError, + ).Times(1) + } + + jsonRestHandler := mock.NewMockjsonRestHandler(ctrl) + if testCase.expectSubscribeRestCall { + jsonRestHandler.EXPECT().PostRestJson( + ctx, + subscribeCommitteeSubnetsTestEndpoint, + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return( + nil, + errors.New("foo error"), + ).Times(1) + } + + validatorClient := &beaconApiValidatorClient{ + jsonRestHandler: jsonRestHandler, + dutiesProvider: dutiesProvider, + } + err := validatorClient.subscribeCommitteeSubnets(ctx, testCase.subscribeRequest, testCase.validatorIndices) + assert.ErrorContains(t, testCase.expectedErrorMessage, err) + }) + } +} diff --git a/validator/client/grpc-api/BUILD.bazel b/validator/client/grpc-api/BUILD.bazel index 19da666ec..f36b50c5c 100644 --- a/validator/client/grpc-api/BUILD.bazel +++ b/validator/client/grpc-api/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/validator/client/grpc-api", visibility = ["//validator:__subpackages__"], deps = [ + "//consensus-types/primitives:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//validator/client/iface:go_default_library", "@com_github_pkg_errors//:go_default_library", diff --git a/validator/client/grpc-api/grpc_validator_client.go b/validator/client/grpc-api/grpc_validator_client.go index 4b39a9cac..f051bf7d5 100644 --- a/validator/client/grpc-api/grpc_validator_client.go +++ b/validator/client/grpc-api/grpc_validator_client.go @@ -5,6 +5,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" iface "github.com/prysmaticlabs/prysm/v3/validator/client/iface" "google.golang.org/grpc" @@ -98,7 +99,7 @@ func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in) } -func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest) (*empty.Empty, error) { +func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []types.ValidatorIndex) (*empty.Empty, error) { return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in) } diff --git a/validator/client/iface/validator_client.go b/validator/client/iface/validator_client.go index 27608af9c..d71edc5f0 100644 --- a/validator/client/iface/validator_client.go +++ b/validator/client/iface/validator_client.go @@ -4,6 +4,7 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ) @@ -25,7 +26,7 @@ type ValidatorClient interface { SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest) (*ethpb.AggregateSelectionResponse, error) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) - SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest) (*empty.Empty, error) + SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, validatorIndices []types.ValidatorIndex) (*empty.Empty, error) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) GetSyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) diff --git a/validator/client/validator.go b/validator/client/validator.go index 76ad72861..2a8853e7f 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -628,6 +628,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes subscribeSlots := make([]types.Slot, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) subscribeCommitteeIndices := make([]types.CommitteeIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) subscribeIsAggregator := make([]bool, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) + subscribeValidatorIndices := make([]types.ValidatorIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) alreadySubscribed := make(map[[64]byte]bool) for _, duty := range res.CurrentEpochDuties { @@ -635,6 +636,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING { attesterSlot := duty.AttesterSlot committeeIndex := duty.CommitteeIndex + validatorIndex := duty.ValidatorIndex alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex) if _, ok := alreadySubscribed[alreadySubscribedKey]; ok { @@ -652,6 +654,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes subscribeSlots = append(subscribeSlots, attesterSlot) subscribeCommitteeIndices = append(subscribeCommitteeIndices, committeeIndex) subscribeIsAggregator = append(subscribeIsAggregator, aggregator) + subscribeValidatorIndices = append(subscribeValidatorIndices, validatorIndex) } } @@ -659,6 +662,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING { attesterSlot := duty.AttesterSlot committeeIndex := duty.CommitteeIndex + validatorIndex := duty.ValidatorIndex alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex) if _, ok := alreadySubscribed[alreadySubscribedKey]; ok { @@ -676,14 +680,18 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes subscribeSlots = append(subscribeSlots, attesterSlot) subscribeCommitteeIndices = append(subscribeCommitteeIndices, committeeIndex) subscribeIsAggregator = append(subscribeIsAggregator, aggregator) + subscribeValidatorIndices = append(subscribeValidatorIndices, validatorIndex) } } - _, err := v.validatorClient.SubscribeCommitteeSubnets(ctx, ðpb.CommitteeSubnetsSubscribeRequest{ - Slots: subscribeSlots, - CommitteeIds: subscribeCommitteeIndices, - IsAggregator: subscribeIsAggregator, - }) + _, err := v.validatorClient.SubscribeCommitteeSubnets(ctx, + ðpb.CommitteeSubnetsSubscribeRequest{ + Slots: subscribeSlots, + CommitteeIds: subscribeCommitteeIndices, + IsAggregator: subscribeIsAggregator, + }, + subscribeValidatorIndices, + ) return err } diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index ebf24b755..a2f41f85b 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -539,7 +539,8 @@ func TestUpdateDuties_OK(t *testing.T) { client.EXPECT().SubscribeCommitteeSubnets( gomock.Any(), gomock.Any(), - ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { + gomock.Any(), + ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []types.ValidatorIndex) (*emptypb.Empty, error) { wg.Done() return nil, nil }) @@ -594,7 +595,8 @@ func TestUpdateDuties_OK_FilterBlacklistedPublicKeys(t *testing.T) { client.EXPECT().SubscribeCommitteeSubnets( gomock.Any(), gomock.Any(), - ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { + gomock.Any(), + ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []types.ValidatorIndex) (*emptypb.Empty, error) { wg.Done() return nil, nil })