prysm-pulse/validator/client/service.go
Ivan Martinez 4ab0a91e51
Validator Slashing Protection DB (#4389)
* Begin adding DB to validator client

Begin adding ValidatorProposalHistory

Implement most of proposal history

Finish tests

Fix marking a proposal for the first time

Change proposalhistory to not using bit shifting

Add pb.go

Change after proto/slashing added

Finally fix protos

Fix most tests

Fix all tests for double proposal protection

Start initialiing DB in validator client

Add db to validator struct

Add DB to ProposeBlock

Fix test errors and begin mocking

Fix test formatting and pass test for validator protection!

Fix merge issues

Fix renames

Fix tests

* Fix tests

* Fix first startup on DB

* Fix nil check tests

* Fix E2E

* Fix e2e flag

* Fix comments

* Fix for comments

* Move proposal hepers to validator/client to keep DB clean

* Add clear-db flag to validator client

* Fix formatting

* Clear out unintended changes

* Fix build issues

* Fix build issues

* Gazelle

* Fix mock test

* Remove proposal history

* Add terminal confirmation to DB clearing

* Add interface for validatorDB, add context to DB functions

* Add force-clear-db flag

* Cleanup

* Update validator/node/node.go

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* Change db to clear file, not whole folder

* Fix db test

* Fix teardown test

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
2020-01-08 13:16:17 -05:00

148 lines
4.3 KiB
Go

package client
import (
"context"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var log = logrus.WithField("prefix", "validator")
// ValidatorService represents a service to manage the validator client
// routine.
type ValidatorService struct {
ctx context.Context
cancel context.CancelFunc
validator Validator
graffiti []byte
conn *grpc.ClientConn
endpoint string
withCert string
dataDir string
keyManager keymanager.KeyManager
logValidatorBalances bool
}
// Config for the validator service.
type Config struct {
Endpoint string
DataDir string
CertFlag string
GraffitiFlag string
KeyManager keymanager.KeyManager
LogValidatorBalances bool
}
// NewValidatorService creates a new validator service for the service
// registry.
func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, error) {
ctx, cancel := context.WithCancel(ctx)
return &ValidatorService{
ctx: ctx,
cancel: cancel,
endpoint: cfg.Endpoint,
withCert: cfg.CertFlag,
dataDir: cfg.DataDir,
graffiti: []byte(cfg.GraffitiFlag),
keyManager: cfg.KeyManager,
logValidatorBalances: cfg.LogValidatorBalances,
}, nil
}
// Start the validator service. Launches the main go routine for the validator
// client.
func (v *ValidatorService) Start() {
var dialOpt grpc.DialOption
if v.withCert != "" {
creds, err := credentials.NewClientTLSFromFile(v.withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return
}
dialOpt = grpc.WithTransportCredentials(creds)
} else {
dialOpt = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}
opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10 * 5 << 20), // 10Mb
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
)),
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", v.endpoint, err)
return
}
log.Info("Successfully started gRPC connection")
pubkeys, err := v.keyManager.FetchValidatingKeys()
if err != nil {
log.Errorf("Could not get validating keys: %v", err)
return
}
valDB, err := db.NewKVStore(v.dataDir, pubkeys)
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
}
v.conn = conn
v.validator = &validator{
db: valDB,
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
beaconClient: ethpb.NewBeaconChainClient(v.conn),
aggregatorClient: pb.NewAggregatorServiceClient(v.conn),
node: ethpb.NewNodeClient(v.conn),
keyManager: v.keyManager,
graffiti: v.graffiti,
logValidatorBalances: v.logValidatorBalances,
prevBalance: make(map[[48]byte]uint64),
attLogs: make(map[[32]byte]*attSubmitted),
pubKeyToID: make(map[[48]byte]uint64),
}
go run(v.ctx, v.validator)
}
// Stop the validator service.
func (v *ValidatorService) Stop() error {
v.cancel()
log.Info("Stopping service")
if v.conn != nil {
return v.conn.Close()
}
return nil
}
// Status ...
//
// WIP - not done.
func (v *ValidatorService) Status() error {
if v.conn == nil {
return errors.New("no connection to beacon RPC")
}
return nil
}