Add REST implementation for Validator's SubscribeCommitteeSubnets (#11804)

* WIP

* Add REST implementation for Validator's SubscribeCommitteeSubnets

* Remove redundant test

* Initialize dutiesProvider

* Remove duplicate import

* Fix build break

* Address PR comments

* Remove file committed by mistake

* Fix broken test

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
Patrice Vignola 2023-01-18 13:21:07 -08:00 committed by GitHub
parent 0f90bacac9
commit 55f311eb73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 649 additions and 20 deletions

View File

@ -73,6 +73,7 @@ gofmt -s -w "$mock_path/."
beacon_api_mock_path="validator/client/beacon-api/mock" beacon_api_mock_path="validator/client/beacon-api/mock"
beacon_api_mocks=( beacon_api_mocks=(
"$beacon_api_mock_path/genesis_mock.go genesis.go" "$beacon_api_mock_path/genesis_mock.go genesis.go"
"$beacon_api_mock_path/duties_mock.go duties.go"
"$beacon_api_mock_path/json_rest_handler_mock.go json_rest_handler.go" "$beacon_api_mock_path/json_rest_handler_mock.go json_rest_handler.go"
"$beacon_api_mock_path/state_validators_mock.go state_validators.go" "$beacon_api_mock_path/state_validators_mock.go state_validators.go"
) )

View File

@ -22,6 +22,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v3/testing/mock", importpath = "github.com/prysmaticlabs/prysm/v3/testing/mock",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//consensus-types/primitives:go_default_library",
"//proto/eth/service:go_default_library", "//proto/eth/service:go_default_library",
"//proto/eth/v1:go_default_library", "//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1:go_default_library",

View File

