beacon: Beacon Node gRPC Server and Client Services (#364)

This commit is contained in:
Raul Jordan 2018-08-01 17:08:44 -05:00 committed by GitHub
parent c9f1bfc19c
commit 29596bf862
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 295 additions and 7 deletions

View File

@ -58,7 +58,7 @@ VERSION:
app.Usage = "this is a beacon chain implementation for Ethereum 2.0"
app.Action = startNode
app.Flags = []cli.Flag{utils.SimulatorFlag, utils.VrcContractFlag, utils.PubKeyFlag, utils.Web3ProviderFlag, cmd.DataDirFlag, cmd.VerbosityFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag}
app.Flags = []cli.Flag{utils.SimulatorFlag, utils.VrcContractFlag, utils.PubKeyFlag, utils.Web3ProviderFlag, utils.RPCPort, cmd.DataDirFlag, cmd.VerbosityFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag}
app.Before = func(ctx *cli.Context) error {
runtime.GOMAXPROCS(runtime.NumCPU())

View File

@ -8,6 +8,7 @@ go_library(
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/rpc:go_default_library",
"//beacon-chain/simulator:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/utils:go_default_library",

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc"
"github.com/prysmaticlabs/prysm/beacon-chain/simulator"
rbcsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
@ -64,16 +65,18 @@ func New(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}
if ctx.GlobalBool(utils.SimulatorFlag.Name) {
if err := beacon.registerSimulatorService(); err != nil {
return nil, err
}
if err := beacon.registerSimulatorService(ctx); err != nil {
return nil, err
}
if err := beacon.registerSyncService(); err != nil {
return nil, err
}
if err := beacon.registerRPCService(ctx); err != nil {
return nil, err
}
return beacon, nil
}
@ -180,7 +183,10 @@ func (b *BeaconNode) registerSyncService() error {
return b.services.RegisterService(syncService)
}
func (b *BeaconNode) registerSimulatorService() error {
func (b *BeaconNode) registerSimulatorService(ctx *cli.Context) error {
if !ctx.GlobalBool(utils.SimulatorFlag.Name) {
return nil
}
var p2pService *p2p.Server
if err := b.services.FetchService(&p2pService); err != nil {
return err
@ -200,3 +206,11 @@ func (b *BeaconNode) registerSimulatorService() error {
simulatorService := simulator.NewSimulator(context.TODO(), cfg, p2pService, web3Service, chainService)
return b.services.RegisterService(simulatorService)
}
func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
port := ctx.GlobalString(utils.RPCPort.Name)
rpcService := rpc.NewRPCService(context.TODO(), &rpc.Config{
Port: port,
})
return b.services.RegisterService(rpcService)
}

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//proto/beacon/rpc/v1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["service_test.go"],
embed = [":go_default_library"],
deps = [
"//shared/testutil:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@ -0,0 +1,84 @@
package rpc
import (
"context"
"fmt"
"net"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
var log = logrus.WithField("prefix", "rpc")
// Service defining an RPC server for a beacon node.
type Service struct {
ctx context.Context
cancel context.CancelFunc
port string
listener net.Listener
}
// Config options for the beacon node RPC server.
type Config struct {
Port string
}
// NewRPCService creates a new instance of a struct implementing the BeaconServiceServer
// interface.
func NewRPCService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
port: cfg.Port,
}
}
// Start the gRPC server.
func (s *Service) Start() {
log.Info("Starting service")
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", s.port))
if err != nil {
log.Errorf("Could not listen to port :%s: %v", s.port, err)
return
}
s.listener = lis
log.Infof("RPC server listening on port :%s", s.port)
grpcServer := grpc.NewServer()
pb.RegisterBeaconServiceServer(grpcServer, s)
go func() {
err = grpcServer.Serve(lis)
if err != nil {
log.Errorf("Could not serve gRPC: %v", err)
}
}()
}
// Stop the service.
func (s *Service) Stop() error {
log.Info("Stopping service")
if s.listener != nil {
return s.listener.Close()
}
return nil
}
// ShuffleValidators shuffles the validators into attesters/proposers.
func (s *Service) ShuffleValidators(req *pb.ShuffleRequest, stream pb.BeaconService_ShuffleValidatorsServer) error {
return stream.Send(&pb.ShuffleResponse{IsProposer: true, IsAttester: false})
}
// ProposeBlock is called by a proposer in a sharding client and a full beacon node
// the request into a beacon block that can then be included in a canonical chain.
func (s *Service) ProposeBlock(ctx context.Context, req *pb.ProposeRequest) (*pb.ProposeResponse, error) {
return nil, nil
}
// SignBlock is a function called by an attester in a sharding client to sign off
// on a block.
func (s *Service) SignBlock(ctx context.Context, req *pb.SignRequest) (*pb.SignResponse, error) {
return nil, nil
}

View File

@ -0,0 +1,36 @@
package rpc
import (
"context"
"fmt"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestLifecycle(t *testing.T) {
hook := logTest.NewGlobal()
rpcService := NewRPCService(context.Background(), &Config{Port: "9999"})
rpcService.Start()
testutil.AssertLogsContain(t, hook, "Starting service")
testutil.AssertLogsContain(t, hook, fmt.Sprintf("RPC server listening on port :%s", rpcService.port))
rpcService.Stop()
testutil.AssertLogsContain(t, hook, "Stopping service")
}
func TestBadEndpoint(t *testing.T) {
hook := logTest.NewGlobal()
rpcService := NewRPCService(context.Background(), &Config{Port: "ralph merkle!!!"})
rpcService.Start()
testutil.AssertLogsContain(t, hook, "Starting service")
testutil.AssertLogsContain(t, hook, fmt.Sprintf("Could not listen to port :%s", rpcService.port))
rpcService.Stop()
testutil.AssertLogsContain(t, hook, "Stopping service")
}

View File

@ -26,4 +26,10 @@ var (
Name: "pubkey",
Usage: "Validator's public key. Beacon chain node will listen to VRC log to determine when registration has completed based on this public key address.",
}
// RPCPort defines a beacon node RPC port to open.
RPCPort = cli.StringFlag{
Name: "rpc-port",
Usage: "RPC port exposed by a beacon node",
Value: "4000",
}
)

View File

@ -54,7 +54,7 @@ VERSION:
app.Usage = `launches a sharding client that interacts with a beacon chain, starts proposer services, shardp2p connections, and more
`
app.Action = startNode
app.Flags = []cli.Flag{utils.ActorFlag, cmd.VerbosityFlag, cmd.DataDirFlag, cmd.PasswordFileFlag, cmd.NetworkIDFlag, cmd.IPCPathFlag, cmd.RPCProviderFlag, utils.DepositFlag, utils.ShardIDFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag}
app.Flags = []cli.Flag{utils.ActorFlag, utils.BeaconRPCProviderFlag, cmd.VerbosityFlag, cmd.DataDirFlag, cmd.PasswordFileFlag, cmd.NetworkIDFlag, cmd.IPCPathFlag, cmd.RPCProviderFlag, utils.DepositFlag, utils.ShardIDFlag, debug.PProfFlag, debug.PProfAddrFlag, debug.PProfPortFlag, debug.MemProfileRateFlag, debug.CPUProfileFlag, debug.TraceFlag}
app.Before = func(ctx *cli.Context) error {
runtime.GOMAXPROCS(runtime.NumCPU())

View File

@ -11,6 +11,7 @@ go_library(
"//client/observer:go_default_library",
"//client/params:go_default_library",
"//client/proposer:go_default_library",
"//client/rpcclient:go_default_library",
"//client/simulator:go_default_library",
"//client/syncer:go_default_library",
"//client/txpool:go_default_library",

View File

@ -5,6 +5,7 @@
package node
import (
"context"
"fmt"
"os"
"os/signal"
@ -18,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/client/observer"
"github.com/prysmaticlabs/prysm/client/params"
"github.com/prysmaticlabs/prysm/client/proposer"
"github.com/prysmaticlabs/prysm/client/rpcclient"
"github.com/prysmaticlabs/prysm/client/simulator"
"github.com/prysmaticlabs/prysm/client/syncer"
"github.com/prysmaticlabs/prysm/client/txpool"
@ -86,6 +88,10 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
return nil, err
}
if err := shardEthereum.registerBeaconRPCService(ctx); err != nil {
return nil, err
}
return shardEthereum, nil
}
@ -265,3 +271,11 @@ func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int
}
return s.services.RegisterService(sync)
}
func (s *ShardEthereum) registerBeaconRPCService(ctx *cli.Context) error {
endpoint := ctx.GlobalString(utils.BeaconRPCProviderFlag.Name)
rpcService := rpcclient.NewRPCClient(context.TODO(), &rpcclient.Config{
Endpoint: endpoint,
})
return s.services.RegisterService(rpcService)
}

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/client/rpcclient",
visibility = ["//client:__subpackages__"],
deps = [
"//proto/beacon/rpc/v1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["service_test.go"],
embed = [":go_default_library"],
deps = [
"//shared/testutil:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@ -0,0 +1,59 @@
package rpcclient
import (
"context"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
var log = logrus.WithField("prefix", "rpc-client")
// Service for an RPCClient to a Beacon Node.
type Service struct {
ctx context.Context
cancel context.CancelFunc
conn *grpc.ClientConn
endpoint string
}
// Config for the RPCClient service.
type Config struct {
Endpoint string
}
// NewRPCClient sets up a new beacon node RPC client connection.
func NewRPCClient(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
endpoint: cfg.Endpoint,
}
}
// Start the grpc connection.
func (s *Service) Start() {
log.Info("Starting service")
conn, err := grpc.Dial(s.endpoint, grpc.WithInsecure())
if err != nil {
log.Errorf("Could not connect to beacon node via RPC endpoint: %s: %v", s.endpoint, err)
return
}
s.conn = conn
}
// Stop the dialed connection.
func (s *Service) Stop() error {
log.Info("Stopping service")
if s.conn != nil {
return s.conn.Close()
}
return nil
}
// BeaconServiceClient return the proto RPC interface.
func (s *Service) BeaconServiceClient() pb.BeaconServiceClient {
return pb.NewBeaconServiceClient(s.conn)
}

View File

@ -0,0 +1,21 @@
package rpcclient
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestLifecycle(t *testing.T) {
hook := logTest.NewGlobal()
rpcClientService := NewRPCClient(context.Background(), &Config{Endpoint: "merkle tries"})
rpcClientService.Start()
testutil.AssertLogsContain(t, hook, "Starting service")
rpcClientService.Stop()
testutil.AssertLogsContain(t, hook, "Stopping service")
}

View File

@ -23,4 +23,10 @@ var (
Name: "shardid",
Usage: `use the --shardid to determine which shard to start p2p server, listen for incoming transactions and perform proposer/observer duties`,
}
// BeaconRPCProviderFlag defines a beacon node RPC endpoint.
BeaconRPCProviderFlag = cli.StringFlag{
Name: "beacon-rpc-provider",
Usage: "Beacon node RPC provider endpoint",
Value: "http://localhost:4000/",
}
)