add --sentry.api.addr flag (#1850)

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr

* add --sentry.api.addr
This commit is contained in:
Alex Sharov 2021-04-30 22:09:03 +07:00 committed by GitHub
parent c313238e4a
commit 249b3098cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 60 additions and 21 deletions

View File

@ -196,6 +196,8 @@ Run RPC daemon
> ./build/bin/rpcdaemon --private.api.addr=localhost:9090 --http.api=eth,debug,net
```
**gRPC ports**: `9090` TG, `9091` sentry, `9092` consensus engine, `9093` snapshot downloader, `9094` TxPool
**For dual mode**
If both `--datadir` and `--private.api.addr` options are used for RPC daemon, it works in a "dual" mode. This only works when RPC daemon is on the same computer as turbo-geth. In this mode, most data transfer from turbo-geth to RPC daemon happens via shared memory, only certain things (like new header notifications) happen via TPC socket.

View File

@ -38,7 +38,7 @@ import (
// Methods of Core called by sentry
func grpcSentryClient(ctx context.Context, sentryAddr string) (proto_sentry.SentryClient, error) {
func GrpcSentryClient(ctx context.Context, sentryAddr string) (proto_sentry.SentryClient, error) {
// creating grpc client connection
var dialOpts []grpc.DialOption
dialOpts = []grpc.DialOption{
@ -64,7 +64,7 @@ func Download(sentryAddrs []string, db ethdb.Database, timeout, window int, chai
log.Info("Starting Sentry client", "connecting to sentry", sentryAddrs)
sentries := make([]proto_sentry.SentryClient, len(sentryAddrs))
for i, addr := range sentryAddrs {
sentry, err := grpcSentryClient(ctx, addr)
sentry, err := GrpcSentryClient(ctx, addr)
if err != nil {
return err
}
@ -206,7 +206,8 @@ func Combined(natSetting string, port int, staticPeers []string, discovery bool,
sentry := &SentryClientDirect{}
sentry.SetServer(sentryServer)
chainConfig, genesisHash, engine, networkID := cfg(db, chain)
controlServer, err := NewControlServer(db, nodeName, chainConfig, genesisHash, engine, networkID, []proto_sentry.SentryClient{sentry}, window)
sentries := []proto_sentry.SentryClient{sentry}
controlServer, err := NewControlServer(db, nodeName, chainConfig, genesisHash, engine, networkID, sentries, window)
if err != nil {
return fmt.Errorf("create core P2P server: %w", err)
}
@ -225,7 +226,7 @@ func Combined(natSetting string, port int, staticPeers []string, discovery bool,
return err
}
if err = SetSentryStatus(ctx, sentry, controlServer); err != nil {
if err = SetSentryStatus(ctx, sentries, controlServer); err != nil {
log.Error("failed to set sentry status", "error", err)
return nil
}
@ -271,9 +272,11 @@ func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, c
}
func SetSentryStatus(ctx context.Context, sentry proto_sentry.SentryClient, controlServer *ControlServerImpl) error {
if _, err := sentry.SetStatus(ctx, makeStatusData(controlServer), &grpc.EmptyCallOption{}); err != nil {
return err
func SetSentryStatus(ctx context.Context, sentries []proto_sentry.SentryClient, controlServer *ControlServerImpl) error {
for i := range sentries {
if _, err := sentries[i].SetStatus(ctx, makeStatusData(controlServer), &grpc.EmptyCallOption{}); err != nil {
return err
}
}
return nil
}

View File

@ -467,6 +467,10 @@ var (
Usage: "Network listening port",
Value: 30303,
}
SentryAddrFlag = cli.StringSliceFlag{
Name: "sentry.api.addr",
Usage: "comma separated sentry addresses '<host>:<port>,<host>:<port>'",
}
BootnodesFlag = cli.StringFlag{
Name: "bootnodes",
Usage: "Comma separated enode URLs for P2P discovery bootstrap",
@ -724,6 +728,9 @@ func setListenAddress(ctx *cli.Context, cfg *p2p.Config) {
if ctx.GlobalIsSet(ListenPortFlag.Name) {
cfg.ListenAddr = fmt.Sprintf(":%d", ctx.GlobalInt(ListenPortFlag.Name))
}
if ctx.GlobalIsSet(SentryAddrFlag.Name) {
cfg.SentryAddr = ctx.GlobalStringSlice(SentryAddrFlag.Name)
}
}
// setNAT creates a port mapper from command line flags.
@ -815,6 +822,7 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
setNodeUserIdent(ctx, cfg)
setDataDir(ctx, cfg)
}
func SetNodeConfigCobra(cmd *cobra.Command, cfg *node.Config) {
flags := cmd.Flags()
//SetP2PConfig(ctx, &cfg.P2P)
@ -1124,6 +1132,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
setMiner(ctx, &cfg.Miner)
setWhitelist(ctx, cfg)
cfg.P2PEnabled = len(stack.Config().P2P.SentryAddr) == 0
if ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkID = ctx.GlobalUint64(NetworkIdFlag.Name)
}

View File

@ -404,18 +404,28 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
checkpoint := config.Checkpoint
if eth.config.EnableDownloadV2 {
eth.sentryServer = download.NewSentryServer(context.Background())
sentry := &download.SentryClientDirect{}
eth.sentryServer.P2pServer = eth.p2pServer
sentry.SetServer(eth.sentryServer)
eth.sentries = []proto_sentry.SentryClient{sentry}
blockDownloaderWindow := 65536
eth.downloadV2Ctx, eth.downloadV2Cancel = context.WithCancel(context.Background())
if len(stack.Config().P2P.SentryAddr) > 0 {
for _, addr := range stack.Config().P2P.SentryAddr {
sentry, err := download.GrpcSentryClient(eth.downloadV2Ctx, addr)
if err != nil {
return nil, err
}
eth.sentries = append(eth.sentries, sentry)
}
} else {
eth.sentryServer = download.NewSentryServer(eth.downloadV2Ctx)
sentry := &download.SentryClientDirect{}
eth.sentryServer.P2pServer = eth.p2pServer
sentry.SetServer(eth.sentryServer)
eth.sentries = []proto_sentry.SentryClient{sentry}
}
blockDownloaderWindow := 65536
eth.downloadServer, err = download.NewControlServer(chainDb, stack.Config().NodeName(), chainConfig, genesisHash, eth.engine, eth.config.NetworkID, eth.sentries, blockDownloaderWindow)
if err != nil {
return nil, err
}
if err = download.SetSentryStatus(eth.downloadV2Ctx, sentry, eth.downloadServer); err != nil {
if err = download.SetSentryStatus(eth.downloadV2Ctx, eth.sentries, eth.downloadServer); err != nil {
return nil, err
}
eth.txPoolServer, err = download.NewTxPoolServer(eth.sentries, eth.txPool)
@ -495,7 +505,10 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
// Register the backend on the node
stack.RegisterAPIs(eth.APIs())
stack.RegisterProtocols(eth.Protocols())
if eth.config.P2PEnabled {
stack.RegisterProtocols(eth.Protocols())
}
stack.RegisterLifecycle(eth)
// Check for unclean shutdown
return eth, nil

View File

@ -121,6 +121,7 @@ type Config struct {
Pruning bool // Whether to disable pruning and flush everything to disk
EnableDownloadV2 bool
P2PEnabled bool
StorageMode ethdb.StorageMode
BatchSize datasize.ByteSize // Batch size for execution stage

View File

@ -256,14 +256,19 @@ func (n *Node) doClose(errs []error) error {
func (n *Node) openEndpoints() error {
// start networking endpoints
n.log.Info("Starting peer-to-peer node", "instance", n.server.Name)
if err := n.server.Start(); err != nil {
return convertFileLockError(err)
if len(n.config.P2P.SentryAddr) == 0 {
if err := n.server.Start(); err != nil {
return convertFileLockError(err)
}
}
// start RPC endpoints
err := n.startRPC()
if err != nil {
n.stopRPC()
n.server.Stop()
if len(n.config.P2P.SentryAddr) == 0 {
n.server.Stop()
}
}
return err
}
@ -281,7 +286,7 @@ func containsLifecycle(lfs []Lifecycle, l Lifecycle) bool {
// stopServices terminates running services, RPC and p2p networking.
// It is the inverse of Start.
func (n *Node) stopServices(running []Lifecycle) error {
n.stopRPC()
//n.stopRPC()
// Stop running lifecycles in reverse order.
failure := &StopError{Services: make(map[reflect.Type]error)}
@ -291,8 +296,10 @@ func (n *Node) stopServices(running []Lifecycle) error {
}
}
// Stop p2p networking.
n.server.Stop()
if len(n.config.P2P.SentryAddr) == 0 {
// Stop p2p networking.
n.server.Stop()
}
if len(failure.Services) > 0 {
return failure

View File

@ -140,6 +140,7 @@ type Config struct {
// ListenAddr field will be updated with the actual address when
// the server is started.
ListenAddr string
SentryAddr []string
// If set to a non-nil value, the given NAT port mapper
// is used to make the listening port available to the

View File

@ -69,4 +69,5 @@ var DefaultFlags = []cli.Flag{
utils.MinerExtraDataFlag,
utils.MinerNoVerfiyFlag,
DownloadV2Flag,
utils.SentryAddrFlag,
}

View File

@ -119,6 +119,7 @@ var (
func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config) {
cfg.EnableDownloadV2 = ctx.GlobalBool(DownloadV2Flag.Name)
mode, err := ethdb.StorageModeFromString(ctx.GlobalString(StorageModeFlag.Name))
if err != nil {
utils.Fatalf(fmt.Sprintf("error while parsing mode: %v", err))