@ -9,6 +9,7 @@ import (
reflect "reflect" reflect "reflect"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
emptypb "google.golang.org/protobuf/types/known/emptypb" emptypb "google.golang.org/protobuf/types/known/emptypb"
) )
@ -352,18 +353,18 @@ func (mr *MockValidatorClientMockRecorder) SubmitValidatorRegistrations(arg0, ar
} }
// SubscribeCommitteeSubnets mocks base method. // SubscribeCommitteeSubnets mocks base method.
func (m *MockValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { func (m *MockValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest, arg2 []types.ValidatorIndex) (*emptypb.Empty, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SubscribeCommitteeSubnets", arg0, arg1) ret := m.ctrl.Call(m, "SubscribeCommitteeSubnets", arg0, arg1, arg2)
ret0, _ := ret[0].(*emptypb.Empty) ret0, _ := ret[0].(*emptypb.Empty)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// SubscribeCommitteeSubnets indicates an expected call of SubscribeCommitteeSubnets. // SubscribeCommitteeSubnets indicates an expected call of SubscribeCommitteeSubnets.
func (mr *MockValidatorClientMockRecorder) SubscribeCommitteeSubnets(arg0, arg1 interface{}) *gomock.Call { func (mr *MockValidatorClientMockRecorder) SubscribeCommitteeSubnets(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnets", reflect.TypeOf((*MockValidatorClient)(nil).SubscribeCommitteeSubnets), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnets", reflect.TypeOf((*MockValidatorClient)(nil).SubscribeCommitteeSubnets), arg0, arg1, arg2)
} }
// ValidatorIndex mocks base method. // ValidatorIndex mocks base method.

View File

@ -11,6 +11,7 @@ go_library(
"beacon_block_proto_helpers.go", "beacon_block_proto_helpers.go",
"domain_data.go", "domain_data.go",
"doppelganger.go", "doppelganger.go",
"duties.go",
"genesis.go", "genesis.go",
"get_beacon_block.go", "get_beacon_block.go",
"index.go", "index.go",
@ -25,6 +26,7 @@ go_library(
"status.go", "status.go",
"submit_signed_aggregate_proof.go", "submit_signed_aggregate_proof.go",
"submit_signed_contribution_and_proof.go", "submit_signed_contribution_and_proof.go",
"subscribe_committee_subnets.go",
"sync_committee.go", "sync_committee.go",
], ],
importpath = "github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api", importpath = "github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api",
@ -62,6 +64,7 @@ go_test(
"beacon_block_proto_helpers_test.go", "beacon_block_proto_helpers_test.go",
"domain_data_test.go", "domain_data_test.go",
"doppelganger_test.go", "doppelganger_test.go",
"duties_test.go",
"genesis_test.go", "genesis_test.go",
"get_beacon_block_altair_test.go", "get_beacon_block_altair_test.go",
"get_beacon_block_bellatrix_test.go", "get_beacon_block_bellatrix_test.go",
@ -85,6 +88,7 @@ go_test(
"status_test.go", "status_test.go",
"submit_signed_aggregate_proof_test.go", "submit_signed_aggregate_proof_test.go",
"submit_signed_contribution_and_proof_test.go", "submit_signed_contribution_and_proof_test.go",
"subscribe_committee_subnets_test.go",
"sync_committee_test.go", "sync_committee_test.go",
"wait_for_chain_start_test.go", "wait_for_chain_start_test.go",
], ],
@ -100,6 +104,7 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library", "//testing/assert:go_default_library",
"//testing/require:go_default_library", "//testing/require:go_default_library",
"//time/slots:go_default_library",
"//validator/client/beacon-api/mock:go_default_library", "//validator/client/beacon-api/mock:go_default_library",
"//validator/client/beacon-api/test-helpers:go_default_library", "//validator/client/beacon-api/test-helpers:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",

View File

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/protobuf/ptypes/empty" "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors" "github.com/pkg/errors"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/validator/client/iface" "github.com/prysmaticlabs/prysm/v3/validator/client/iface"
@ -15,6 +16,7 @@ import (
type beaconApiValidatorClient struct { type beaconApiValidatorClient struct {
genesisProvider genesisProvider genesisProvider genesisProvider
dutiesProvider dutiesProvider
stateValidatorsProvider stateValidatorsProvider stateValidatorsProvider stateValidatorsProvider
jsonRestHandler jsonRestHandler jsonRestHandler jsonRestHandler
fallbackClient iface.ValidatorClient fallbackClient iface.ValidatorClient
@ -32,6 +34,7 @@ func NewBeaconApiValidatorClientWithFallback(host string, timeout time.Duration,
return &beaconApiValidatorClient{ return &beaconApiValidatorClient{
genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler}, genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler},
stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}, stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler},
jsonRestHandler: jsonRestHandler, jsonRestHandler: jsonRestHandler,
fallbackClient: fallbackClient, fallbackClient: fallbackClient,
@ -166,13 +169,8 @@ func (c *beaconApiValidatorClient) SubmitValidatorRegistrations(ctx context.Cont
return new(empty.Empty), c.submitValidatorRegistrations(ctx, in.Messages) return new(empty.Empty), c.submitValidatorRegistrations(ctx, in.Messages)
} }
func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest) (*empty.Empty, error) { func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, validatorIndices []types.ValidatorIndex) (*empty.Empty, error) {
if c.fallbackClient != nil { return new(empty.Empty), c.subscribeCommitteeSubnets(ctx, in, validatorIndices)
return c.fallbackClient.SubscribeCommitteeSubnets(ctx, in)
}
// TODO: Implement me
panic("beaconApiValidatorClient.SubscribeCommitteeSubnets is not implemented. To use a fallback client, create this validator with NewBeaconApiValidatorClientWithFallback instead.")
} }
func (c *beaconApiValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) { func (c *beaconApiValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {

View File

@ -0,0 +1,47 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
)
type dutiesProvider interface {
GetAttesterDuties(ctx context.Context, epoch types.Epoch, validatorIndices []types.ValidatorIndex) ([]*apimiddleware.AttesterDutyJson, error)
}
type beaconApiDutiesProvider struct {
jsonRestHandler jsonRestHandler
}
func (c beaconApiDutiesProvider) GetAttesterDuties(ctx context.Context, epoch types.Epoch, validatorIndices []types.ValidatorIndex) ([]*apimiddleware.AttesterDutyJson, error) {
jsonValidatorIndices := make([]string, len(validatorIndices))
for index, validatorIndex := range validatorIndices {
jsonValidatorIndices[index] = strconv.FormatUint(uint64(validatorIndex), 10)
}
validatorIndicesBytes, err := json.Marshal(jsonValidatorIndices)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal validator indices")
}
attesterDuties := &apimiddleware.AttesterDutiesResponseJson{}
if _, err := c.jsonRestHandler.PostRestJson(ctx, fmt.Sprintf("/eth/v1/validator/duties/attester/%d", epoch), nil, bytes.NewBuffer(validatorIndicesBytes), attesterDuties); err != nil {
return nil, errors.Wrap(err, "failed to send POST data to REST endpoint")
}
for index, attesterDuty := range attesterDuties.Data {
if attesterDuty == nil {
return nil, errors.Errorf("attester duty at index `%d` is nil", index)
}
}
return attesterDuties.Data, nil
}

View File

@ -0,0 +1,133 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/mock/gomock"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api/mock"
)
const getAttesterDutiesTestEndpoint = "/eth/v1/validator/duties/attester"
func TestGetAttesterDuties_Valid(t *testing.T) {
stringValidatorIndices := []string{"2", "9"}
const epoch = types.Epoch(1)
validatorIndicesBytes, err := json.Marshal(stringValidatorIndices)
require.NoError(t, err)
expectedAttesterDuties := apimiddleware.AttesterDutiesResponseJson{
Data: []*apimiddleware.AttesterDutyJson{
{
Pubkey: hexutil.Encode([]byte{1}),
ValidatorIndex: "2",
CommitteeIndex: "3",
CommitteeLength: "4",
CommitteesAtSlot: "5",
ValidatorCommitteeIndex: "6",
Slot: "7",
},
{
Pubkey: hexutil.Encode([]byte{8}),
ValidatorIndex: "9",
CommitteeIndex: "10",
CommitteeLength: "11",
CommitteesAtSlot: "12",
ValidatorCommitteeIndex: "13",
Slot: "14",
},
},
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
validatorIndices := []types.ValidatorIndex{2, 9}
jsonRestHandler := mock.NewMockjsonRestHandler(ctrl)
jsonRestHandler.EXPECT().PostRestJson(
ctx,
fmt.Sprintf("%s/%d", getAttesterDutiesTestEndpoint, epoch),
nil,
bytes.NewBuffer(validatorIndicesBytes),
&apimiddleware.AttesterDutiesResponseJson{},
).Return(
nil,
nil,
).SetArg(
4,
expectedAttesterDuties,
).Times(1)
dutiesProvider := &beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}
attesterDuties, err := dutiesProvider.GetAttesterDuties(ctx, epoch, validatorIndices)
require.NoError(t, err)
assert.DeepEqual(t, expectedAttesterDuties.Data, attesterDuties)
}
func TestGetAttesterDuties_HttpError(t *testing.T) {
const epoch = types.Epoch(1)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
jsonRestHandler := mock.NewMockjsonRestHandler(ctrl)
jsonRestHandler.EXPECT().PostRestJson(
ctx,
fmt.Sprintf("%s/%d", getAttesterDutiesTestEndpoint, epoch),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(
nil,
errors.New("foo error"),
).Times(1)
dutiesProvider := &beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}
_, err := dutiesProvider.GetAttesterDuties(ctx, epoch, nil)
assert.ErrorContains(t, "foo error", err)
assert.ErrorContains(t, "failed to send POST data to REST endpoint", err)
}
func TestGetAttesterDuties_NilAttesterDuty(t *testing.T) {
const epoch = types.Epoch(1)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
jsonRestHandler := mock.NewMockjsonRestHandler(ctrl)
jsonRestHandler.EXPECT().PostRestJson(
ctx,
fmt.Sprintf("%s/%d", getAttesterDutiesTestEndpoint, epoch),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(
nil,
nil,
).SetArg(
4,
apimiddleware.AttesterDutiesResponseJson{
Data: []*apimiddleware.AttesterDutyJson{nil},
},
).Times(1)
dutiesProvider := &beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}
_, err := dutiesProvider.GetAttesterDuties(ctx, epoch, nil)
assert.ErrorContains(t, "attester duty at index `0` is nil", err)
}

View File

@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"duties_mock.go",
"genesis_mock.go", "genesis_mock.go",
"json_rest_handler_mock.go", "json_rest_handler_mock.go",
"state_validators_mock.go", "state_validators_mock.go",
@ -12,6 +13,7 @@ go_library(
deps = [ deps = [
"//api/gateway/apimiddleware:go_default_library", "//api/gateway/apimiddleware:go_default_library",
"//beacon-chain/rpc/apimiddleware:go_default_library", "//beacon-chain/rpc/apimiddleware:go_default_library",
"//consensus-types/primitives:go_default_library",
"@com_github_golang_mock//gomock:go_default_library", "@com_github_golang_mock//gomock:go_default_library",
], ],
) )

