prysm-pulse/validator/client/service.go
james-prysm db687bf56d
Validator client builder support (#10749)
* Starting builder service and interface

* Get header from builder

* Add get builder block

* Single validator registration

* Add mev-builder http cli flag

* Add method to verify registration signature

* Add builder registration

* Add submit validator registration

* supporting yaml

* fix yaml unmarshaling

* rolling back some changes from unmarshal from file

* adding yaml support

* adding register validator support

* added new validator requests into client/validator

* fixing gofmt

* updating flags and including gas limit, unit tests are still broken

* fixing bazel

* more name changes and fixing unit tests

* fixing unit tests and renaming functions

* fixing unit tests and renaming to match changes

* adding new test for yaml

* fixing bazel linter

* reverting change on validator service proto

* adding clarifying logs

* renaming function name to be more descriptive

* renaming variable

* rolling back some files that will be added from the builder-1 branch

* reverting more

* more reverting

* need placeholder

* need placeholder

* fixing unit test

* fixing unit test

* fixing unit test

* fixing unit test

* fixing more unit tests

* fixing more unit tests

* rolling back mockgen

* fixing bazel

* rolling back changes

* removing duplicate function

* fixing client mock

* removing unused type

* fixing missing brace

* fixing bad field name

* fixing bazel

* updating naming

* fixing bazel

* fixing unit test

* fixing bazel linting

* unhandled err

* fixing gofmt

* simplifying name based on feedback

* using corrected function

* moving default fee recipient and gaslimit to beaconconfig

* missing a few constant changes

* fixing bazel

* fixing more missed default renames

* fixing more constants in tests

* fixing bazel

* adding update proposer setting per epoch

* refactoring to reduce complexity

* adding unit test for proposer settings

* Update validator/client/validator.go

Co-authored-by: terencechain <terence@prysmaticlabs.com>

* trying out renaming based on feedback

* adjusting based on review comments

* making tests more appropriate

* fixing bazel

* updating flag description based on review feedback

* addressing review feedback

* switching to pushing at start of epoch for more time

* adding new unit test and properly throwing error

* switching keys in error to count

* fixing log variable

* resolving conflict

* resolving more conflicts

* adjusting error message

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2022-06-06 19:32:41 +00:00

323 lines
11 KiB
Go

package client
import (
"context"
"strings"
"time"
"github.com/dgraph-io/ristretto"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
grpcutil "github.com/prysmaticlabs/prysm/api/grpc"
"github.com/prysmaticlabs/prysm/async/event"
lruwrpr "github.com/prysmaticlabs/prysm/cache/lru"
fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams"
"github.com/prysmaticlabs/prysm/config/params"
validator_service_config "github.com/prysmaticlabs/prysm/config/validator/service"
"github.com/prysmaticlabs/prysm/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/validator/accounts/wallet"
"github.com/prysmaticlabs/prysm/validator/client/iface"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/graffiti"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/prysmaticlabs/prysm/validator/keymanager/local"
remote_web3signer "github.com/prysmaticlabs/prysm/validator/keymanager/remote-web3signer"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
)
// SyncChecker is implemented by anything that can determine whether a beacon
// node is currently going through chain synchronization.
type SyncChecker interface {
// Syncing reports the synchronization state of the beacon node, or an error
// if that state could not be determined.
Syncing(ctx context.Context) (bool, error)
}
// GenesisFetcher can retrieve genesis information such as
// the genesis time and the validator deposit contract address.
type GenesisFetcher interface {
// GenesisInfo returns the chain genesis information as an *ethpb.Genesis,
// or an error if it could not be retrieved.
GenesisInfo(ctx context.Context) (*ethpb.Genesis, error)
}
// ValidatorService represents a service to manage the validator client
// routine. It is created by NewValidatorService, which copies its settings
// from a Config and dials the beacon node over gRPC; Start then builds the
// concrete validator and runs it in a goroutine.
type ValidatorService struct {
useWeb bool
emitAccountMetrics bool
logValidatorBalances bool
logDutyCountDown bool
interopKeysConfig *local.InteropKeymanagerConfig
// conn is the gRPC connection dialed in NewValidatorService. It stays nil
// when no dial options could be constructed or when dialing failed.
conn *grpc.ClientConn
grpcRetryDelay time.Duration
grpcRetries uint
maxCallRecvMsgSize int
// cancel cancels ctx; it is invoked by Stop.
cancel context.CancelFunc
walletInitializedFeed *event.Feed
wallet *wallet.Wallet
graffitiStruct *graffiti.Graffiti
dataDir string
// withCert is passed to ConstructDialOptions as the TLS certificate file;
// an empty value selects an insecure connection.
withCert string
// endpoint is the target handed to grpc.DialContext.
endpoint string
// ctx is the service context derived from the context given to
// NewValidatorService, with the configured gRPC headers appended.
ctx context.Context
// validator is taken from Config.Validator and replaced by the concrete
// validator built in Start.
validator iface.Validator
db db.Database
// grpcHeaders holds Config.GrpcHeadersFlag split on commas.
grpcHeaders []string
// graffiti holds the bytes of Config.GraffitiFlag.
graffiti []byte
Web3SignerConfig *remote_web3signer.SetupConfig
proposerSettings *validator_service_config.ProposerSettings
}
// Config for the validator service. NewValidatorService copies these values
// into the ValidatorService it creates.
type Config struct {
UseWeb bool
LogValidatorBalances bool
EmitAccountMetrics bool
LogDutyCountDown bool
InteropKeysConfig *local.InteropKeymanagerConfig
Wallet *wallet.Wallet
WalletInitializedFeed *event.Feed
// GrpcRetriesFlag is the maximum number of gRPC retries passed to the retry
// call option.
GrpcRetriesFlag uint
// GrpcMaxCallRecvMsgSizeFlag is the maximum gRPC receive message size; a
// value of 0 makes ConstructDialOptions fall back to its default.
GrpcMaxCallRecvMsgSizeFlag int
// GrpcRetryDelay is the delay used for the linear retry backoff.
GrpcRetryDelay time.Duration
GraffitiStruct *graffiti.Graffiti
Validator iface.Validator
ValDB db.Database
// CertFlag is the TLS certificate file used for the gRPC connection; when
// empty, an insecure connection is used.
CertFlag string
DataDir string
// GrpcHeadersFlag is a comma-separated list of gRPC headers.
GrpcHeadersFlag string
// GraffitiFlag is stored on the service as raw bytes.
GraffitiFlag string
// Endpoint is the gRPC dial target.
Endpoint string
Web3SignerConfig *remote_web3signer.SetupConfig
ProposerSettings *validator_service_config.ProposerSettings
}
// NewValidatorService creates a new validator service for the service
// registry.
//
// The service owns a cancellable child of ctx and copies its settings from
// cfg. When dial options can be constructed, a gRPC connection to cfg.Endpoint
// is dialed and stored on the service.
//
// If ConstructDialOptions returns nil, the service is returned without a
// connection and with a nil error. If dialing fails, the service built so far
// is returned together with the dial error.
func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, error) {
	svcCtx, cancelFn := context.WithCancel(ctx)

	svc := &ValidatorService{
		// Lifecycle.
		ctx:    svcCtx,
		cancel: cancelFn,

		// gRPC connection settings.
		endpoint:           cfg.Endpoint,
		withCert:           cfg.CertFlag,
		maxCallRecvMsgSize: cfg.GrpcMaxCallRecvMsgSizeFlag,
		grpcRetries:        cfg.GrpcRetriesFlag,
		grpcRetryDelay:     cfg.GrpcRetryDelay,
		grpcHeaders:        strings.Split(cfg.GrpcHeadersFlag, ","),

		// Storage, wallet and key management.
		dataDir:               cfg.DataDir,
		db:                    cfg.ValDB,
		wallet:                cfg.Wallet,
		walletInitializedFeed: cfg.WalletInitializedFeed,
		interopKeysConfig:     cfg.InteropKeysConfig,
		Web3SignerConfig:      cfg.Web3SignerConfig,

		// Validator behavior.
		validator:            cfg.Validator,
		graffiti:             []byte(cfg.GraffitiFlag),
		graffitiStruct:       cfg.GraffitiStruct,
		proposerSettings:     cfg.ProposerSettings,
		useWeb:               cfg.UseWeb,
		logValidatorBalances: cfg.LogValidatorBalances,
		emitAccountMetrics:   cfg.EmitAccountMetrics,
		logDutyCountDown:     cfg.LogDutyCountDown,
	}

	opts := ConstructDialOptions(svc.maxCallRecvMsgSize, svc.withCert, svc.grpcRetries, svc.grpcRetryDelay)
	if opts == nil {
		// No usable dial options: hand back the service without a connection.
		return svc, nil
	}

	// The service context carries the configured gRPC headers; the dial itself
	// uses the plain cancellable context.
	svc.ctx = grpcutil.AppendHeaders(svcCtx, svc.grpcHeaders)

	clientConn, dialErr := grpc.DialContext(svcCtx, svc.endpoint, opts...)
	if dialErr != nil {
		return svc, dialErr
	}
	if svc.withCert != "" {
		log.Info("Established secure gRPC connection")
	}
	svc.conn = clientConn
	return svc, nil
}
// Start the validator service. Launches the main go routine for the validator
// client.
//
// Start creates the domain data cache and the aggregated slot committee ID
// cache, reads the EIP import blacklisted public keys and the graffiti ordered
// index from the validator database, assembles the concrete validator, and
// runs it in a new goroutine bound to the service context.
//
// If either database read fails, the error is logged and Start returns without
// launching the validator. Failure to create the domain data cache panics.
func (v *ValidatorService) Start() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1920, // number of keys to track.
		MaxCost:     192,  // maximum cost of cache, 1 item = 1 cost.
		BufferItems: 64,   // number of keys per Get buffer.
	})
	if err != nil {
		panic(err)
	}

	aggregatedSlotCommitteeIDCache := lruwrpr.New(int(params.BeaconConfig().MaxCommitteesPerSlot))

	sPubKeys, err := v.db.EIPImportBlacklistedPublicKeys(v.ctx)
	if err != nil {
		log.Errorf("Could not read slashable public keys from disk: %v", err)
		return
	}
	// Turn the key list into a set for constant-time membership checks.
	slashablePublicKeys := make(map[[fieldparams.BLSPubkeyLength]byte]bool, len(sPubKeys))
	for _, pubKey := range sPubKeys {
		slashablePublicKeys[pubKey] = true
	}

	// Nothing in NewValidatorService guarantees that graffitiStruct is non-nil,
	// so avoid dereferencing a nil pointer here: fall back to a zero-valued
	// Graffiti (and therefore a zero hash) for the index lookup only. The
	// validator below still receives the original, possibly nil, pointer.
	graffitiForIndex := v.graffitiStruct
	if graffitiForIndex == nil {
		graffitiForIndex = &graffiti.Graffiti{}
	}
	graffitiOrderedIndex, err := v.db.GraffitiOrderedIndex(v.ctx, graffitiForIndex.Hash)
	if err != nil {
		log.Errorf("Could not read graffiti ordered index from disk: %v", err)
		return
	}

	valStruct := &validator{
		db:                             v.db,
		validatorClient:                ethpb.NewBeaconNodeValidatorClient(v.conn),
		beaconClient:                   ethpb.NewBeaconChainClient(v.conn),
		slashingProtectionClient:       ethpb.NewSlasherClient(v.conn),
		node:                           ethpb.NewNodeClient(v.conn),
		graffiti:                       v.graffiti,
		logValidatorBalances:           v.logValidatorBalances,
		emitAccountMetrics:             v.emitAccountMetrics,
		startBalances:                  make(map[[fieldparams.BLSPubkeyLength]byte]uint64),
		prevBalance:                    make(map[[fieldparams.BLSPubkeyLength]byte]uint64),
		pubkeyToValidatorIndex:         make(map[[fieldparams.BLSPubkeyLength]byte]types.ValidatorIndex),
		attLogs:                        make(map[[32]byte]*attSubmitted),
		domainDataCache:                cache,
		aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache,
		// startEpoch is initialized to the maximum uint64 value (all bits set).
		voteStats:                      voteStats{startEpoch: types.Epoch(^uint64(0))},
		useWeb:                         v.useWeb,
		interopKeysConfig:              v.interopKeysConfig,
		wallet:                         v.wallet,
		walletInitializedFeed:          v.walletInitializedFeed,
		blockFeed:                      new(event.Feed),
		graffitiStruct:                 v.graffitiStruct,
		graffitiOrderedIndex:           graffitiOrderedIndex,
		eipImportBlacklistedPublicKeys: slashablePublicKeys,
		logDutyCountDown:               v.logDutyCountDown,
		Web3SignerConfig:               v.Web3SignerConfig,
		ProposerSettings:               v.proposerSettings,
		walletIntializedChannel:        make(chan *wallet.Wallet, 1),
	}

	// To resolve a race condition at startup due to the interface
	// nature of the abstracted block type. We initialize
	// the inner type of the feed before hand. So that
	// during future accesses, there will be no panics here
	// from type incompatibility.
	tempChan := make(chan interfaces.SignedBeaconBlock)
	sub := valStruct.blockFeed.Subscribe(tempChan)
	sub.Unsubscribe()
	close(tempChan)

	v.validator = valStruct
	go run(v.ctx, v.validator)
}
// Stop the validator service: cancels the service context, logs the shutdown
// and closes the gRPC connection when one exists. It returns the error from
// closing the connection, or nil if there is no connection.
func (v *ValidatorService) Stop() error {
	v.cancel()
	log.Info("Stopping service")
	if v.conn == nil {
		return nil
	}
	return v.conn.Close()
}
// Status of the validator service. It returns an error when the service holds
// no gRPC connection, and nil otherwise.
func (v *ValidatorService) Status() error {
	if v.conn != nil {
		return nil
	}
	return errors.New("no connection to beacon RPC")
}
// InteropKeysConfig returns the interop keymanager configuration the service
// was created with. The result may be nil.
func (v *ValidatorService) InteropKeysConfig() *local.InteropKeymanagerConfig {
	interopCfg := v.interopKeysConfig
	return interopCfg
}
// Keymanager returns the keymanager of the service's validator, along with
// any error the validator reports while providing it.
func (v *ValidatorService) Keymanager() (keymanager.IKeymanager, error) {
	km, err := v.validator.Keymanager()
	return km, err
}
// ConstructDialOptions constructs a list of grpc dial options.
//
// Parameters:
//   - maxCallRecvMsgSize: maximum size of a received gRPC message; 0 selects
//     the 50Mb default.
//   - withCert: TLS certificate file; when empty, an insecure connection is
//     configured and a warning is logged.
//   - grpcRetries: maximum number of retries per call.
//   - grpcRetryDelay: delay used for the linear retry backoff.
//   - extraOpts: additional options appended after the built-in ones.
//
// It returns nil when withCert is set but the credentials cannot be loaded;
// the failure is logged.
func ConstructDialOptions(
	maxCallRecvMsgSize int,
	withCert string,
	grpcRetries uint,
	grpcRetryDelay time.Duration,
	extraOpts ...grpc.DialOption,
) []grpc.DialOption {
	const defaultMaxCallRecvMsgSize = 50 << 20 // Default 50Mb

	var transportSecurity grpc.DialOption
	if withCert == "" {
		transportSecurity = grpc.WithInsecure()
		log.Warn("You are using an insecure gRPC connection. If you are running your beacon node and " +
			"validator on the same machines, you can ignore this message. If you want to know " +
			"how to enable secure connections, see: https://docs.prylabs.network/docs/prysm-usage/secure-grpc")
	} else {
		tlsCreds, credErr := credentials.NewClientTLSFromFile(withCert, "")
		if credErr != nil {
			log.Errorf("Could not get valid credentials: %v", credErr)
			return nil
		}
		transportSecurity = grpc.WithTransportCredentials(tlsCreds)
	}

	if maxCallRecvMsgSize == 0 {
		maxCallRecvMsgSize = defaultMaxCallRecvMsgSize
	}

	// Per-call defaults: receive size limit plus the retry policy.
	callDefaults := grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
		grpc_retry.WithMax(grpcRetries),
		grpc_retry.WithBackoff(grpc_retry.BackoffLinear(grpcRetryDelay)),
	)
	unaryChain := grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
		grpc_opentracing.UnaryClientInterceptor(),
		grpc_prometheus.UnaryClientInterceptor,
		grpc_retry.UnaryClientInterceptor(),
		grpcutil.LogRequests,
	))
	streamChain := grpc.WithChainStreamInterceptor(
		grpcutil.LogStream,
		grpc_opentracing.StreamClientInterceptor(),
		grpc_prometheus.StreamClientInterceptor,
		grpc_retry.StreamClientInterceptor(),
	)

	opts := []grpc.DialOption{
		transportSecurity,
		callDefaults,
		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
		unaryChain,
		streamChain,
		grpc.WithResolvers(&multipleEndpointsGrpcResolverBuilder{}),
	}
	return append(opts, extraOpts...)
}
// Syncing returns whether or not the beacon node is currently synchronizing the chain.
//
// It queries the node's sync status over the service's gRPC connection. An
// error is returned when the service has no connection or when the RPC fails;
// in both cases the boolean result is false.
func (v *ValidatorService) Syncing(ctx context.Context) (bool, error) {
	// The service can exist without a connection (see Status); calling an RPC
	// on a nil connection would panic, so report it as an error instead.
	if v.conn == nil {
		return false, errors.New("no connection to beacon RPC")
	}
	nc := ethpb.NewNodeClient(v.conn)
	resp, err := nc.GetSyncStatus(ctx, &emptypb.Empty{})
	if err != nil {
		return false, err
	}
	return resp.Syncing, nil
}
// GenesisInfo queries the beacon node for the chain genesis info containing
// the genesis time along with the validator deposit contract address.
//
// An error is returned when the service has no gRPC connection or when the
// RPC fails.
func (v *ValidatorService) GenesisInfo(ctx context.Context) (*ethpb.Genesis, error) {
	// The service can exist without a connection (see Status); calling an RPC
	// on a nil connection would panic, so report it as an error instead.
	if v.conn == nil {
		return nil, errors.New("no connection to beacon RPC")
	}
	nc := ethpb.NewNodeClient(v.conn)
	return nc.GetGenesis(ctx, &emptypb.Empty{})
}