From 65f71b3a4828fd987f3be25b330f83465110b151 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 15 Mar 2024 12:08:19 +0100 Subject: [PATCH] P2P: Simplify code (#13719) * `subscribeStaticWithSubnets`: Fix docstring. * `buildOptions`: Avoid `options` mutations. * `dv5Cfg`: Avoid mutation. * `RefreshENR`: Use default for all but Phase0. * `udp4`, `udp6`: Create enum. * `p2p.Config`: `BootstrapNodeAddr`==> `BootstrapNodeAddrs`. * `p2p.Config`: `Discv5BootStrapAddr` ==> `Discv5BootStrapAddrs`. * `TestScorers_BadResponses_Score`: Improve. * `BeaconNode`: Avoid mutation. * `TestStore_TrustedPeers`: Remove blankline. * Remove blank identifiers. * `privKey`: Keep the majority of code with low indentation. * `P2PPreregistration`: Return error instead of fatal log. * `parseBootStrapAddrs` => `ParseBootStrapAddrs` (export) * `p2p.Config`: Remove `BootstrapNodeAddrs`. * `NewService`: Avoid mutation when possible. * `Service`: Remove blank identifier. * `buildOptions`: Avoid `log.Fatalf` (make deepsource happy). * `registerGRPCGateway`: Use `net.JoinHostPort` (make deepsource happy). * `registerBuilderService`: Make deepsource happy. * `scorers`: Add `NoLock` suffix (make deepsource happy). * `scorerr`: Add some `NoLock`suffixes (making deepsource happy). * `discovery_test.go`. Remove init. Rationale: `rand.Seed` is deprecated: As of Go 1.20 there is no reason to call Seed with a random value. Programs that call Seed with a known value to get a specific sequence of results should use New(NewSource(seed)) to obtain a local random generator. This makes deepsource happy as well. * `createListener`: Reduce cyclomatic complexity (make deepsource happy). * `startDB`: Reduce cyclomatic complexity (make deepsource happy). * `main`: Log a FATAL on error. This way, the error message is very readable. Before this commit, the error message is the less readable message in the logs. * `New`: Reduce cyclomatic complexity (make deepsource happy). * `main`: Avoid `App` mutation, and make deepsource happy. * Update beacon-chain/node/node.go Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com> * `bootnodes` ==> `BootNodes` (Fix PR comment). * Remove duplicate `configureFastSSZHashingAlgorithm` since already done in `configureBeacon`. (Fix PR comment) * Add `TestCreateLocalNode`. (PR comment fix.) * `startModules` ==> `startBaseServices (Fix PR comment). * `buildOptions` return errors consistently. * `New`: Change ordering. --------- Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com> --- beacon-chain/node/node.go | 496 ++++++++++-------- beacon-chain/node/registration/BUILD.bazel | 1 + beacon-chain/node/registration/p2p.go | 7 +- beacon-chain/p2p/broadcaster_test.go | 5 +- beacon-chain/p2p/config.go | 43 +- beacon-chain/p2p/connection_gater.go | 6 +- beacon-chain/p2p/discovery.go | 103 ++-- beacon-chain/p2p/discovery_test.go | 118 ++++- beacon-chain/p2p/fork_test.go | 10 +- beacon-chain/p2p/info.go | 2 +- beacon-chain/p2p/options.go | 23 +- beacon-chain/p2p/options_test.go | 4 +- beacon-chain/p2p/peers/peerdata/store_test.go | 1 - .../p2p/peers/scorers/bad_responses.go | 22 +- .../p2p/peers/scorers/bad_responses_test.go | 28 +- .../p2p/peers/scorers/block_providers.go | 32 +- .../p2p/peers/scorers/gossip_scorer.go | 20 +- beacon-chain/p2p/peers/scorers/peer_status.go | 22 +- beacon-chain/p2p/peers/scorers/service.go | 14 +- beacon-chain/p2p/service.go | 59 ++- beacon-chain/p2p/service_test.go | 7 +- beacon-chain/p2p/subnets_test.go | 16 +- beacon-chain/p2p/utils.go | 43 +- beacon-chain/sync/subscriber.go | 2 +- cmd/beacon-chain/BUILD.bazel | 1 + cmd/beacon-chain/main.go | 144 ++--- 26 files changed, 729 insertions(+), 500 deletions(-) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index a1eccfbca..e87d68cf0 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -7,9 +7,11 @@ import ( "bytes" "context" "fmt" + "net" "os" "os/signal" "path/filepath" + "strconv" "strings" "sync" "syscall" @@ -127,51 +129,16 @@ type BeaconNode struct { // New creates a new node instance, sets up configuration options, and registers // every required service to the node. func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*BeaconNode, error) { - if err := configureTracing(cliCtx); err != nil { - return nil, err + if err := configureBeacon(cliCtx); err != nil { + return nil, errors.Wrap(err, "could not set beacon configuration options") } - prereqs.WarnIfPlatformNotSupported(cliCtx.Context) - if hasNetworkFlag(cliCtx) && cliCtx.IsSet(cmd.ChainConfigFileFlag.Name) { - return nil, fmt.Errorf("%s cannot be passed concurrently with network flag", cmd.ChainConfigFileFlag.Name) - } - if err := features.ConfigureBeaconChain(cliCtx); err != nil { - return nil, err - } - if err := cmd.ConfigureBeaconChain(cliCtx); err != nil { - return nil, err - } - flags.ConfigureGlobalFlags(cliCtx) - if err := configureChainConfig(cliCtx); err != nil { - return nil, err - } - if err := configureHistoricalSlasher(cliCtx); err != nil { - return nil, err - } - err := configureBuilderCircuitBreaker(cliCtx) - if err != nil { - return nil, err - } - if err := configureSlotsPerArchivedPoint(cliCtx); err != nil { - return nil, err - } - if err := configureEth1Config(cliCtx); err != nil { - return nil, err - } - configureNetwork(cliCtx) - if err := configureInteropConfig(cliCtx); err != nil { - return nil, err - } - if err := configureExecutionSetting(cliCtx); err != nil { - return nil, err - } - configureFastSSZHashingAlgorithm() // Initializes any forks here. params.BeaconConfig().InitializeForkSchedule() registry := runtime.NewServiceRegistry() - ctx := cliCtx.Context + beacon := &BeaconNode{ cliCtx: cliCtx, ctx: ctx, @@ -191,10 +158,10 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco slasherBlockHeadersFeed: new(event.Feed), slasherAttestationsFeed: new(event.Feed), serviceFlagOpts: &serviceFlagOpts{}, + initialSyncComplete: make(chan struct{}), + syncChecker: &initialsync.SyncChecker{}, } - beacon.initialSyncComplete = make(chan struct{}) - beacon.syncChecker = &initialsync.SyncChecker{} for _, opt := range opts { if err := opt(beacon); err != nil { return nil, err @@ -203,8 +170,8 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco synchronizer := startup.NewClockSynchronizer() beacon.clockWaiter = synchronizer - beacon.forkChoicer = doublylinkedtree.New() + depositAddress, err := execution.DepositContractAddress() if err != nil { return nil, err @@ -220,112 +187,29 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco beacon.BlobStorage = blobs } - log.Debugln("Starting DB") - if err := beacon.startDB(cliCtx, depositAddress); err != nil { - return nil, err - } - beacon.BlobStorage.WarmCache() - - log.Debugln("Starting Slashing DB") - if err := beacon.startSlasherDB(cliCtx); err != nil { - return nil, err - } - - log.Debugln("Registering P2P Service") - if err := beacon.registerP2P(cliCtx); err != nil { - return nil, err - } - - bfs, err := backfill.NewUpdater(ctx, beacon.db) + bfs, err := startBaseServices(cliCtx, beacon, depositAddress) if err != nil { - return nil, errors.Wrap(err, "backfill status initialization error") - } - - log.Debugln("Starting State Gen") - if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil { - if errors.Is(err, stategen.ErrNoGenesisBlock) { - log.Errorf("No genesis block/state is found. Prysm only provides a mainnet genesis "+ - "state bundled in the application. You must provide the --%s or --%s flag to load "+ - "a genesis block/state for this network.", "genesis-state", "genesis-beacon-api-url") - } - return nil, err + return nil, errors.Wrap(err, "could not start modules") } beacon.verifyInitWaiter = verification.NewInitializerWaiter( beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen) pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) - beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter), - backfill.WithInitSyncWaiter(initSyncWaiter(ctx, beacon.initialSyncComplete))) + + beacon.BackfillOpts = append( + beacon.BackfillOpts, + backfill.WithVerifierWaiter(beacon.verifyInitWaiter), + backfill.WithInitSyncWaiter(initSyncWaiter(ctx, beacon.initialSyncComplete)), + ) + bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) if err != nil { return nil, errors.Wrap(err, "error initializing backfill service") } - if err := beacon.services.RegisterService(bf); err != nil { - return nil, errors.Wrap(err, "error registering backfill service") - } - log.Debugln("Registering POW Chain Service") - if err := beacon.registerPOWChainService(); err != nil { - return nil, err - } - - log.Debugln("Registering Attestation Pool Service") - if err := beacon.registerAttestationPool(); err != nil { - return nil, err - } - - log.Debugln("Registering Deterministic Genesis Service") - if err := beacon.registerDeterministicGenesisService(); err != nil { - return nil, err - } - - log.Debugln("Registering Blockchain Service") - if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer, beacon.initialSyncComplete); err != nil { - return nil, err - } - - log.Debugln("Registering Initial Sync Service") - if err := beacon.registerInitialSyncService(beacon.initialSyncComplete); err != nil { - return nil, err - } - - log.Debugln("Registering Sync Service") - if err := beacon.registerSyncService(beacon.initialSyncComplete, bfs); err != nil { - return nil, err - } - - log.Debugln("Registering Slasher Service") - if err := beacon.registerSlasherService(); err != nil { - return nil, err - } - - log.Debugln("Registering builder service") - if err := beacon.registerBuilderService(cliCtx); err != nil { - return nil, err - } - - log.Debugln("Registering RPC Service") - router := newRouter(cliCtx) - if err := beacon.registerRPCService(router); err != nil { - return nil, err - } - - log.Debugln("Registering GRPC Gateway Service") - if err := beacon.registerGRPCGateway(router); err != nil { - return nil, err - } - - log.Debugln("Registering Validator Monitoring Service") - if err := beacon.registerValidatorMonitorService(beacon.initialSyncComplete); err != nil { - return nil, err - } - - if !cliCtx.Bool(cmd.DisableMonitoringFlag.Name) { - log.Debugln("Registering Prometheus Service") - if err := beacon.registerPrometheusService(cliCtx); err != nil { - return nil, err - } + if err := registerServices(cliCtx, beacon, synchronizer, bf, bfs); err != nil { + return nil, errors.Wrap(err, "could not register services") } // db.DatabasePath is the path to the containing directory @@ -343,6 +227,170 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco return beacon, nil } + +func configureBeacon(cliCtx *cli.Context) error { + if err := configureTracing(cliCtx); err != nil { + return errors.Wrap(err, "could not configure tracing") + } + + prereqs.WarnIfPlatformNotSupported(cliCtx.Context) + + if hasNetworkFlag(cliCtx) && cliCtx.IsSet(cmd.ChainConfigFileFlag.Name) { + return fmt.Errorf("%s cannot be passed concurrently with network flag", cmd.ChainConfigFileFlag.Name) + } + + if err := features.ConfigureBeaconChain(cliCtx); err != nil { + return errors.Wrap(err, "could not configure beacon chain") + } + + if err := cmd.ConfigureBeaconChain(cliCtx); err != nil { + return errors.Wrap(err, "could not configure beacon chain") + } + + flags.ConfigureGlobalFlags(cliCtx) + + if err := configureChainConfig(cliCtx); err != nil { + return errors.Wrap(err, "could not configure chain config") + } + + if err := configureHistoricalSlasher(cliCtx); err != nil { + return errors.Wrap(err, "could not configure historical slasher") + } + + if err := configureBuilderCircuitBreaker(cliCtx); err != nil { + return errors.Wrap(err, "could not configure builder circuit breaker") + } + + if err := configureSlotsPerArchivedPoint(cliCtx); err != nil { + return errors.Wrap(err, "could not configure slots per archived point") + } + + if err := configureEth1Config(cliCtx); err != nil { + return errors.Wrap(err, "could not configure eth1 config") + } + + configureNetwork(cliCtx) + + if err := configureInteropConfig(cliCtx); err != nil { + return errors.Wrap(err, "could not configure interop config") + } + + if err := configureExecutionSetting(cliCtx); err != nil { + return errors.Wrap(err, "could not configure execution setting") + } + + configureFastSSZHashingAlgorithm() + + return nil +} + +func startBaseServices(cliCtx *cli.Context, beacon *BeaconNode, depositAddress string) (*backfill.Store, error) { + ctx := cliCtx.Context + log.Debugln("Starting DB") + if err := beacon.startDB(cliCtx, depositAddress); err != nil { + return nil, errors.Wrap(err, "could not start DB") + } + beacon.BlobStorage.WarmCache() + + log.Debugln("Starting Slashing DB") + if err := beacon.startSlasherDB(cliCtx); err != nil { + return nil, errors.Wrap(err, "could not start slashing DB") + } + + log.Debugln("Registering P2P Service") + if err := beacon.registerP2P(cliCtx); err != nil { + return nil, errors.Wrap(err, "could not register P2P service") + } + + bfs, err := backfill.NewUpdater(ctx, beacon.db) + if err != nil { + return nil, errors.Wrap(err, "could not create backfill updater") + } + + log.Debugln("Starting State Gen") + if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil { + if errors.Is(err, stategen.ErrNoGenesisBlock) { + log.Errorf("No genesis block/state is found. Prysm only provides a mainnet genesis "+ + "state bundled in the application. You must provide the --%s or --%s flag to load "+ + "a genesis block/state for this network.", "genesis-state", "genesis-beacon-api-url") + } + return nil, errors.Wrap(err, "could not start state generation") + } + + return bfs, nil +} + +func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *startup.ClockSynchronizer, bf *backfill.Service, bfs *backfill.Store) error { + if err := beacon.services.RegisterService(bf); err != nil { + return errors.Wrap(err, "could not register backfill service") + } + + log.Debugln("Registering POW Chain Service") + if err := beacon.registerPOWChainService(); err != nil { + return errors.Wrap(err, "could not register POW chain service") + } + + log.Debugln("Registering Attestation Pool Service") + if err := beacon.registerAttestationPool(); err != nil { + return errors.Wrap(err, "could not register attestation pool service") + } + + log.Debugln("Registering Deterministic Genesis Service") + if err := beacon.registerDeterministicGenesisService(); err != nil { + return errors.Wrap(err, "could not register deterministic genesis service") + } + + log.Debugln("Registering Blockchain Service") + if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer, beacon.initialSyncComplete); err != nil { + return errors.Wrap(err, "could not register blockchain service") + } + + log.Debugln("Registering Initial Sync Service") + if err := beacon.registerInitialSyncService(beacon.initialSyncComplete); err != nil { + return errors.Wrap(err, "could not register initial sync service") + } + + log.Debugln("Registering Sync Service") + if err := beacon.registerSyncService(beacon.initialSyncComplete, bfs); err != nil { + return errors.Wrap(err, "could not register sync service") + } + + log.Debugln("Registering Slasher Service") + if err := beacon.registerSlasherService(); err != nil { + return errors.Wrap(err, "could not register slasher service") + } + + log.Debugln("Registering builder service") + if err := beacon.registerBuilderService(cliCtx); err != nil { + return errors.Wrap(err, "could not register builder service") + } + + log.Debugln("Registering RPC Service") + router := newRouter(cliCtx) + if err := beacon.registerRPCService(router); err != nil { + return errors.Wrap(err, "could not register RPC service") + } + + log.Debugln("Registering GRPC Gateway Service") + if err := beacon.registerGRPCGateway(router); err != nil { + return errors.Wrap(err, "could not register GRPC gateway service") + } + + log.Debugln("Registering Validator Monitoring Service") + if err := beacon.registerValidatorMonitorService(beacon.initialSyncComplete); err != nil { + return errors.Wrap(err, "could not register validator monitoring service") + } + + if !cliCtx.Bool(cmd.DisableMonitoringFlag.Name) { + log.Debugln("Registering Prometheus Service") + if err := beacon.registerPrometheusService(cliCtx); err != nil { + return errors.Wrap(err, "could not register prometheus service") + } + } + + return nil +} + func initSyncWaiter(ctx context.Context, complete chan struct{}) func() error { return func() error { select { @@ -431,40 +479,86 @@ func (b *BeaconNode) Close() { close(b.stop) } +func (b *BeaconNode) clearDB(clearDB, forceClearDB bool, d *kv.Store, dbPath string) (*kv.Store, error) { + var err error + clearDBConfirmed := false + + if clearDB && !forceClearDB { + const ( + actionText = "This will delete your beacon chain database stored in your data directory. " + + "Your database backups will not be removed - do you want to proceed? (Y/N)" + + deniedText = "Database will not be deleted. No changes have been made." + ) + + clearDBConfirmed, err = cmd.ConfirmAction(actionText, deniedText) + if err != nil { + return nil, errors.Wrapf(err, "could not confirm action") + } + } + + if clearDBConfirmed || forceClearDB { + log.Warning("Removing database") + if err := d.ClearDB(); err != nil { + return nil, errors.Wrap(err, "could not clear database") + } + + if err := b.BlobStorage.Clear(); err != nil { + return nil, errors.Wrap(err, "could not clear blob storage") + } + + d, err = kv.NewKVStore(b.ctx, dbPath) + if err != nil { + return nil, errors.Wrap(err, "could not create new database") + } + } + + return d, nil +} + +func (b *BeaconNode) checkAndSaveDepositContract(depositAddress string) error { + knownContract, err := b.db.DepositContractAddress(b.ctx) + if err != nil { + return errors.Wrap(err, "could not get deposit contract address") + } + + addr := common.HexToAddress(depositAddress) + if len(knownContract) == 0 { + if err := b.db.SaveDepositContractAddress(b.ctx, addr); err != nil { + return errors.Wrap(err, "could not save deposit contract") + } + } + + if len(knownContract) > 0 && !bytes.Equal(addr.Bytes(), knownContract) { + return fmt.Errorf("database contract is %#x but tried to run with %#x. This likely means "+ + "you are trying to run on a different network than what the database contains. You can run once with "+ + "--%s to wipe the old database or use an alternative data directory with --%s", + knownContract, addr.Bytes(), cmd.ClearDB.Name, cmd.DataDirFlag.Name) + } + + return nil +} + func (b *BeaconNode) startDB(cliCtx *cli.Context, depositAddress string) error { + var depositCache cache.DepositCache + baseDir := cliCtx.String(cmd.DataDirFlag.Name) dbPath := filepath.Join(baseDir, kv.BeaconNodeDbDirName) - clearDB := cliCtx.Bool(cmd.ClearDB.Name) - forceClearDB := cliCtx.Bool(cmd.ForceClearDB.Name) + clearDBRequired := cliCtx.Bool(cmd.ClearDB.Name) + forceClearDBRequired := cliCtx.Bool(cmd.ForceClearDB.Name) log.WithField("databasePath", dbPath).Info("Checking DB") d, err := kv.NewKVStore(b.ctx, dbPath) if err != nil { - return err + return errors.Wrapf(err, "could not create database at %s", dbPath) } - clearDBConfirmed := false - if clearDB && !forceClearDB { - actionText := "This will delete your beacon chain database stored in your data directory. " + - "Your database backups will not be removed - do you want to proceed? (Y/N)" - deniedText := "Database will not be deleted. No changes have been made." - clearDBConfirmed, err = cmd.ConfirmAction(actionText, deniedText) + + if clearDBRequired || forceClearDBRequired { + d, err = b.clearDB(clearDBRequired, forceClearDBRequired, d, dbPath) if err != nil { - return err - } - } - if clearDBConfirmed || forceClearDB { - log.Warning("Removing database") - if err := d.ClearDB(); err != nil { return errors.Wrap(err, "could not clear database") } - if err := b.BlobStorage.Clear(); err != nil { - return errors.Wrap(err, "could not clear blob storage") - } - d, err = kv.NewKVStore(b.ctx, dbPath) - if err != nil { - return errors.Wrap(err, "could not create new database") - } } if err := d.RunMigrations(b.ctx); err != nil { @@ -473,7 +567,6 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context, depositAddress string) error { b.db = d - var depositCache cache.DepositCache if features.Get().EnableEIP4881 { depositCache, err = depositsnapshot.New() } else { @@ -488,16 +581,17 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context, depositAddress string) error { if b.GenesisInitializer != nil { if err := b.GenesisInitializer.Initialize(b.ctx, d); err != nil { if err == db.ErrExistingGenesisState { - return errors.New("Genesis state flag specified but a genesis state " + - "exists already. Run again with --clear-db and/or ensure you are using the " + - "appropriate testnet flag to load the given genesis state.") + return errors.Errorf("Genesis state flag specified but a genesis state "+ + "exists already. Run again with --%s and/or ensure you are using the "+ + "appropriate testnet flag to load the given genesis state.", cmd.ClearDB.Name) } + return errors.Wrap(err, "could not load genesis from file") } } if err := b.db.EnsureEmbeddedGenesis(b.ctx); err != nil { - return err + return errors.Wrap(err, "could not ensure embedded genesis") } if b.CheckpointInitializer != nil { @@ -506,23 +600,11 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context, depositAddress string) error { } } - knownContract, err := b.db.DepositContractAddress(b.ctx) - if err != nil { - return err + if err := b.checkAndSaveDepositContract(depositAddress); err != nil { + return errors.Wrap(err, "could not check and save deposit contract") } - addr := common.HexToAddress(depositAddress) - if len(knownContract) == 0 { - if err := b.db.SaveDepositContractAddress(b.ctx, addr); err != nil { - return errors.Wrap(err, "could not save deposit contract") - } - } - if len(knownContract) > 0 && !bytes.Equal(addr.Bytes(), knownContract) { - return fmt.Errorf("database contract is %#x but tried to run with %#x. This likely means "+ - "you are trying to run on a different network than what the database contains. You can run once with "+ - "'--clear-db' to wipe the old database or use an alternative data directory with '--datadir'", - knownContract, addr.Bytes()) - } - log.Infof("Deposit contract: %#x", addr.Bytes()) + + log.WithField("address", depositAddress).Info("Deposit contract") return nil } @@ -610,31 +692,31 @@ func (b *BeaconNode) startStateGen(ctx context.Context, bfs coverage.AvailableBl func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error { bootstrapNodeAddrs, dataDir, err := registration.P2PPreregistration(cliCtx) if err != nil { - return err + return errors.Wrapf(err, "could not register p2p service") } svc, err := p2p.NewService(b.ctx, &p2p.Config{ - NoDiscovery: cliCtx.Bool(cmd.NoDiscovery.Name), - StaticPeers: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.StaticPeers.Name)), - BootstrapNodeAddr: bootstrapNodeAddrs, - RelayNodeAddr: cliCtx.String(cmd.RelayNode.Name), - DataDir: dataDir, - LocalIP: cliCtx.String(cmd.P2PIP.Name), - HostAddress: cliCtx.String(cmd.P2PHost.Name), - HostDNS: cliCtx.String(cmd.P2PHostDNS.Name), - PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name), - StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name), - MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name), - TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), - UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), - MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name), - QueueSize: cliCtx.Uint(cmd.PubsubQueueSize.Name), - AllowListCIDR: cliCtx.String(cmd.P2PAllowList.Name), - DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)), - EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name), - StateNotifier: b, - DB: b.db, - ClockWaiter: b.clockWaiter, + NoDiscovery: cliCtx.Bool(cmd.NoDiscovery.Name), + StaticPeers: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.StaticPeers.Name)), + Discv5BootStrapAddrs: p2p.ParseBootStrapAddrs(bootstrapNodeAddrs), + RelayNodeAddr: cliCtx.String(cmd.RelayNode.Name), + DataDir: dataDir, + LocalIP: cliCtx.String(cmd.P2PIP.Name), + HostAddress: cliCtx.String(cmd.P2PHost.Name), + HostDNS: cliCtx.String(cmd.P2PHostDNS.Name), + PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name), + StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name), + MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name), + TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), + UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), + MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name), + QueueSize: cliCtx.Uint(cmd.PubsubQueueSize.Name), + AllowListCIDR: cliCtx.String(cmd.P2PAllowList.Name), + DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)), + EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name), + StateNotifier: b, + DB: b.db, + ClockWaiter: b.clockWaiter, }) if err != nil { return err @@ -976,11 +1058,13 @@ func (b *BeaconNode) registerGRPCGateway(router *mux.Router) error { if b.cliCtx.Bool(flags.DisableGRPCGateway.Name) { return nil } - gatewayPort := b.cliCtx.Int(flags.GRPCGatewayPort.Name) gatewayHost := b.cliCtx.String(flags.GRPCGatewayHost.Name) + gatewayPort := b.cliCtx.Int(flags.GRPCGatewayPort.Name) rpcHost := b.cliCtx.String(flags.RPCHost.Name) - selfAddress := fmt.Sprintf("%s:%d", rpcHost, b.cliCtx.Int(flags.RPCPort.Name)) - gatewayAddress := fmt.Sprintf("%s:%d", gatewayHost, gatewayPort) + rpcPort := b.cliCtx.Int(flags.RPCPort.Name) + + selfAddress := net.JoinHostPort(rpcHost, strconv.Itoa(rpcPort)) + gatewayAddress := net.JoinHostPort(gatewayHost, strconv.Itoa(gatewayPort)) allowedOrigins := strings.Split(b.cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",") enableDebugRPCEndpoints := b.cliCtx.Bool(flags.EnableDebugRPCEndpoints.Name) selfCert := b.cliCtx.String(flags.CertFlag.Name) @@ -1074,9 +1158,9 @@ func (b *BeaconNode) registerBuilderService(cliCtx *cli.Context) error { return err } - opts := append(b.serviceFlagOpts.builderOpts, - builder.WithHeadFetcher(chainService), - builder.WithDatabase(b.db)) + opts := b.serviceFlagOpts.builderOpts + opts = append(opts, builder.WithHeadFetcher(chainService), builder.WithDatabase(b.db)) + // make cache the default. if !cliCtx.Bool(features.DisableRegistrationCache.Name) { opts = append(opts, builder.WithRegistrationCache()) diff --git a/beacon-chain/node/registration/BUILD.bazel b/beacon-chain/node/registration/BUILD.bazel index 12f13bd68..95d1cadf2 100644 --- a/beacon-chain/node/registration/BUILD.bazel +++ b/beacon-chain/node/registration/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//cmd:go_default_library", "//config/params:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_urfave_cli_v2//:go_default_library", "@in_gopkg_yaml_v2//:go_default_library", diff --git a/beacon-chain/node/registration/p2p.go b/beacon-chain/node/registration/p2p.go index 73549f754..6675f2544 100644 --- a/beacon-chain/node/registration/p2p.go +++ b/beacon-chain/node/registration/p2p.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/cmd" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/urfave/cli/v2" @@ -31,9 +32,9 @@ func P2PPreregistration(cliCtx *cli.Context) (bootstrapNodeAddrs []string, dataD if dataDir == "" { dataDir = cmd.DefaultDataDir() if dataDir == "" { - log.Fatal( - "Could not determine your system's HOME path, please specify a --datadir you wish " + - "to use for your chain data", + err = errors.Errorf( + "Could not determine your system's HOME path, please specify a --%s you wish to use for your chain data", + cmd.DataDirFlag.Name, ) } } diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 5fef1b65c..aa5253314 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -240,9 +240,8 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { var hosts []host.Host // setup other nodes. cfg = &Config{ - BootstrapNodeAddr: []string{bootNode.String()}, - Discv5BootStrapAddr: []string{bootNode.String()}, - MaxPeers: 30, + Discv5BootStrapAddrs: []string{bootNode.String()}, + MaxPeers: 30, } // Setup 2 different hosts for i := 1; i <= 2; i++ { diff --git a/beacon-chain/p2p/config.go b/beacon-chain/p2p/config.go index 0b4c36bc1..ca5dbfa54 100644 --- a/beacon-chain/p2p/config.go +++ b/beacon-chain/p2p/config.go @@ -12,28 +12,27 @@ const defaultPubsubQueueSize = 600 // Config for the p2p service. These parameters are set from application level flags // to initialize the p2p service. type Config struct { - NoDiscovery bool - EnableUPnP bool - StaticPeerID bool - StaticPeers []string - BootstrapNodeAddr []string - Discv5BootStrapAddr []string - RelayNodeAddr string - LocalIP string - HostAddress string - HostDNS string - PrivateKey string - DataDir string - MetaDataDir string - TCPPort uint - UDPPort uint - MaxPeers uint - QueueSize uint - AllowListCIDR string - DenyListCIDR []string - StateNotifier statefeed.Notifier - DB db.ReadOnlyDatabase - ClockWaiter startup.ClockWaiter + NoDiscovery bool + EnableUPnP bool + StaticPeerID bool + StaticPeers []string + Discv5BootStrapAddrs []string + RelayNodeAddr string + LocalIP string + HostAddress string + HostDNS string + PrivateKey string + DataDir string + MetaDataDir string + TCPPort uint + UDPPort uint + MaxPeers uint + QueueSize uint + AllowListCIDR string + DenyListCIDR []string + StateNotifier statefeed.Notifier + DB db.ReadOnlyDatabase + ClockWaiter startup.ClockWaiter } // validateConfig validates whether the values provided are accurate and will set diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index c1cd0ea8b..a573bea81 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -25,7 +25,7 @@ const ( ) // InterceptPeerDial tests whether we're permitted to Dial the specified peer. -func (_ *Service) InterceptPeerDial(_ peer.ID) (allow bool) { +func (*Service) InterceptPeerDial(_ peer.ID) (allow bool) { return true } @@ -63,12 +63,12 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) { // InterceptSecured tests whether a given connection, now authenticated, // is allowed. -func (_ *Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) { +func (*Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) { return true } // InterceptUpgraded tests whether a fully capable connection is allowed. -func (_ *Service) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) { +func (*Service) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) { return true, 0 } diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 9b1615f87..e67d942c4 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -34,6 +34,11 @@ type Listener interface { LocalNode() *enode.LocalNode } +const ( + udp4 = iota + udp6 +) + // RefreshENR uses an epoch to refresh the enr entry for our node // with the tracked committee ids for the epoch, allowing our node // to be dynamically discoverable by others given our tracked committee ids. @@ -62,8 +67,14 @@ func (s *Service) RefreshENR() { // Compare current epoch with our fork epochs altairForkEpoch := params.BeaconConfig().AltairForkEpoch switch { - // Altair Behaviour - case currEpoch >= altairForkEpoch: + case currEpoch < altairForkEpoch: + // Phase 0 behaviour. + if bytes.Equal(bitV, currentBitV) { + // return early if bitfield hasn't changed + return + } + s.updateSubnetRecordWithMetadata(bitV) + default: // Retrieve sync subnets from application level // cache. bitS := bitfield.Bitvector4{byte(0x00)} @@ -82,13 +93,6 @@ func (s *Service) RefreshENR() { return } s.updateSubnetRecordWithMetadataV2(bitV, bitS) - default: - // Phase 0 behaviour. - if bytes.Equal(bitV, currentBitV) { - // return early if bitfield hasn't changed - return - } - s.updateSubnetRecordWithMetadata(bitV) } // ping all peers to inform them of new metadata s.pingPeers() @@ -140,9 +144,9 @@ func (s *Service) createListener( // by default we will listen to all interfaces. var bindIP net.IP switch udpVersionFromIP(ipAddr) { - case "udp4": + case udp4: bindIP = net.IPv4zero - case "udp6": + case udp6: bindIP = net.IPv6zero default: return nil, errors.New("invalid ip provided") @@ -160,6 +164,7 @@ func (s *Service) createListener( IP: bindIP, Port: int(s.cfg.UDPPort), } + // Listen to all network interfaces // for both ip protocols. networkVersion := "udp" @@ -177,44 +182,27 @@ func (s *Service) createListener( if err != nil { return nil, errors.Wrap(err, "could not create local node") } - if s.cfg.HostAddress != "" { - hostIP := net.ParseIP(s.cfg.HostAddress) - if hostIP.To4() == nil && hostIP.To16() == nil { - log.Errorf("Invalid host address given: %s", hostIP.String()) - } else { - localNode.SetFallbackIP(hostIP) - localNode.SetStaticIP(hostIP) - } - } - if s.cfg.HostDNS != "" { - host := s.cfg.HostDNS - ips, err := net.LookupIP(host) - if err != nil { - return nil, errors.Wrap(err, "could not resolve host address") - } - if len(ips) > 0 { - // Use first IP returned from the - // resolver. - firstIP := ips[0] - localNode.SetFallbackIP(firstIP) - } - } - dv5Cfg := discover.Config{ - PrivateKey: privKey, - } - dv5Cfg.Bootnodes = []*enode.Node{} - for _, addr := range s.cfg.Discv5BootStrapAddr { + + bootNodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddrs)) + for _, addr := range s.cfg.Discv5BootStrapAddrs { bootNode, err := enode.Parse(enode.ValidSchemes, addr) if err != nil { return nil, errors.Wrap(err, "could not bootstrap addr") } - dv5Cfg.Bootnodes = append(dv5Cfg.Bootnodes, bootNode) + + bootNodes = append(bootNodes, bootNode) + } + + dv5Cfg := discover.Config{ + PrivateKey: privKey, + Bootnodes: bootNodes, } listener, err := discover.ListenV5(conn, localNode, dv5Cfg) if err != nil { return nil, errors.Wrap(err, "could not listen to discV5") } + return listener, nil } @@ -242,8 +230,35 @@ func (s *Service) createLocalNode( if err != nil { return nil, errors.Wrap(err, "could not add eth2 fork version entry to enr") } + localNode = initializeAttSubnets(localNode) - return initializeSyncCommSubnets(localNode), nil + localNode = initializeSyncCommSubnets(localNode) + + if s.cfg != nil && s.cfg.HostAddress != "" { + hostIP := net.ParseIP(s.cfg.HostAddress) + if hostIP.To4() == nil && hostIP.To16() == nil { + return nil, errors.Errorf("invalid host address: %s", s.cfg.HostAddress) + } else { + localNode.SetFallbackIP(hostIP) + localNode.SetStaticIP(hostIP) + } + } + + if s.cfg != nil && s.cfg.HostDNS != "" { + host := s.cfg.HostDNS + ips, err := net.LookupIP(host) + if err != nil { + return nil, errors.Wrapf(err, "could not resolve host address: %s", host) + } + if len(ips) > 0 { + // Use first IP returned from the + // resolver. + firstIP := ips[0] + localNode.SetFallbackIP(firstIP) + } + } + + return localNode, nil } func (s *Service) startDiscoveryV5( @@ -363,7 +378,7 @@ func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { return allAddrs, nil } -func parseBootStrapAddrs(addrs []string) (discv5Nodes []string) { +func ParseBootStrapAddrs(addrs []string) (discv5Nodes []string) { discv5Nodes, _ = parseGenericAddrs(addrs) if len(discv5Nodes) == 0 { log.Warn("No bootstrap addresses supplied") @@ -483,9 +498,9 @@ func multiAddrFromString(address string) (ma.Multiaddr, error) { return ma.NewMultiaddr(address) } -func udpVersionFromIP(ipAddr net.IP) string { +func udpVersionFromIP(ipAddr net.IP) int { if ipAddr.To4() != nil { - return "udp4" + return udp4 } - return "udp6" + return udp6 } diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index e8f99d63e..31d8a23e5 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -42,10 +42,6 @@ import ( var discoveryWaitTime = 1 * time.Second -func init() { - rand.Seed(time.Now().Unix()) -} - func createAddrAndPrivKey(t *testing.T) (net.IP, *ecdsa.PrivateKey) { ip, err := prysmNetwork.ExternalIPv4() require.NoError(t, err, "Could not get ip") @@ -103,8 +99,8 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) { for i := 1; i <= 5; i++ { port = 3000 + i cfg := &Config{ - Discv5BootStrapAddr: []string{bootNode.String()}, - UDPPort: uint(port), + Discv5BootStrapAddrs: []string{bootNode.String()}, + UDPPort: uint(port), } ipAddr, pkey := createAddrAndPrivKey(t) s = &Service{ @@ -134,6 +130,106 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) { } } +func TestCreateLocalNode(t *testing.T) { + testCases := []struct { + name string + cfg *Config + expectedError bool + }{ + { + name: "valid config", + cfg: nil, + expectedError: false, + }, + { + name: "invalid host address", + cfg: &Config{HostAddress: "invalid"}, + expectedError: true, + }, + { + name: "valid host address", + cfg: &Config{HostAddress: "192.168.0.1"}, + expectedError: false, + }, + { + name: "invalid host DNS", + cfg: &Config{HostDNS: "invalid"}, + expectedError: true, + }, + { + name: "valid host DNS", + cfg: &Config{HostDNS: "www.google.com"}, + expectedError: false, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // Define ports. + const ( + udpPort = 2000 + tcpPort = 3000 + ) + + // Create a private key. + address, privKey := createAddrAndPrivKey(t) + + // Create a service. + service := &Service{ + genesisTime: time.Now(), + genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + cfg: tt.cfg, + } + + localNode, err := service.createLocalNode(privKey, address, udpPort, tcpPort) + if tt.expectedError { + require.NotNil(t, err) + return + } + + require.NoError(t, err) + + expectedAddress := address + if tt.cfg != nil && tt.cfg.HostAddress != "" { + expectedAddress = net.ParseIP(tt.cfg.HostAddress) + } + + // Check IP. + // IP is not checked int case of DNS, since it can be resolved to different IPs. + if tt.cfg == nil || tt.cfg.HostDNS == "" { + ip := new(net.IP) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry("ip", ip))) + require.Equal(t, true, ip.Equal(expectedAddress)) + require.Equal(t, true, localNode.Node().IP().Equal(expectedAddress)) + } + + // Check UDP. + udp := new(uint16) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry("udp", udp))) + require.Equal(t, udpPort, localNode.Node().UDP()) + + // Check TCP. + tcp := new(uint16) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry("tcp", tcp))) + require.Equal(t, tcpPort, localNode.Node().TCP()) + + // Check fork is set. + fork := new([]byte) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(eth2ENRKey, fork))) + require.NotEmpty(t, *fork) + + // Check att subnets. + attSubnets := new([]byte) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(attSubnetEnrKey, attSubnets))) + require.DeepSSZEqual(t, []byte{0, 0, 0, 0, 0, 0, 0, 0}, *attSubnets) + + // Check sync committees subnets. + syncSubnets := new([]byte) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(syncCommsSubnetEnrKey, syncSubnets))) + require.DeepSSZEqual(t, []byte{0}, *syncSubnets) + }) + } +} + func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) { addr := net.ParseIP("invalidIP") _, pkey := createAddrAndPrivKey(t) @@ -310,12 +406,12 @@ func TestMultipleDiscoveryAddresses(t *testing.T) { } func TestCorrectUDPVersion(t *testing.T) { - assert.Equal(t, "udp4", udpVersionFromIP(net.IPv4zero), "incorrect network version") - assert.Equal(t, "udp6", udpVersionFromIP(net.IPv6zero), "incorrect network version") - assert.Equal(t, "udp4", udpVersionFromIP(net.IP{200, 20, 12, 255}), "incorrect network version") - assert.Equal(t, "udp6", udpVersionFromIP(net.IP{22, 23, 24, 251, 17, 18, 0, 0, 0, 0, 12, 14, 212, 213, 16, 22}), "incorrect network version") + assert.Equal(t, udp4, udpVersionFromIP(net.IPv4zero), "incorrect network version") + assert.Equal(t, udp6, udpVersionFromIP(net.IPv6zero), "incorrect network version") + assert.Equal(t, udp4, udpVersionFromIP(net.IP{200, 20, 12, 255}), "incorrect network version") + assert.Equal(t, udp6, udpVersionFromIP(net.IP{22, 23, 24, 251, 17, 18, 0, 0, 0, 0, 12, 14, 212, 213, 16, 22}), "incorrect network version") // v4 in v6 - assert.Equal(t, "udp4", udpVersionFromIP(net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 212, 213, 16, 22}), "incorrect network version") + assert.Equal(t, udp4, udpVersionFromIP(net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 212, 213, 16, 22}), "incorrect network version") } // addPeer is a helper to add a peer with a given connection state) diff --git a/beacon-chain/p2p/fork_test.go b/beacon-chain/p2p/fork_test.go index 32307eb05..86541c894 100644 --- a/beacon-chain/p2p/fork_test.go +++ b/beacon-chain/p2p/fork_test.go @@ -46,9 +46,9 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { bootNode := bootListener.Self() cfg := &Config{ - Discv5BootStrapAddr: []string{bootNode.String()}, - UDPPort: uint(port), - StateNotifier: &mock.MockStateNotifier{}, + Discv5BootStrapAddrs: []string{bootNode.String()}, + UDPPort: uint(port), + StateNotifier: &mock.MockStateNotifier{}, } var listeners []*discover.UDPv5 @@ -132,8 +132,8 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { bootNode := bootListener.Self() cfg := &Config{ - Discv5BootStrapAddr: []string{bootNode.String()}, - UDPPort: uint(port), + Discv5BootStrapAddrs: []string{bootNode.String()}, + UDPPort: uint(port), } var listeners []*discover.UDPv5 diff --git a/beacon-chain/p2p/info.go b/beacon-chain/p2p/info.go index 4ccc7e3dd..4659835f9 100644 --- a/beacon-chain/p2p/info.go +++ b/beacon-chain/p2p/info.go @@ -20,7 +20,7 @@ self=%s %d peers %v `, - s.cfg.BootstrapNodeAddr, + s.cfg.Discv5BootStrapAddrs, s.selfAddresses(), len(s.host.Network().Peers()), formatPeers(s.host), // Must be last. Writes one entry per row. diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index d75468491..bf5322137 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -31,29 +31,31 @@ func MultiAddressBuilder(ipAddr string, port uint) (ma.Multiaddr, error) { } // buildOptions for the libp2p host. -func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Option { +func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Option, error) { cfg := s.cfg listen, err := MultiAddressBuilder(ip.String(), cfg.TCPPort) if err != nil { - log.WithError(err).Fatal("Failed to p2p listen") + return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", ip.String(), cfg.TCPPort) } if cfg.LocalIP != "" { if net.ParseIP(cfg.LocalIP) == nil { - log.Fatalf("Invalid local ip provided: %s", cfg.LocalIP) + return nil, errors.Wrapf(err, "invalid local ip provided: %s:%d", cfg.LocalIP, cfg.TCPPort) } + listen, err = MultiAddressBuilder(cfg.LocalIP, cfg.TCPPort) if err != nil { - log.WithError(err).Fatal("Failed to p2p listen") + return nil, errors.Wrapf(err, "cannot produce multiaddr format from %s:%d", cfg.LocalIP, cfg.TCPPort) } } ifaceKey, err := ecdsaprysm.ConvertToInterfacePrivkey(priKey) if err != nil { - log.WithError(err).Fatal("Failed to retrieve private key") + return nil, errors.Wrap(err, "cannot convert private key to interface private key. (Private key not displayed in logs for security reasons)") } id, err := peer.IDFromPublicKey(ifaceKey.GetPublic()) if err != nil { - log.WithError(err).Fatal("Failed to retrieve peer id") + return nil, errors.Wrapf(err, "cannot get ID from public key: %s", ifaceKey.GetPublic().Type().String()) } + log.Infof("Running node with peer id of %s ", id.String()) options := []libp2p.Option{ @@ -64,10 +66,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt libp2p.Transport(tcp.NewTCPTransport), libp2p.DefaultMuxers, libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), + libp2p.Security(noise.ID, noise.New), + libp2p.Ping(false), // Disable Ping Service. } - options = append(options, libp2p.Security(noise.ID, noise.New)) - if cfg.EnableUPnP { options = append(options, libp2p.NATPortMap()) // Allow to use UPnP } @@ -99,12 +101,11 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt return addrs })) } - // Disable Ping Service. - options = append(options, libp2p.Ping(false)) + if features.Get().DisableResourceManager { options = append(options, libp2p.ResourceManager(&network.NullResourceManager{})) } - return options + return options, nil } func multiAddressBuilderWithID(ipAddr, protocol string, port uint, id peer.ID) (ma.Multiaddr, error) { diff --git a/beacon-chain/p2p/options_test.go b/beacon-chain/p2p/options_test.go index 27e203d4a..07b80e2b6 100644 --- a/beacon-chain/p2p/options_test.go +++ b/beacon-chain/p2p/options_test.go @@ -119,7 +119,9 @@ func TestDefaultMultiplexers(t *testing.T) { svc.privKey, err = privKey(svc.cfg) assert.NoError(t, err) ipAddr := network.IPAddr() - opts := svc.buildOptions(ipAddr, svc.privKey) + opts, err := svc.buildOptions(ipAddr, svc.privKey) + assert.NoError(t, err) + err = cfg.Apply(append(opts, libp2p.FallbackDefaults)...) assert.NoError(t, err) diff --git a/beacon-chain/p2p/peers/peerdata/store_test.go b/beacon-chain/p2p/peers/peerdata/store_test.go index 037b6421b..935e9a50e 100644 --- a/beacon-chain/p2p/peers/peerdata/store_test.go +++ b/beacon-chain/p2p/peers/peerdata/store_test.go @@ -107,5 +107,4 @@ func TestStore_TrustedPeers(t *testing.T) { assert.Equal(t, false, store.IsTrustedPeer(pid1)) assert.Equal(t, false, store.IsTrustedPeer(pid2)) assert.Equal(t, false, store.IsTrustedPeer(pid3)) - } diff --git a/beacon-chain/p2p/peers/scorers/bad_responses.go b/beacon-chain/p2p/peers/scorers/bad_responses.go index 56fa54648..73d74ecfc 100644 --- a/beacon-chain/p2p/peers/scorers/bad_responses.go +++ b/beacon-chain/p2p/peers/scorers/bad_responses.go @@ -56,12 +56,12 @@ func newBadResponsesScorer(store *peerdata.Store, config *BadResponsesScorerConf func (s *BadResponsesScorer) Score(pid peer.ID) float64 { s.store.RLock() defer s.store.RUnlock() - return s.score(pid) + return s.scoreNoLock(pid) } -// score is a lock-free version of Score. -func (s *BadResponsesScorer) score(pid peer.ID) float64 { - if s.isBadPeer(pid) { +// scoreNoLock is a lock-free version of Score. +func (s *BadResponsesScorer) scoreNoLock(pid peer.ID) float64 { + if s.isBadPeerNoLock(pid) { return BadPeerScore } score := float64(0) @@ -87,11 +87,11 @@ func (s *BadResponsesScorer) Params() *BadResponsesScorerConfig { func (s *BadResponsesScorer) Count(pid peer.ID) (int, error) { s.store.RLock() defer s.store.RUnlock() - return s.count(pid) + return s.countNoLock(pid) } -// count is a lock-free version of Count. -func (s *BadResponsesScorer) count(pid peer.ID) (int, error) { +// countNoLock is a lock-free version of Count. +func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) { if peerData, ok := s.store.PeerData(pid); ok { return peerData.BadResponses, nil } @@ -119,11 +119,11 @@ func (s *BadResponsesScorer) Increment(pid peer.ID) { func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) bool { s.store.RLock() defer s.store.RUnlock() - return s.isBadPeer(pid) + return s.isBadPeerNoLock(pid) } -// isBadPeer is lock-free version of IsBadPeer. -func (s *BadResponsesScorer) isBadPeer(pid peer.ID) bool { +// isBadPeerNoLock is lock-free version of IsBadPeer. +func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) bool { if peerData, ok := s.store.PeerData(pid); ok { return peerData.BadResponses >= s.config.Threshold } @@ -137,7 +137,7 @@ func (s *BadResponsesScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeer(pid) { + if s.isBadPeerNoLock(pid) { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/bad_responses_test.go b/beacon-chain/p2p/peers/scorers/bad_responses_test.go index c8ff21a67..186a50f55 100644 --- a/beacon-chain/p2p/peers/scorers/bad_responses_test.go +++ b/beacon-chain/p2p/peers/scorers/bad_responses_test.go @@ -15,6 +15,8 @@ import ( ) func TestScorers_BadResponses_Score(t *testing.T) { + const pid = "peer1" + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -28,15 +30,23 @@ func TestScorers_BadResponses_Score(t *testing.T) { }) scorer := peerStatuses.Scorers().BadResponsesScorer() - assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score for unregistered peer") - scorer.Increment("peer1") - assert.Equal(t, -2.5, scorer.Score("peer1")) - scorer.Increment("peer1") - assert.Equal(t, float64(-5), scorer.Score("peer1")) - scorer.Increment("peer1") - scorer.Increment("peer1") - assert.Equal(t, -100.0, scorer.Score("peer1")) - assert.Equal(t, true, scorer.IsBadPeer("peer1")) + assert.Equal(t, 0., scorer.Score(pid), "Unexpected score for unregistered peer") + + scorer.Increment(pid) + assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.Equal(t, -2.5, scorer.Score(pid)) + + scorer.Increment(pid) + assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.Equal(t, float64(-5), scorer.Score(pid)) + + scorer.Increment(pid) + assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.Equal(t, float64(-7.5), scorer.Score(pid)) + + scorer.Increment(pid) + assert.Equal(t, true, scorer.IsBadPeer(pid)) + assert.Equal(t, -100.0, scorer.Score(pid)) } func TestScorers_BadResponses_ParamsThreshold(t *testing.T) { diff --git a/beacon-chain/p2p/peers/scorers/block_providers.go b/beacon-chain/p2p/peers/scorers/block_providers.go index 03b3ff2bf..649ff5700 100644 --- a/beacon-chain/p2p/peers/scorers/block_providers.go +++ b/beacon-chain/p2p/peers/scorers/block_providers.go @@ -98,11 +98,11 @@ func newBlockProviderScorer(store *peerdata.Store, config *BlockProviderScorerCo func (s *BlockProviderScorer) Score(pid peer.ID) float64 { s.store.RLock() defer s.store.RUnlock() - return s.score(pid) + return s.scoreNoLock(pid) } -// score is a lock-free version of Score. -func (s *BlockProviderScorer) score(pid peer.ID) float64 { +// scoreNoLock is a lock-free version of Score. +func (s *BlockProviderScorer) scoreNoLock(pid peer.ID) float64 { score := float64(0) peerData, ok := s.store.PeerData(pid) // Boost score of new peers or peers that haven't been accessed for too long. @@ -126,7 +126,7 @@ func (s *BlockProviderScorer) Params() *BlockProviderScorerConfig { func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64) { s.store.Lock() defer s.store.Unlock() - defer s.touch(pid) + defer s.touchNoLock(pid) if cnt <= 0 { return @@ -145,11 +145,11 @@ func (s *BlockProviderScorer) IncrementProcessedBlocks(pid peer.ID, cnt uint64) func (s *BlockProviderScorer) Touch(pid peer.ID, t ...time.Time) { s.store.Lock() defer s.store.Unlock() - s.touch(pid, t...) + s.touchNoLock(pid, t...) } -// touch is a lock-free version of Touch. -func (s *BlockProviderScorer) touch(pid peer.ID, t ...time.Time) { +// touchNoLock is a lock-free version of Touch. +func (s *BlockProviderScorer) touchNoLock(pid peer.ID, t ...time.Time) { peerData := s.store.PeerDataGetOrCreate(pid) if len(t) == 1 { peerData.BlockProviderUpdated = t[0] @@ -162,11 +162,11 @@ func (s *BlockProviderScorer) touch(pid peer.ID, t ...time.Time) { func (s *BlockProviderScorer) ProcessedBlocks(pid peer.ID) uint64 { s.store.RLock() defer s.store.RUnlock() - return s.processedBlocks(pid) + return s.processedBlocksNoLock(pid) } -// processedBlocks is a lock-free version of ProcessedBlocks. -func (s *BlockProviderScorer) processedBlocks(pid peer.ID) uint64 { +// processedBlocksNoLock is a lock-free version of ProcessedBlocks. +func (s *BlockProviderScorer) processedBlocksNoLock(pid peer.ID) uint64 { if peerData, ok := s.store.PeerData(pid); ok { return peerData.ProcessedBlocks } @@ -177,13 +177,13 @@ func (s *BlockProviderScorer) processedBlocks(pid peer.ID) uint64 { // Block provider scorer cannot guarantee that lower score of a peer is indeed a sign of a bad peer. // Therefore this scorer never marks peers as bad, and relies on scores to probabilistically sort // out low-scorers (see WeightSorted method). -func (_ *BlockProviderScorer) IsBadPeer(_ peer.ID) bool { +func (*BlockProviderScorer) IsBadPeer(_ peer.ID) bool { return false } // BadPeers returns the peers that are considered bad. // No peers are considered bad by block providers scorer. -func (_ *BlockProviderScorer) BadPeers() []peer.ID { +func (*BlockProviderScorer) BadPeers() []peer.ID { return []peer.ID{} } @@ -277,9 +277,9 @@ func (s *BlockProviderScorer) mapScoresAndPeers( peers := make([]peer.ID, len(pids)) for i, pid := range pids { if scoreFn != nil { - scores[pid] = scoreFn(pid, s.score(pid)) + scores[pid] = scoreFn(pid, s.scoreNoLock(pid)) } else { - scores[pid] = s.score(pid) + scores[pid] = s.scoreNoLock(pid) } peers[i] = pid } @@ -293,9 +293,9 @@ func (s *BlockProviderScorer) FormatScorePretty(pid peer.ID) string { if !features.Get().EnablePeerScorer { return "disabled" } - score := s.score(pid) + score := s.scoreNoLock(pid) return fmt.Sprintf("[%0.1f%%, raw: %0.2f, blocks: %d/%d]", - (score/s.MaxScore())*100, score, s.processedBlocks(pid), s.config.ProcessedBlocksCap) + (score/s.MaxScore())*100, score, s.processedBlocksNoLock(pid), s.config.ProcessedBlocksCap) } // MaxScore exposes maximum score attainable by peers. diff --git a/beacon-chain/p2p/peers/scorers/gossip_scorer.go b/beacon-chain/p2p/peers/scorers/gossip_scorer.go index 5b557c3a2..5482ebde7 100644 --- a/beacon-chain/p2p/peers/scorers/gossip_scorer.go +++ b/beacon-chain/p2p/peers/scorers/gossip_scorer.go @@ -38,11 +38,11 @@ func newGossipScorer(store *peerdata.Store, config *GossipScorerConfig) *GossipS func (s *GossipScorer) Score(pid peer.ID) float64 { s.store.RLock() defer s.store.RUnlock() - return s.score(pid) + return s.scoreNoLock(pid) } -// score is a lock-free version of Score. -func (s *GossipScorer) score(pid peer.ID) float64 { +// scoreNoLock is a lock-free version of Score. +func (s *GossipScorer) scoreNoLock(pid peer.ID) float64 { peerData, ok := s.store.PeerData(pid) if !ok { return 0 @@ -54,11 +54,11 @@ func (s *GossipScorer) score(pid peer.ID) float64 { func (s *GossipScorer) IsBadPeer(pid peer.ID) bool { s.store.RLock() defer s.store.RUnlock() - return s.isBadPeer(pid) + return s.isBadPeerNoLock(pid) } -// isBadPeer is lock-free version of IsBadPeer. -func (s *GossipScorer) isBadPeer(pid peer.ID) bool { +// isBadPeerNoLock is lock-free version of IsBadPeer. +func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) bool { peerData, ok := s.store.PeerData(pid) if !ok { return false @@ -73,7 +73,7 @@ func (s *GossipScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeer(pid) { + if s.isBadPeerNoLock(pid) { badPeers = append(badPeers, pid) } } @@ -98,11 +98,11 @@ func (s *GossipScorer) SetGossipData(pid peer.ID, gScore float64, func (s *GossipScorer) GossipData(pid peer.ID) (float64, float64, map[string]*pbrpc.TopicScoreSnapshot, error) { s.store.RLock() defer s.store.RUnlock() - return s.gossipData(pid) + return s.gossipDataNoLock(pid) } -// gossipData lock-free version of GossipData. -func (s *GossipScorer) gossipData(pid peer.ID) (float64, float64, map[string]*pbrpc.TopicScoreSnapshot, error) { +// gossipDataNoLock lock-free version of GossipData. +func (s *GossipScorer) gossipDataNoLock(pid peer.ID) (float64, float64, map[string]*pbrpc.TopicScoreSnapshot, error) { if peerData, ok := s.store.PeerData(pid); ok { return peerData.GossipScore, peerData.BehaviourPenalty, peerData.TopicScores, nil } diff --git a/beacon-chain/p2p/peers/scorers/peer_status.go b/beacon-chain/p2p/peers/scorers/peer_status.go index 8468b2798..5153c0c78 100644 --- a/beacon-chain/p2p/peers/scorers/peer_status.go +++ b/beacon-chain/p2p/peers/scorers/peer_status.go @@ -41,12 +41,12 @@ func newPeerStatusScorer(store *peerdata.Store, config *PeerStatusScorerConfig) func (s *PeerStatusScorer) Score(pid peer.ID) float64 { s.store.RLock() defer s.store.RUnlock() - return s.score(pid) + return s.scoreNoLock(pid) } -// score is a lock-free version of Score. -func (s *PeerStatusScorer) score(pid peer.ID) float64 { - if s.isBadPeer(pid) { +// scoreNoLock is a lock-free version of Score. +func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 { + if s.isBadPeerNoLock(pid) { return BadPeerScore } score := float64(0) @@ -70,11 +70,11 @@ func (s *PeerStatusScorer) score(pid peer.ID) float64 { func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) bool { s.store.RLock() defer s.store.RUnlock() - return s.isBadPeer(pid) + return s.isBadPeerNoLock(pid) } -// isBadPeer is lock-free version of IsBadPeer. -func (s *PeerStatusScorer) isBadPeer(pid peer.ID) bool { +// isBadPeerNoLock is lock-free version of IsBadPeer. +func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) bool { peerData, ok := s.store.PeerData(pid) if !ok { return false @@ -100,7 +100,7 @@ func (s *PeerStatusScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeer(pid) { + if s.isBadPeerNoLock(pid) { badPeers = append(badPeers, pid) } } @@ -129,11 +129,11 @@ func (s *PeerStatusScorer) SetPeerStatus(pid peer.ID, chainState *pb.Status, val func (s *PeerStatusScorer) PeerStatus(pid peer.ID) (*pb.Status, error) { s.store.RLock() defer s.store.RUnlock() - return s.peerStatus(pid) + return s.peerStatusNoLock(pid) } -// peerStatus lock-free version of PeerStatus. -func (s *PeerStatusScorer) peerStatus(pid peer.ID) (*pb.Status, error) { +// peerStatusNoLock lock-free version of PeerStatus. +func (s *PeerStatusScorer) peerStatusNoLock(pid peer.ID) (*pb.Status, error) { if peerData, ok := s.store.PeerData(pid); ok { if peerData.ChainState == nil { return nil, peerdata.ErrNoPeerStatus diff --git a/beacon-chain/p2p/peers/scorers/service.go b/beacon-chain/p2p/peers/scorers/service.go index 765a9f1cd..4ae91fc49 100644 --- a/beacon-chain/p2p/peers/scorers/service.go +++ b/beacon-chain/p2p/peers/scorers/service.go @@ -116,10 +116,10 @@ func (s *Service) ScoreNoLock(pid peer.ID) float64 { if _, ok := s.store.PeerData(pid); !ok { return 0 } - score += s.scorers.badResponsesScorer.score(pid) * s.scorerWeight(s.scorers.badResponsesScorer) - score += s.scorers.blockProviderScorer.score(pid) * s.scorerWeight(s.scorers.blockProviderScorer) - score += s.scorers.peerStatusScorer.score(pid) * s.scorerWeight(s.scorers.peerStatusScorer) - score += s.scorers.gossipScorer.score(pid) * s.scorerWeight(s.scorers.gossipScorer) + score += s.scorers.badResponsesScorer.scoreNoLock(pid) * s.scorerWeight(s.scorers.badResponsesScorer) + score += s.scorers.blockProviderScorer.scoreNoLock(pid) * s.scorerWeight(s.scorers.blockProviderScorer) + score += s.scorers.peerStatusScorer.scoreNoLock(pid) * s.scorerWeight(s.scorers.peerStatusScorer) + score += s.scorers.gossipScorer.scoreNoLock(pid) * s.scorerWeight(s.scorers.gossipScorer) return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor } @@ -132,14 +132,14 @@ func (s *Service) IsBadPeer(pid peer.ID) bool { // IsBadPeerNoLock is a lock-free version of IsBadPeer. func (s *Service) IsBadPeerNoLock(pid peer.ID) bool { - if s.scorers.badResponsesScorer.isBadPeer(pid) { + if s.scorers.badResponsesScorer.isBadPeerNoLock(pid) { return true } - if s.scorers.peerStatusScorer.isBadPeer(pid) { + if s.scorers.peerStatusScorer.isBadPeerNoLock(pid) { return true } if features.Get().EnablePeerScorer { - if s.scorers.gossipScorer.isBadPeer(pid) { + if s.scorers.gossipScorer.isBadPeerNoLock(pid) { return true } } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index ea5cd10ee..a70ede6db 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -82,44 +82,49 @@ type Service struct { // NewService initializes a new p2p service compatible with shared.Service interface. No // connections are made until the Start function is called during the service registry startup. func NewService(ctx context.Context, cfg *Config) (*Service, error) { - var err error ctx, cancel := context.WithCancel(ctx) _ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop(). + cfg = validateConfig(cfg) + privKey, err := privKey(cfg) + if err != nil { + return nil, errors.Wrapf(err, "failed to generate p2p private key") + } + + metaData, err := metaDataFromConfig(cfg) + if err != nil { + log.WithError(err).Error("Failed to create peer metadata") + return nil, err + } + + addrFilter, err := configureFilter(cfg) + if err != nil { + log.WithError(err).Error("Failed to create address filter") + return nil, err + } + + ipLimiter := leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */) + s := &Service{ ctx: ctx, cancel: cancel, cfg: cfg, + addrFilter: addrFilter, + ipLimiter: ipLimiter, + privKey: privKey, + metaData: metaData, isPreGenesis: true, joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)), subnetsLock: make(map[uint64]*sync.RWMutex), } - s.cfg = validateConfig(s.cfg) - - dv5Nodes := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr) - - cfg.Discv5BootStrapAddr = dv5Nodes - ipAddr := prysmnetwork.IPAddr() - s.privKey, err = privKey(s.cfg) - if err != nil { - log.WithError(err).Error("Failed to generate p2p private key") - return nil, err - } - s.metaData, err = metaDataFromConfig(s.cfg) - if err != nil { - log.WithError(err).Error("Failed to create peer metadata") - return nil, err - } - s.addrFilter, err = configureFilter(s.cfg) - if err != nil { - log.WithError(err).Error("Failed to create address filter") - return nil, err - } - s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */) - opts := s.buildOptions(ipAddr, s.privKey) + opts, err := s.buildOptions(ipAddr, s.privKey) + if err != nil { + return nil, errors.Wrapf(err, "failed to build p2p options") + } + h, err := libp2p.New(opts...) if err != nil { log.WithError(err).Error("Failed to create p2p host") @@ -285,7 +290,7 @@ func (s *Service) Started() bool { } // Encoding returns the configured networking encoding. -func (_ *Service) Encoding() encoder.NetworkEncoding { +func (*Service) Encoding() encoder.NetworkEncoding { return &encoder.SszNetworkEncoder{} } @@ -451,8 +456,8 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error } func (s *Service) connectToBootnodes() error { - nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr)) - for _, addr := range s.cfg.Discv5BootStrapAddr { + nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddrs)) + for _, addr := range s.cfg.Discv5BootStrapAddrs { bootNode, err := enode.Parse(enode.ValidSchemes, addr) if err != nil { return err diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index 0e3dcf974..7eabd44a3 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -213,10 +213,9 @@ func TestListenForNewNodes(t *testing.T) { // setup other nodes. cs := startup.NewClockSynchronizer() cfg = &Config{ - BootstrapNodeAddr: []string{bootNode.String()}, - Discv5BootStrapAddr: []string{bootNode.String()}, - MaxPeers: 30, - ClockWaiter: cs, + Discv5BootStrapAddrs: []string{bootNode.String()}, + MaxPeers: 30, + ClockWaiter: cs, } for i := 1; i <= 5; i++ { h, pkey, ipAddr := createHost(t, port+i) diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index ccfa18d34..d5fc81168 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -57,10 +57,9 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { for i := 1; i <= 3; i++ { port = 3000 + i cfg := &Config{ - BootstrapNodeAddr: []string{bootNode.String()}, - Discv5BootStrapAddr: []string{bootNode.String()}, - MaxPeers: 30, - UDPPort: uint(port), + Discv5BootStrapAddrs: []string{bootNode.String()}, + MaxPeers: 30, + UDPPort: uint(port), } ipAddr, pkey := createAddrAndPrivKey(t) s = &Service{ @@ -88,11 +87,10 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { port = 4001 gs := startup.NewClockSynchronizer() cfg := &Config{ - BootstrapNodeAddr: []string{bootNode.String()}, - Discv5BootStrapAddr: []string{bootNode.String()}, - MaxPeers: 30, - UDPPort: uint(port), - ClockWaiter: gs, + Discv5BootStrapAddrs: []string{bootNode.String()}, + MaxPeers: 30, + UDPPort: uint(port), + ClockWaiter: gs, } s, err = NewService(context.Background(), cfg) require.NoError(t, err) diff --git a/beacon-chain/p2p/utils.go b/beacon-chain/p2p/utils.go index 8fe98f5fb..3295423b8 100644 --- a/beacon-chain/p2p/utils.go +++ b/beacon-chain/p2p/utils.go @@ -54,38 +54,45 @@ func privKey(cfg *Config) (*ecdsa.PrivateKey, error) { return privKeyFromFile(cfg.PrivateKey) } + // Default keys have the next highest precedence, if they exist. _, err := os.Stat(defaultKeyPath) defaultKeysExist := !os.IsNotExist(err) if err != nil && defaultKeysExist { return nil, err } - // Default keys have the next highest precedence, if they exist. + if defaultKeysExist { return privKeyFromFile(defaultKeyPath) } + // There are no keys on the filesystem, so we need to generate one. priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) if err != nil { return nil, err } - // If the StaticPeerID flag is set, save the generated key as the default - // key, so that it will be used by default on the next node start. - if cfg.StaticPeerID { - rawbytes, err := priv.Raw() - if err != nil { - return nil, err - } - dst := make([]byte, hex.EncodedLen(len(rawbytes))) - hex.Encode(dst, rawbytes) - if err := file.WriteFile(defaultKeyPath, dst); err != nil { - return nil, err - } - log.Infof("Wrote network key to file") - // Read the key from the defaultKeyPath file just written - // for the strongest guarantee that the next start will be the same as this one. - return privKeyFromFile(defaultKeyPath) + + // If the StaticPeerID flag is not set, return the private key. + if !cfg.StaticPeerID { + return ecdsaprysm.ConvertFromInterfacePrivKey(priv) } - return ecdsaprysm.ConvertFromInterfacePrivKey(priv) + + // Save the generated key as the default key, so that it will be used by + // default on the next node start. + rawbytes, err := priv.Raw() + if err != nil { + return nil, err + } + + dst := make([]byte, hex.EncodedLen(len(rawbytes))) + hex.Encode(dst, rawbytes) + if err := file.WriteFile(defaultKeyPath, dst); err != nil { + return nil, err + } + + log.Info("Wrote network key to file") + // Read the key from the defaultKeyPath file just written + // for the strongest guarantee that the next start will be the same as this one. + return privKeyFromFile(defaultKeyPath) } // Retrieves a p2p networking private key from a file path. diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 5bdbeb10c..ee7691a2b 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -321,7 +321,7 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p } } -// subscribe to a static subnet with the given topic and index.A given validator and subscription handler is +// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is // used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) { genRoot := s.cfg.clock.GenesisValidatorsRoot() diff --git a/cmd/beacon-chain/BUILD.bazel b/cmd/beacon-chain/BUILD.bazel index 826d4622e..92a0b7d04 100644 --- a/cmd/beacon-chain/BUILD.bazel +++ b/cmd/beacon-chain/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "@com_github_ethereum_go_ethereum//log:go_default_library", "@com_github_ipfs_go_log_v2//:go_default_library", "@com_github_joonix_log//:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_urfave_cli_v2//:go_default_library", ], diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index f2a32e360..4f420ac17 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -11,6 +11,7 @@ import ( gethlog "github.com/ethereum/go-ethereum/log" golog "github.com/ipfs/go-log/v2" joonix "github.com/joonix/log" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/builder" "github.com/prysmaticlabs/prysm/v5/beacon-chain/node" "github.com/prysmaticlabs/prysm/v5/cmd" @@ -149,78 +150,89 @@ func init() { appFlags = cmd.WrapFlags(append(appFlags, features.BeaconChainFlags...)) } +func before(ctx *cli.Context) error { + // Load flags from config file, if specified. + if err := cmd.LoadFlagsFromConfig(ctx, appFlags); err != nil { + return errors.Wrap(err, "failed to load flags from config file") + } + + format := ctx.String(cmd.LogFormat.Name) + + switch format { + case "text": + formatter := new(prefixed.TextFormatter) + formatter.TimestampFormat = "2006-01-02 15:04:05" + formatter.FullTimestamp = true + + // If persistent log files are written - we disable the log messages coloring because + // the colors are ANSI codes and seen as gibberish in the log files. + formatter.DisableColors = ctx.String(cmd.LogFileName.Name) != "" + logrus.SetFormatter(formatter) + case "fluentd": + f := joonix.NewFormatter() + + if err := joonix.DisableTimestampFormat(f); err != nil { + panic(err) + } + + logrus.SetFormatter(f) + case "json": + logrus.SetFormatter(&logrus.JSONFormatter{}) + case "journald": + if err := journald.Enable(); err != nil { + return err + } + default: + return fmt.Errorf("unknown log format %s", format) + } + + logFileName := ctx.String(cmd.LogFileName.Name) + if logFileName != "" { + if err := logs.ConfigurePersistentLogging(logFileName); err != nil { + log.WithError(err).Error("Failed to configuring logging to disk.") + } + } + + if err := cmd.ExpandSingleEndpointIfFile(ctx, flags.ExecutionEngineEndpoint); err != nil { + return errors.Wrap(err, "failed to expand single endpoint") + } + + if ctx.IsSet(flags.SetGCPercent.Name) { + runtimeDebug.SetGCPercent(ctx.Int(flags.SetGCPercent.Name)) + } + + if err := debug.Setup(ctx); err != nil { + return errors.Wrap(err, "failed to setup debug") + } + + if err := fdlimits.SetMaxFdLimits(); err != nil { + return errors.Wrap(err, "failed to set max fd limits") + } + + return cmd.ValidateNoArgs(ctx) +} + func main() { // rctx = root context with cancellation. // note other instances of ctx in this func are *cli.Context. rctx, cancel := context.WithCancel(context.Background()) - app := cli.App{} - app.Name = "beacon-chain" - app.Usage = "this is a beacon chain implementation for Ethereum" - app.Action = func(ctx *cli.Context) error { - if err := startNode(ctx, cancel); err != nil { - return cli.Exit(err.Error(), 1) - } - return nil - } - app.Version = version.Version() - app.Commands = []*cli.Command{ - dbcommands.Commands, - jwtcommands.Commands, - } - - app.Flags = appFlags - - app.Before = func(ctx *cli.Context) error { - // Load flags from config file, if specified. - if err := cmd.LoadFlagsFromConfig(ctx, app.Flags); err != nil { - return err - } - - format := ctx.String(cmd.LogFormat.Name) - switch format { - case "text": - formatter := new(prefixed.TextFormatter) - formatter.TimestampFormat = "2006-01-02 15:04:05" - formatter.FullTimestamp = true - // If persistent log files are written - we disable the log messages coloring because - // the colors are ANSI codes and seen as gibberish in the log files. - formatter.DisableColors = ctx.String(cmd.LogFileName.Name) != "" - logrus.SetFormatter(formatter) - case "fluentd": - f := joonix.NewFormatter() - if err := joonix.DisableTimestampFormat(f); err != nil { - panic(err) - } - logrus.SetFormatter(f) - case "json": - logrus.SetFormatter(&logrus.JSONFormatter{}) - case "journald": - if err := journald.Enable(); err != nil { + app := cli.App{ + Name: "beacon-chain", + Usage: "this is a beacon chain implementation for Ethereum", + Action: func(ctx *cli.Context) error { + if err := startNode(ctx, cancel); err != nil { + log.Fatal(err.Error()) return err } - default: - return fmt.Errorf("unknown log format %s", format) - } - - logFileName := ctx.String(cmd.LogFileName.Name) - if logFileName != "" { - if err := logs.ConfigurePersistentLogging(logFileName); err != nil { - log.WithError(err).Error("Failed to configuring logging to disk.") - } - } - if err := cmd.ExpandSingleEndpointIfFile(ctx, flags.ExecutionEngineEndpoint); err != nil { - return err - } - if ctx.IsSet(flags.SetGCPercent.Name) { - runtimeDebug.SetGCPercent(ctx.Int(flags.SetGCPercent.Name)) - } - if err := debug.Setup(ctx); err != nil { - return err - } - if err := fdlimits.SetMaxFdLimits(); err != nil { - return err - } - return cmd.ValidateNoArgs(ctx) + return nil + }, + Version: version.Version(), + Commands: []*cli.Command{ + dbcommands.Commands, + jwtcommands.Commands, + }, + Flags: appFlags, + Before: before, } defer func() {