View File

@ -0,0 +1,52 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: validator/client/beacon-api/duties.go
// Package mock is a generated GoMock package.
package mock
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
apimiddleware "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
)
// MockdutiesProvider is a mock of dutiesProvider interface.
type MockdutiesProvider struct {
ctrl *gomock.Controller
recorder *MockdutiesProviderMockRecorder
}
// MockdutiesProviderMockRecorder is the mock recorder for MockdutiesProvider.
type MockdutiesProviderMockRecorder struct {
mock *MockdutiesProvider
}
// NewMockdutiesProvider creates a new mock instance.
func NewMockdutiesProvider(ctrl *gomock.Controller) *MockdutiesProvider {
mock := &MockdutiesProvider{ctrl: ctrl}
mock.recorder = &MockdutiesProviderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockdutiesProvider) EXPECT() *MockdutiesProviderMockRecorder {
return m.recorder
}
// GetAttesterDuties mocks base method.
func (m *MockdutiesProvider) GetAttesterDuties(ctx context.Context, epoch types.Epoch, validatorIndices []types.ValidatorIndex) ([]*apimiddleware.AttesterDutyJson, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetAttesterDuties", ctx, epoch, validatorIndices)
ret0, _ := ret[0].([]*apimiddleware.AttesterDutyJson)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetAttesterDuties indicates an expected call of GetAttesterDuties.
func (mr *MockdutiesProviderMockRecorder) GetAttesterDuties(ctx, epoch, validatorIndices interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAttesterDuties", reflect.TypeOf((*MockdutiesProvider)(nil).GetAttesterDuties), ctx, epoch, validatorIndices)
}

View File

@ -0,0 +1,81 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"strconv"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time/slots"
)
func (c beaconApiValidatorClient) subscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, validatorIndices []types.ValidatorIndex) error {
if in == nil {
return errors.New("committee subnets subscribe request is nil")
}
if len(in.CommitteeIds) != len(in.Slots) || len(in.CommitteeIds) != len(in.IsAggregator) || len(in.CommitteeIds) != len(validatorIndices) {
return errors.New("arrays `in.CommitteeIds`, `in.Slots`, `in.IsAggregator` and `validatorIndices` don't have the same length")
}
slotToCommitteesAtSlotMap := make(map[types.Slot]uint64)
jsonCommitteeSubscriptions := make([]*apimiddleware.BeaconCommitteeSubscribeJson, len(in.CommitteeIds))
for index := range in.CommitteeIds {
subscribeSlot := in.Slots[index]
subscribeCommitteeId := in.CommitteeIds[index]
subscribeIsAggregator := in.IsAggregator[index]
subscribeValidatorIndex := validatorIndices[index]
committeesAtSlot, foundSlot := slotToCommitteesAtSlotMap[subscribeSlot]
if !foundSlot {
// Lazily fetch the committeesAtSlot from the beacon node if they are not already in the map
epoch := slots.ToEpoch(subscribeSlot)
duties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, validatorIndices)
if err != nil {
return errors.Wrapf(err, "failed to get duties for epoch `%d`", epoch)
}
for _, duty := range duties {
dutySlot, err := strconv.ParseUint(duty.Slot, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse slot `%s`", duty.Slot)
}
committees, err := strconv.ParseUint(duty.CommitteesAtSlot, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse CommitteesAtSlot `%s`", duty.CommitteesAtSlot)
}
slotToCommitteesAtSlotMap[types.Slot(dutySlot)] = committees
}
// If the slot still isn't in the map, we either received bad data from the beacon node or the caller of this function gave us bad data
if committeesAtSlot, foundSlot = slotToCommitteesAtSlotMap[subscribeSlot]; !foundSlot {
return errors.Errorf("failed to get committees for slot `%d`", subscribeSlot)
}
}
jsonCommitteeSubscriptions[index] = &apimiddleware.BeaconCommitteeSubscribeJson{
CommitteeIndex: strconv.FormatUint(uint64(subscribeCommitteeId), 10),
CommitteesAtSlot: strconv.FormatUint(committeesAtSlot, 10),
Slot: strconv.FormatUint(uint64(subscribeSlot), 10),
IsAggregator: subscribeIsAggregator,
ValidatorIndex: strconv.FormatUint(uint64(subscribeValidatorIndex), 10),
}
}
committeeSubscriptionsBytes, err := json.Marshal(jsonCommitteeSubscriptions)
if err != nil {
return errors.Wrap(err, "failed to marshal committees subscriptions")
}
if _, err := c.jsonRestHandler.PostRestJson(ctx, "/eth/v1/validator/beacon_committee_subscriptions", nil, bytes.NewBuffer(committeeSubscriptionsBytes), nil); err != nil {
return errors.Wrap(err, "failed to send POST data to REST endpoint")
}
return nil
}

