From 29596bf8627bf708019d186f8fb2d8b990932886 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 1 Aug 2018 17:08:44 -0500 Subject: [PATCH] beacon: Beacon Node gRPC Server and Client Services (#364) --- beacon-chain/main.go | 2 +- beacon-chain/node/BUILD.bazel | 1 + beacon-chain/node/node.go | 24 +++++++-- beacon-chain/rpc/BUILD.bazel | 23 +++++++++ beacon-chain/rpc/service.go | 84 ++++++++++++++++++++++++++++++++ beacon-chain/rpc/service_test.go | 36 ++++++++++++++ beacon-chain/utils/flags.go | 6 +++ client/main.go | 2 +- client/node/BUILD.bazel | 1 + client/node/node.go | 14 ++++++ client/rpcclient/BUILD.bazel | 23 +++++++++ client/rpcclient/service.go | 59 ++++++++++++++++++++++ client/rpcclient/service_test.go | 21 ++++++++ client/utils/flags.go | 6 +++ 14 files changed, 295 insertions(+), 7 deletions(-) create mode 100644 beacon-chain/rpc/BUILD.bazel create mode 100644 beacon-chain/rpc/service.go create mode 100644 beacon-chain/rpc/service_test.go create mode 100644 client/rpcclient/BUILD.bazel create mode 100644 client/rpcclient/service.go create mode 100644 client/rpcclient/service_test.go diff --git a/beacon-chain/main.go b/beacon-chain/main.go index 920577471..6f89a2079 100644 --- a/beacon-chain/main.go +++ b/beacon-chain/main.go @@ -58,7 +58,7 @@ VERSION: app.Usage = "this is a beacon chain implementation for Ethereum 2.0" app.Action = startNode - app.Flags = []cli.Flag{utils.SimulatorFlag, utils.VrcContractFlag, utils.PubKeyFlag, utils.Web3ProviderFlag, cmd.DataDirFlag, cmd.VerbosityFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag} + app.Flags = []cli.Flag{utils.SimulatorFlag, utils.VrcContractFlag, utils.PubKeyFlag, utils.Web3ProviderFlag, utils.RPCPort, cmd.DataDirFlag, cmd.VerbosityFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag} app.Before = func(ctx *cli.Context) error { runtime.GOMAXPROCS(runtime.NumCPU()) diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index ce0b6cb83..f8ba6eeea 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//beacon-chain/blockchain:go_default_library", "//beacon-chain/powchain:go_default_library", + "//beacon-chain/rpc:go_default_library", "//beacon-chain/simulator:go_default_library", "//beacon-chain/sync:go_default_library", "//beacon-chain/utils:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 5d578c1ca..26beb4f95 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" + "github.com/prysmaticlabs/prysm/beacon-chain/rpc" "github.com/prysmaticlabs/prysm/beacon-chain/simulator" rbcsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" "github.com/prysmaticlabs/prysm/beacon-chain/utils" @@ -64,16 +65,18 @@ func New(ctx *cli.Context) (*BeaconNode, error) { return nil, err } - if ctx.GlobalBool(utils.SimulatorFlag.Name) { - if err := beacon.registerSimulatorService(); err != nil { - return nil, err - } + if err := beacon.registerSimulatorService(ctx); err != nil { + return nil, err } if err := beacon.registerSyncService(); err != nil { return nil, err } + if err := beacon.registerRPCService(ctx); err != nil { + return nil, err + } + return beacon, nil } @@ -180,7 +183,10 @@ func (b *BeaconNode) registerSyncService() error { return b.services.RegisterService(syncService) } -func (b *BeaconNode) registerSimulatorService() error { +func (b *BeaconNode) registerSimulatorService(ctx *cli.Context) error { + if !ctx.GlobalBool(utils.SimulatorFlag.Name) { + return nil + } var p2pService *p2p.Server if err := b.services.FetchService(&p2pService); err != nil { return err @@ -200,3 +206,11 @@ func (b *BeaconNode) registerSimulatorService() error { simulatorService := simulator.NewSimulator(context.TODO(), cfg, p2pService, web3Service, chainService) return b.services.RegisterService(simulatorService) } + +func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { + port := ctx.GlobalString(utils.RPCPort.Name) + rpcService := rpc.NewRPCService(context.TODO(), &rpc.Config{ + Port: port, + }) + return b.services.RegisterService(rpcService) +} diff --git a/beacon-chain/rpc/BUILD.bazel b/beacon-chain/rpc/BUILD.bazel new file mode 100644 index 000000000..71e620f1d --- /dev/null +++ b/beacon-chain/rpc/BUILD.bazel @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["service.go"], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc", + visibility = ["//beacon-chain:__subpackages__"], + deps = [ + "//proto/beacon/rpc/v1:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@org_golang_google_grpc//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["service_test.go"], + embed = [":go_default_library"], + deps = [ + "//shared/testutil:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + ], +) diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go new file mode 100644 index 000000000..2dd98e226 --- /dev/null +++ b/beacon-chain/rpc/service.go @@ -0,0 +1,84 @@ +package rpc + +import ( + "context" + "fmt" + "net" + + pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +var log = logrus.WithField("prefix", "rpc") + +// Service defining an RPC server for a beacon node. +type Service struct { + ctx context.Context + cancel context.CancelFunc + port string + listener net.Listener +} + +// Config options for the beacon node RPC server. +type Config struct { + Port string +} + +// NewRPCService creates a new instance of a struct implementing the BeaconServiceServer +// interface. +func NewRPCService(ctx context.Context, cfg *Config) *Service { + ctx, cancel := context.WithCancel(ctx) + return &Service{ + ctx: ctx, + cancel: cancel, + port: cfg.Port, + } +} + +// Start the gRPC server. +func (s *Service) Start() { + log.Info("Starting service") + lis, err := net.Listen("tcp", fmt.Sprintf(":%s", s.port)) + if err != nil { + log.Errorf("Could not listen to port :%s: %v", s.port, err) + return + } + s.listener = lis + log.Infof("RPC server listening on port :%s", s.port) + + grpcServer := grpc.NewServer() + pb.RegisterBeaconServiceServer(grpcServer, s) + go func() { + err = grpcServer.Serve(lis) + if err != nil { + log.Errorf("Could not serve gRPC: %v", err) + } + }() +} + +// Stop the service. +func (s *Service) Stop() error { + log.Info("Stopping service") + if s.listener != nil { + return s.listener.Close() + } + return nil +} + +// ShuffleValidators shuffles the validators into attesters/proposers. +func (s *Service) ShuffleValidators(req *pb.ShuffleRequest, stream pb.BeaconService_ShuffleValidatorsServer) error { + return stream.Send(&pb.ShuffleResponse{IsProposer: true, IsAttester: false}) +} + +// ProposeBlock is called by a proposer in a sharding client and a full beacon node +// the request into a beacon block that can then be included in a canonical chain. +func (s *Service) ProposeBlock(ctx context.Context, req *pb.ProposeRequest) (*pb.ProposeResponse, error) { + return nil, nil +} + +// SignBlock is a function called by an attester in a sharding client to sign off +// on a block. +func (s *Service) SignBlock(ctx context.Context, req *pb.SignRequest) (*pb.SignResponse, error) { + return nil, nil +} diff --git a/beacon-chain/rpc/service_test.go b/beacon-chain/rpc/service_test.go new file mode 100644 index 000000000..fac67a320 --- /dev/null +++ b/beacon-chain/rpc/service_test.go @@ -0,0 +1,36 @@ +package rpc + +import ( + "context" + "fmt" + "testing" + + "github.com/prysmaticlabs/prysm/shared/testutil" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestLifecycle(t *testing.T) { + hook := logTest.NewGlobal() + rpcService := NewRPCService(context.Background(), &Config{Port: "9999"}) + + rpcService.Start() + + testutil.AssertLogsContain(t, hook, "Starting service") + testutil.AssertLogsContain(t, hook, fmt.Sprintf("RPC server listening on port :%s", rpcService.port)) + + rpcService.Stop() + testutil.AssertLogsContain(t, hook, "Stopping service") +} + +func TestBadEndpoint(t *testing.T) { + hook := logTest.NewGlobal() + rpcService := NewRPCService(context.Background(), &Config{Port: "ralph merkle!!!"}) + + rpcService.Start() + + testutil.AssertLogsContain(t, hook, "Starting service") + testutil.AssertLogsContain(t, hook, fmt.Sprintf("Could not listen to port :%s", rpcService.port)) + + rpcService.Stop() + testutil.AssertLogsContain(t, hook, "Stopping service") +} diff --git a/beacon-chain/utils/flags.go b/beacon-chain/utils/flags.go index 3266c7008..51db1e6dc 100644 --- a/beacon-chain/utils/flags.go +++ b/beacon-chain/utils/flags.go @@ -26,4 +26,10 @@ var ( Name: "pubkey", Usage: "Validator's public key. Beacon chain node will listen to VRC log to determine when registration has completed based on this public key address.", } + // RPCPort defines a beacon node RPC port to open. + RPCPort = cli.StringFlag{ + Name: "rpc-port", + Usage: "RPC port exposed by a beacon node", + Value: "4000", + } ) diff --git a/client/main.go b/client/main.go index 8a5274788..9c04e1c2e 100644 --- a/client/main.go +++ b/client/main.go @@ -54,7 +54,7 @@ VERSION: app.Usage = `launches a sharding client that interacts with a beacon chain, starts proposer services, shardp2p connections, and more ` app.Action = startNode - app.Flags = []cli.Flag{utils.ActorFlag, cmd.VerbosityFlag, cmd.DataDirFlag, cmd.PasswordFileFlag, cmd.NetworkIDFlag, cmd.IPCPathFlag, cmd.RPCProviderFlag, utils.DepositFlag, utils.ShardIDFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag} + app.Flags = []cli.Flag{utils.ActorFlag, utils.BeaconRPCProviderFlag, cmd.VerbosityFlag, cmd.DataDirFlag, cmd.PasswordFileFlag, cmd.NetworkIDFlag, cmd.IPCPathFlag, cmd.RPCProviderFlag, utils.DepositFlag, utils.ShardIDFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag} app.Before = func(ctx *cli.Context) error { runtime.GOMAXPROCS(runtime.NumCPU()) diff --git a/client/node/BUILD.bazel b/client/node/BUILD.bazel index 114c4e212..ffac23de9 100644 --- a/client/node/BUILD.bazel +++ b/client/node/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//client/observer:go_default_library", "//client/params:go_default_library", "//client/proposer:go_default_library", + "//client/rpcclient:go_default_library", "//client/simulator:go_default_library", "//client/syncer:go_default_library", "//client/txpool:go_default_library", diff --git a/client/node/node.go b/client/node/node.go index 721cdb488..912a4269c 100644 --- a/client/node/node.go +++ b/client/node/node.go @@ -5,6 +5,7 @@ package node import ( + "context" "fmt" "os" "os/signal" @@ -18,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/client/observer" "github.com/prysmaticlabs/prysm/client/params" "github.com/prysmaticlabs/prysm/client/proposer" + "github.com/prysmaticlabs/prysm/client/rpcclient" "github.com/prysmaticlabs/prysm/client/simulator" "github.com/prysmaticlabs/prysm/client/syncer" "github.com/prysmaticlabs/prysm/client/txpool" @@ -86,6 +88,10 @@ func New(ctx *cli.Context) (*ShardEthereum, error) { return nil, err } + if err := shardEthereum.registerBeaconRPCService(ctx); err != nil { + return nil, err + } + return shardEthereum, nil } @@ -265,3 +271,11 @@ func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int } return s.services.RegisterService(sync) } + +func (s *ShardEthereum) registerBeaconRPCService(ctx *cli.Context) error { + endpoint := ctx.GlobalString(utils.BeaconRPCProviderFlag.Name) + rpcService := rpcclient.NewRPCClient(context.TODO(), &rpcclient.Config{ + Endpoint: endpoint, + }) + return s.services.RegisterService(rpcService) +} diff --git a/client/rpcclient/BUILD.bazel b/client/rpcclient/BUILD.bazel new file mode 100644 index 000000000..70a5e4080 --- /dev/null +++ b/client/rpcclient/BUILD.bazel @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["service.go"], + importpath = "github.com/prysmaticlabs/prysm/client/rpcclient", + visibility = ["//client:__subpackages__"], + deps = [ + "//proto/beacon/rpc/v1:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@org_golang_google_grpc//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["service_test.go"], + embed = [":go_default_library"], + deps = [ + "//shared/testutil:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + ], +) diff --git a/client/rpcclient/service.go b/client/rpcclient/service.go new file mode 100644 index 000000000..b56d6baa8 --- /dev/null +++ b/client/rpcclient/service.go @@ -0,0 +1,59 @@ +package rpcclient + +import ( + "context" + + pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +var log = logrus.WithField("prefix", "rpc-client") + +// Service for an RPCClient to a Beacon Node. +type Service struct { + ctx context.Context + cancel context.CancelFunc + conn *grpc.ClientConn + endpoint string +} + +// Config for the RPCClient service. +type Config struct { + Endpoint string +} + +// NewRPCClient sets up a new beacon node RPC client connection. +func NewRPCClient(ctx context.Context, cfg *Config) *Service { + ctx, cancel := context.WithCancel(ctx) + return &Service{ + ctx: ctx, + cancel: cancel, + endpoint: cfg.Endpoint, + } +} + +// Start the grpc connection. +func (s *Service) Start() { + log.Info("Starting service") + conn, err := grpc.Dial(s.endpoint, grpc.WithInsecure()) + if err != nil { + log.Errorf("Could not connect to beacon node via RPC endpoint: %s: %v", s.endpoint, err) + return + } + s.conn = conn +} + +// Stop the dialed connection. +func (s *Service) Stop() error { + log.Info("Stopping service") + if s.conn != nil { + return s.conn.Close() + } + return nil +} + +// BeaconServiceClient return the proto RPC interface. +func (s *Service) BeaconServiceClient() pb.BeaconServiceClient { + return pb.NewBeaconServiceClient(s.conn) +} diff --git a/client/rpcclient/service_test.go b/client/rpcclient/service_test.go new file mode 100644 index 000000000..aee040b97 --- /dev/null +++ b/client/rpcclient/service_test.go @@ -0,0 +1,21 @@ +package rpcclient + +import ( + "context" + "testing" + + "github.com/prysmaticlabs/prysm/shared/testutil" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestLifecycle(t *testing.T) { + hook := logTest.NewGlobal() + rpcClientService := NewRPCClient(context.Background(), &Config{Endpoint: "merkle tries"}) + + rpcClientService.Start() + + testutil.AssertLogsContain(t, hook, "Starting service") + + rpcClientService.Stop() + testutil.AssertLogsContain(t, hook, "Stopping service") +} diff --git a/client/utils/flags.go b/client/utils/flags.go index b850bc8f8..a6431a4d4 100644 --- a/client/utils/flags.go +++ b/client/utils/flags.go @@ -23,4 +23,10 @@ var ( Name: "shardid", Usage: `use the --shardid to determine which shard to start p2p server, listen for incoming transactions and perform proposer/observer duties`, } + // BeaconRPCProviderFlag defines a beacon node RPC endpoint. + BeaconRPCProviderFlag = cli.StringFlag{ + Name: "beacon-rpc-provider", + Usage: "Beacon node RPC provider endpoint", + Value: "http://localhost:4000/", + } )