View File

@ -0,0 +1,295 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"errors"
"strconv"
"testing"
"github.com/golang/mock/gomock"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/apimiddleware"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"github.com/prysmaticlabs/prysm/v3/validator/client/beacon-api/mock"
)
const subscribeCommitteeSubnetsTestEndpoint = "/eth/v1/validator/beacon_committee_subscriptions"
func TestSubscribeCommitteeSubnets_Valid(t *testing.T) {
subscribeSlots := []types.Slot{0, 1, 100}
validatorIndices := []types.ValidatorIndex{2, 3, 4}
committeesAtSlot := []uint64{5, 6, 7}
isAggregator := []bool{false, true, false}
committeeIndices := []types.CommitteeIndex{8, 9, 10}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
jsonCommitteeSubscriptions := make([]*apimiddleware.BeaconCommitteeSubscribeJson, len(subscribeSlots))
for index := range jsonCommitteeSubscriptions {
jsonCommitteeSubscriptions[index] = &apimiddleware.BeaconCommitteeSubscribeJson{
ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[index]), 10),
CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[index]), 10),
CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[index], 10),
Slot: strconv.FormatUint(uint64(subscribeSlots[index]), 10),
IsAggregator: isAggregator[index],
}
}
committeeSubscriptionsBytes, err := json.Marshal(jsonCommitteeSubscriptions)
require.NoError(t, err)
ctx := context.Background()
jsonRestHandler := mock.NewMockjsonRestHandler(ctrl)
jsonRestHandler.EXPECT().PostRestJson(
ctx,
subscribeCommitteeSubnetsTestEndpoint,
nil,
bytes.NewBuffer(committeeSubscriptionsBytes),
nil,
).Return(
nil,
nil,
).Times(1)
duties := make([]*apimiddleware.AttesterDutyJson, len(subscribeSlots))
for index := range duties {
duties[index] = &apimiddleware.AttesterDutyJson{
ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[index]), 10),
CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[index]), 10),
CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[index], 10),
Slot: strconv.FormatUint(uint64(subscribeSlots[index]), 10),
}
}
// Even though we have 3 distinct slots, the first 2 ones are in the same epoch so we should only send 2 requests to the beacon node
dutiesProvider := mock.NewMockdutiesProvider(ctrl)
dutiesProvider.EXPECT().GetAttesterDuties(
ctx,
slots.ToEpoch(subscribeSlots[0]),
validatorIndices,
).Return(
[]*apimiddleware.AttesterDutyJson{
{
CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[0], 10),
Slot: strconv.FormatUint(uint64(subscribeSlots[0]), 10),
},
{
CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[1], 10),
Slot: strconv.FormatUint(uint64(subscribeSlots[1]), 10),
},
},
nil,
).Times(1)
dutiesProvider.EXPECT().GetAttesterDuties(
ctx,
slots.ToEpoch(subscribeSlots[2]),
validatorIndices,
).Return(
[]*apimiddleware.AttesterDutyJson{
{
CommitteesAtSlot: strconv.FormatUint(committeesAtSlot[2], 10),
Slot: strconv.FormatUint(uint64(subscribeSlots[2]), 10),
},
},
nil,
).Times(1)
validatorClient := &beaconApiValidatorClient{
jsonRestHandler: jsonRestHandler,
dutiesProvider: dutiesProvider,
}
err = validatorClient.subscribeCommitteeSubnets(
ctx,
&ethpb.CommitteeSubnetsSubscribeRequest{
Slots: subscribeSlots,
CommitteeIds: committeeIndices,
IsAggregator: isAggregator,
},
validatorIndices,
)
require.NoError(t, err)
}
func TestSubscribeCommitteeSubnets_Error(t *testing.T) {
const arraySizeMismatchErrorMessage = "arrays `in.CommitteeIds`, `in.Slots`, `in.IsAggregator` and `validatorIndices` don't have the same length"
testCases := []struct {
name string
subscribeRequest *ethpb.CommitteeSubnetsSubscribeRequest
validatorIndices []types.ValidatorIndex
attesterDuty *apimiddleware.AttesterDutyJson
dutiesError error
expectGetDutiesQuery bool
expectSubscribeRestCall bool
expectedErrorMessage string
}{
{
name: "nil subscribe request",
subscribeRequest: nil,
expectedErrorMessage: "committee subnets subscribe request is nil",
},
{
name: "CommitteeIds size mismatch",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
CommitteeIds: []types.CommitteeIndex{1},
Slots: []types.Slot{1, 2},
IsAggregator: []bool{false, true},
},
validatorIndices: []types.ValidatorIndex{1, 2},
expectedErrorMessage: arraySizeMismatchErrorMessage,
},
{
name: "Slots size mismatch",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
CommitteeIds: []types.CommitteeIndex{1, 2},
Slots: []types.Slot{1},
IsAggregator: []bool{false, true},
},
validatorIndices: []types.ValidatorIndex{1, 2},
expectedErrorMessage: arraySizeMismatchErrorMessage,
},
{
name: "IsAggregator size mismatch",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
CommitteeIds: []types.CommitteeIndex{1, 2},
Slots: []types.Slot{1, 2},
IsAggregator: []bool{false},
},
validatorIndices: []types.ValidatorIndex{1, 2},
expectedErrorMessage: arraySizeMismatchErrorMessage,
},
{
name: "ValidatorIndices size mismatch",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
CommitteeIds: []types.CommitteeIndex{1, 2},
Slots: []types.Slot{1, 2},
IsAggregator: []bool{false, true},
},
validatorIndices: []types.ValidatorIndex{1},
expectedErrorMessage: arraySizeMismatchErrorMessage,
},
{
name: "bad duties query",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
Slots: []types.Slot{1},
CommitteeIds: []types.CommitteeIndex{2},
IsAggregator: []bool{false},
},
validatorIndices: []types.ValidatorIndex{3},
dutiesError: errors.New("foo error"),
expectGetDutiesQuery: true,
expectedErrorMessage: "failed to get duties for epoch `0`: foo error",
},
{
name: "bad duty slot",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
Slots: []types.Slot{1},
CommitteeIds: []types.CommitteeIndex{2},
IsAggregator: []bool{false},
},
validatorIndices: []types.ValidatorIndex{3},
attesterDuty: &apimiddleware.AttesterDutyJson{
Slot: "foo",
CommitteesAtSlot: "1",
},
expectGetDutiesQuery: true,
expectedErrorMessage: "failed to parse slot `foo`",
},
{
name: "bad duty committees at slot",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
Slots: []types.Slot{1},
CommitteeIds: []types.CommitteeIndex{2},
IsAggregator: []bool{false},
},
validatorIndices: []types.ValidatorIndex{3},
attesterDuty: &apimiddleware.AttesterDutyJson{
Slot: "1",
CommitteesAtSlot: "foo",
},
expectGetDutiesQuery: true,
expectedErrorMessage: "failed to parse CommitteesAtSlot `foo`",
},
{
name: "missing slot in duties",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
Slots: []types.Slot{1},
CommitteeIds: []types.CommitteeIndex{2},
IsAggregator: []bool{false},
},
validatorIndices: []types.ValidatorIndex{3},
attesterDuty: &apimiddleware.AttesterDutyJson{
Slot: "2",
CommitteesAtSlot: "3",
},
expectGetDutiesQuery: true,
expectedErrorMessage: "failed to get committees for slot `1`",
},
{
name: "bad POST request",
subscribeRequest: &ethpb.CommitteeSubnetsSubscribeRequest{
Slots: []types.Slot{1},
CommitteeIds: []types.CommitteeIndex{2},
IsAggregator: []bool{false},
},
validatorIndices: []types.ValidatorIndex{3},
attesterDuty: &apimiddleware.AttesterDutyJson{
Slot: "1",
CommitteesAtSlot: "2",
},
expectGetDutiesQuery: true,
expectSubscribeRestCall: true,
expectedErrorMessage: "failed to send POST data to REST endpoint: foo error",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
dutiesProvider := mock.NewMockdutiesProvider(ctrl)
if testCase.expectGetDutiesQuery {
dutiesProvider.EXPECT().GetAttesterDuties(
ctx,
gomock.Any(),
gomock.Any(),
).Return(
[]*apimiddleware.AttesterDutyJson{testCase.attesterDuty},
testCase.dutiesError,
).Times(1)
}
jsonRestHandler := mock.NewMockjsonRestHandler(ctrl)
if testCase.expectSubscribeRestCall {
jsonRestHandler.EXPECT().PostRestJson(
ctx,
subscribeCommitteeSubnetsTestEndpoint,
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(
nil,
errors.New("foo error"),
).Times(1)
}
validatorClient := &beaconApiValidatorClient{
jsonRestHandler: jsonRestHandler,
dutiesProvider: dutiesProvider,
}
err := validatorClient.subscribeCommitteeSubnets(ctx, testCase.subscribeRequest, testCase.validatorIndices)
assert.ErrorContains(t, testCase.expectedErrorMessage, err)
})
}
}

View File

@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v3/validator/client/grpc-api", importpath = "github.com/prysmaticlabs/prysm/v3/validator/client/grpc-api",
visibility = ["//validator:__subpackages__"], visibility = ["//validator:__subpackages__"],
deps = [ deps = [
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1:go_default_library",
"//validator/client/iface:go_default_library", "//validator/client/iface:go_default_library",
"@com_github_pkg_errors//:go_default_library", "@com_github_pkg_errors//:go_default_library",

View File

@ -5,6 +5,7 @@ import (
"github.com/golang/protobuf/ptypes/empty" "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors" "github.com/pkg/errors"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
iface "github.com/prysmaticlabs/prysm/v3/validator/client/iface" iface "github.com/prysmaticlabs/prysm/v3/validator/client/iface"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -98,7 +99,7 @@ func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context,
return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in) return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in)
} }
func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest) (*empty.Empty, error) { func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []types.ValidatorIndex) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in) return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in)
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/golang/protobuf/ptypes/empty" "github.com/golang/protobuf/ptypes/empty"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
) )
@ -25,7 +26,7 @@ type ValidatorClient interface {
SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest) (*ethpb.AggregateSelectionResponse, error) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest) (*ethpb.AggregateSelectionResponse, error)
SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error)
ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error)
SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest) (*empty.Empty, error) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, validatorIndices []types.ValidatorIndex) (*empty.Empty, error)
CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error)
GetSyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) GetSyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error)
SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error)

View File

@ -628,6 +628,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
subscribeSlots := make([]types.Slot, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) subscribeSlots := make([]types.Slot, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties))
subscribeCommitteeIndices := make([]types.CommitteeIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) subscribeCommitteeIndices := make([]types.CommitteeIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties))
subscribeIsAggregator := make([]bool, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties)) subscribeIsAggregator := make([]bool, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties))
subscribeValidatorIndices := make([]types.ValidatorIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties))
alreadySubscribed := make(map[[64]byte]bool) alreadySubscribed := make(map[[64]byte]bool)
for _, duty := range res.CurrentEpochDuties { for _, duty := range res.CurrentEpochDuties {
@ -635,6 +636,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING { if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING {
attesterSlot := duty.AttesterSlot attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex committeeIndex := duty.CommitteeIndex
validatorIndex := duty.ValidatorIndex
alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex) alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok { if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
@ -652,6 +654,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
subscribeSlots = append(subscribeSlots, attesterSlot) subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIndices = append(subscribeCommitteeIndices, committeeIndex) subscribeCommitteeIndices = append(subscribeCommitteeIndices, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator) subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
subscribeValidatorIndices = append(subscribeValidatorIndices, validatorIndex)
} }
} }
@ -659,6 +662,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING { if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING {
attesterSlot := duty.AttesterSlot attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex committeeIndex := duty.CommitteeIndex
validatorIndex := duty.ValidatorIndex
alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex) alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok { if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
@ -676,14 +680,18 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
subscribeSlots = append(subscribeSlots, attesterSlot) subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIndices = append(subscribeCommitteeIndices, committeeIndex) subscribeCommitteeIndices = append(subscribeCommitteeIndices, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator) subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
subscribeValidatorIndices = append(subscribeValidatorIndices, validatorIndex)
} }
} }
_, err := v.validatorClient.SubscribeCommitteeSubnets(ctx, &ethpb.CommitteeSubnetsSubscribeRequest{ _, err := v.validatorClient.SubscribeCommitteeSubnets(ctx,
Slots: subscribeSlots, &ethpb.CommitteeSubnetsSubscribeRequest{
CommitteeIds: subscribeCommitteeIndices, Slots: subscribeSlots,
IsAggregator: subscribeIsAggregator, CommitteeIds: subscribeCommitteeIndices,
}) IsAggregator: subscribeIsAggregator,
},
subscribeValidatorIndices,
)
return err return err
} }

View File

@ -539,7 +539,8 @@ func TestUpdateDuties_OK(t *testing.T) {
client.EXPECT().SubscribeCommitteeSubnets( client.EXPECT().SubscribeCommitteeSubnets(
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { gomock.Any(),
).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []types.ValidatorIndex) (*emptypb.Empty, error) {
wg.Done() wg.Done()
return nil, nil return nil, nil
}) })
@ -594,7 +595,8 @@ func TestUpdateDuties_OK_FilterBlacklistedPublicKeys(t *testing.T) {
client.EXPECT().SubscribeCommitteeSubnets( client.EXPECT().SubscribeCommitteeSubnets(
gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) { gomock.Any(),
).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []types.ValidatorIndex) (*emptypb.Empty, error) {
wg.Done() wg.Done()
return nil, nil return nil, nil
}) })