From 100ca0ebaf00b1f073eff855caf500c63dcb9bc6 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 17 Aug 2022 06:38:57 +0000 Subject: [PATCH] Prysmctl Command to Request Beacon Nodes for Block Ranges Over P2P (#11035) * first * attempt p2p connect send tool * attempt * stream registration * trying to register * attempt * workinnnn * begin * p2p prysmctl tool * ignore * fix * delete deprecated * p2p smaller iface surface area * further p2p refactor * gaz * better logging * process * all functionality * fix up * rhandle * v2 req * cmd * send sub * v1 handle * show head slot * cmd * cmd lib * gazelle fix * bazel * gaz * work on the handshake items * prevent dial to self * add config awareness * gaz * inferring host addrs from p2p * initialize data mappings * add own mock * fix up logic * gaz * add img * gaz * add images * builds * builds * nishant feedback: Co-authored-by: Nishant Das --- beacon-chain/blockchain/chain_info.go | 4 +- beacon-chain/forkchoice/BUILD.bazel | 1 + beacon-chain/p2p/BUILD.bazel | 1 + beacon-chain/p2p/discovery.go | 49 ++--- beacon-chain/p2p/encoder/BUILD.bazel | 1 + beacon-chain/p2p/interfaces.go | 9 +- beacon-chain/p2p/log.go | 2 +- beacon-chain/p2p/options.go | 29 +-- beacon-chain/p2p/options_test.go | 3 +- beacon-chain/p2p/peers/BUILD.bazel | 5 +- beacon-chain/p2p/service.go | 7 +- beacon-chain/p2p/types/BUILD.bazel | 1 + beacon-chain/p2p/utils.go | 10 - beacon-chain/sync/BUILD.bazel | 1 + beacon-chain/sync/context.go | 6 +- beacon-chain/sync/rpc_chunked_response.go | 8 +- beacon-chain/sync/rpc_send_request.go | 2 +- cmd/prysmctl/BUILD.bazel | 96 +++++++++ cmd/prysmctl/main.go | 2 + cmd/prysmctl/p2p/BUILD.bazel | 56 ++++++ cmd/prysmctl/p2p/client.go | 230 ++++++++++++++++++++++ cmd/prysmctl/p2p/handler.go | 102 ++++++++++ cmd/prysmctl/p2p/handshake.go | 81 ++++++++ cmd/prysmctl/p2p/log.go | 5 + cmd/prysmctl/p2p/mock_chain.go | 36 ++++ cmd/prysmctl/p2p/p2p.go | 17 ++ cmd/prysmctl/p2p/peers.go | 28 +++ cmd/prysmctl/p2p/request_blocks.go | 221 +++++++++++++++++++++ network/external_ip.go | 9 + 29 files changed, 956 insertions(+), 66 deletions(-) create mode 100644 cmd/prysmctl/p2p/BUILD.bazel create mode 100644 cmd/prysmctl/p2p/client.go create mode 100644 cmd/prysmctl/p2p/handler.go create mode 100644 cmd/prysmctl/p2p/handshake.go create mode 100644 cmd/prysmctl/p2p/log.go create mode 100644 cmd/prysmctl/p2p/mock_chain.go create mode 100644 cmd/prysmctl/p2p/p2p.go create mode 100644 cmd/prysmctl/p2p/peers.go create mode 100644 cmd/prysmctl/p2p/request_blocks.go diff --git a/beacon-chain/blockchain/chain_info.go b/beacon-chain/blockchain/chain_info.go index bcef4c8b5..693933eb6 100644 --- a/beacon-chain/blockchain/chain_info.go +++ b/beacon-chain/blockchain/chain_info.go @@ -25,10 +25,8 @@ import ( type ChainInfoFetcher interface { HeadFetcher FinalizationFetcher - GenesisFetcher CanonicalFetcher ForkFetcher - TimeFetcher HeadDomainFetcher } @@ -70,6 +68,8 @@ type HeadFetcher interface { type ForkFetcher interface { ForkChoicer() forkchoice.ForkChoicer CurrentFork() *ethpb.Fork + GenesisFetcher + TimeFetcher } // CanonicalFetcher retrieves the current chain's canonical information. diff --git a/beacon-chain/forkchoice/BUILD.bazel b/beacon-chain/forkchoice/BUILD.bazel index d7ee672b9..5a1767c99 100644 --- a/beacon-chain/forkchoice/BUILD.bazel +++ b/beacon-chain/forkchoice/BUILD.bazel @@ -10,6 +10,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice", visibility = [ "//beacon-chain:__subpackages__", + "//cmd:__subpackages__", "//testing/spectest:__subpackages__", ], deps = [ diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 4e794488f..80fa0343f 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -35,6 +35,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p", visibility = [ "//beacon-chain:__subpackages__", + "//cmd:__subpackages__", "//testing/endtoend/evaluators:__pkg__", "//tools:__subpackages__", ], diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 572ab70eb..9fecac7bf 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -332,6 +332,31 @@ func (s *Service) isPeerAtLimit(inbound bool) bool { return activePeers >= maxPeers || numOfConns >= maxPeers } +// PeersFromStringAddrs convers peer raw ENRs into multiaddrs for p2p. +func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { + var allAddrs []ma.Multiaddr + enodeString, multiAddrString := parseGenericAddrs(addrs) + for _, stringAddr := range multiAddrString { + addr, err := multiAddrFromString(stringAddr) + if err != nil { + return nil, errors.Wrapf(err, "Could not get multiaddr from string") + } + allAddrs = append(allAddrs, addr) + } + for _, stringAddr := range enodeString { + enodeAddr, err := enode.Parse(enode.ValidSchemes, stringAddr) + if err != nil { + return nil, errors.Wrapf(err, "Could not get enode from string") + } + addr, err := convertToSingleMultiAddr(enodeAddr) + if err != nil { + return nil, errors.Wrapf(err, "Could not get multiaddr") + } + allAddrs = append(allAddrs, addr) + } + return allAddrs, nil +} + func parseBootStrapAddrs(addrs []string) (discv5Nodes []string) { discv5Nodes, _ = parseGenericAddrs(addrs) if len(discv5Nodes) == 0 { @@ -435,30 +460,6 @@ func convertToUdpMultiAddr(node *enode.Node) ([]ma.Multiaddr, error) { return addresses, nil } -func peersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { - var allAddrs []ma.Multiaddr - enodeString, multiAddrString := parseGenericAddrs(addrs) - for _, stringAddr := range multiAddrString { - addr, err := multiAddrFromString(stringAddr) - if err != nil { - return nil, errors.Wrapf(err, "Could not get multiaddr from string") - } - allAddrs = append(allAddrs, addr) - } - for _, stringAddr := range enodeString { - enodeAddr, err := enode.Parse(enode.ValidSchemes, stringAddr) - if err != nil { - return nil, errors.Wrapf(err, "Could not get enode from string") - } - addr, err := convertToSingleMultiAddr(enodeAddr) - if err != nil { - return nil, errors.Wrapf(err, "Could not get multiaddr") - } - allAddrs = append(allAddrs, addr) - } - return allAddrs, nil -} - func multiAddrFromString(address string) (ma.Multiaddr, error) { return ma.NewMultiaddr(address) } diff --git a/beacon-chain/p2p/encoder/BUILD.bazel b/beacon-chain/p2p/encoder/BUILD.bazel index ca9ee1886..b22c33872 100644 --- a/beacon-chain/p2p/encoder/BUILD.bazel +++ b/beacon-chain/p2p/encoder/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder", visibility = [ "//beacon-chain:__subpackages__", + "//cmd:__subpackages__", ], deps = [ "//config/params:go_default_library", diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index da989d09a..329f519f1 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -21,11 +21,10 @@ import ( type P2P interface { Broadcaster SetStreamHandler - EncodingProvider PubSubProvider PubSubTopicUser + SenderEncoder PeerManager - Sender ConnectionHandler PeersProvider MetadataProvider @@ -59,6 +58,12 @@ type ConnectionHandler interface { connmgr.ConnectionGater } +// SenderEncoder allows sending functionality from libp2p as well as encoding for requests and responses. +type SenderEncoder interface { + EncodingProvider + Sender +} + // EncodingProvider provides p2p network encoding. type EncodingProvider interface { Encoding() encoder.NetworkEncoding diff --git a/beacon-chain/p2p/log.go b/beacon-chain/p2p/log.go index b29ab5f16..75240205a 100644 --- a/beacon-chain/p2p/log.go +++ b/beacon-chain/p2p/log.go @@ -29,7 +29,7 @@ func logIPAddr(id peer.ID, addrs ...ma.Multiaddr) { func logExternalIPAddr(id peer.ID, addr string, port uint) { if addr != "" { - multiAddr, err := multiAddressBuilder(addr, port) + multiAddr, err := MultiAddressBuilder(addr, port) if err != nil { log.WithError(err).Error("Could not create multiaddress") return diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go index d8f093601..0fab1e2fe 100644 --- a/beacon-chain/p2p/options.go +++ b/beacon-chain/p2p/options.go @@ -16,10 +16,22 @@ import ( "github.com/prysmaticlabs/prysm/v3/runtime/version" ) +// MultiAddressBuilder takes in an ip address string and port to produce a go multiaddr format. +func MultiAddressBuilder(ipAddr string, port uint) (ma.Multiaddr, error) { + parsedIP := net.ParseIP(ipAddr) + if parsedIP.To4() == nil && parsedIP.To16() == nil { + return nil, errors.Errorf("invalid ip address provided: %s", ipAddr) + } + if parsedIP.To4() != nil { + return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) + } + return ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/%d", ipAddr, port)) +} + // buildOptions for the libp2p host. func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Option { cfg := s.cfg - listen, err := multiAddressBuilder(ip.String(), cfg.TCPPort) + listen, err := MultiAddressBuilder(ip.String(), cfg.TCPPort) if err != nil { log.WithError(err).Fatal("Failed to p2p listen") } @@ -27,7 +39,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt if net.ParseIP(cfg.LocalIP) == nil { log.Fatalf("Invalid local ip provided: %s", cfg.LocalIP) } - listen, err = multiAddressBuilder(cfg.LocalIP, cfg.TCPPort) + listen, err = MultiAddressBuilder(cfg.LocalIP, cfg.TCPPort) if err != nil { log.WithError(err).Fatal("Failed to p2p listen") } @@ -65,7 +77,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt } if cfg.HostAddress != "" { options = append(options, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - external, err := multiAddressBuilder(cfg.HostAddress, cfg.TCPPort) + external, err := MultiAddressBuilder(cfg.HostAddress, cfg.TCPPort) if err != nil { log.WithError(err).Error("Unable to create external multiaddress") } else { @@ -90,17 +102,6 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt return options } -func multiAddressBuilder(ipAddr string, port uint) (ma.Multiaddr, error) { - parsedIP := net.ParseIP(ipAddr) - if parsedIP.To4() == nil && parsedIP.To16() == nil { - return nil, errors.Errorf("invalid ip address provided: %s", ipAddr) - } - if parsedIP.To4() != nil { - return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) - } - return ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/%d", ipAddr, port)) -} - func multiAddressBuilderWithID(ipAddr, protocol string, port uint, id peer.ID) (ma.Multiaddr, error) { parsedIP := net.ParseIP(ipAddr) if parsedIP.To4() == nil && parsedIP.To16() == nil { diff --git a/beacon-chain/p2p/options_test.go b/beacon-chain/p2p/options_test.go index b1f76d2c5..d26a91291 100644 --- a/beacon-chain/p2p/options_test.go +++ b/beacon-chain/p2p/options_test.go @@ -15,6 +15,7 @@ import ( mock "github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v3/config/params" ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa" + "github.com/prysmaticlabs/prysm/v3/network" "github.com/prysmaticlabs/prysm/v3/testing/assert" "github.com/prysmaticlabs/prysm/v3/testing/require" ) @@ -89,7 +90,7 @@ func TestDefaultMultiplexers(t *testing.T) { var err error svc.privKey, err = privKey(svc.cfg) assert.NoError(t, err) - ipAddr := ipAddr() + ipAddr := network.IPAddr() opts := svc.buildOptions(ipAddr, svc.privKey) err = cfg.Apply(append(opts, libp2p.FallbackDefaults)...) assert.NoError(t, err) diff --git a/beacon-chain/p2p/peers/BUILD.bazel b/beacon-chain/p2p/peers/BUILD.bazel index e18f0959f..8f7b22140 100644 --- a/beacon-chain/p2p/peers/BUILD.bazel +++ b/beacon-chain/p2p/peers/BUILD.bazel @@ -7,7 +7,10 @@ go_library( "status.go", ], importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers", - visibility = ["//beacon-chain:__subpackages__"], + visibility = [ + "//beacon-chain:__subpackages__", + "//cmd:__subpackages__", + ], deps = [ "//beacon-chain/p2p/peers/peerdata:go_default_library", "//beacon-chain/p2p/peers/scorers:go_default_library", diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index b43610e65..7e9aebf50 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -30,6 +30,7 @@ import ( "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers" "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v3/config/params" + prysmnetwork "github.com/prysmaticlabs/prysm/v3/network" "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata" "github.com/prysmaticlabs/prysm/v3/runtime" "github.com/prysmaticlabs/prysm/v3/time/slots" @@ -107,7 +108,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { cfg.Discv5BootStrapAddr = dv5Nodes - ipAddr := ipAddr() + ipAddr := prysmnetwork.IPAddr() s.privKey, err = privKey(s.cfg) if err != nil { log.WithError(err).Error("Failed to generate p2p private key") @@ -201,7 +202,7 @@ func (s *Service) Start() { } if !s.cfg.NoDiscovery && !s.cfg.DisableDiscv5 { - ipAddr := ipAddr() + ipAddr := prysmnetwork.IPAddr() listener, err := s.startDiscoveryV5( ipAddr, s.privKey, @@ -224,7 +225,7 @@ func (s *Service) Start() { s.started = true if len(s.cfg.StaticPeers) > 0 { - addrs, err := peersFromStringAddrs(s.cfg.StaticPeers) + addrs, err := PeersFromStringAddrs(s.cfg.StaticPeers) if err != nil { log.WithError(err).Error("Could not connect to static peer") } diff --git a/beacon-chain/p2p/types/BUILD.bazel b/beacon-chain/p2p/types/BUILD.bazel index 9277d6f8e..822eb8a5e 100644 --- a/beacon-chain/p2p/types/BUILD.bazel +++ b/beacon-chain/p2p/types/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types", visibility = [ "//beacon-chain:__subpackages__", + "//cmd:__subpackages__", "//slasher/rpc:__pkg__", "//testing/util:__pkg__", "//validator/client:__pkg__", diff --git a/beacon-chain/p2p/utils.go b/beacon-chain/p2p/utils.go index 91ed9ff72..cec7222a2 100644 --- a/beacon-chain/p2p/utils.go +++ b/beacon-chain/p2p/utils.go @@ -19,7 +19,6 @@ import ( "github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper" ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa" "github.com/prysmaticlabs/prysm/v3/io/file" - "github.com/prysmaticlabs/prysm/v3/network" pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata" "github.com/sirupsen/logrus" @@ -129,15 +128,6 @@ func metaDataFromConfig(cfg *Config) (metadata.Metadata, error) { return wrapper.WrappedMetadataV0(metaData), nil } -// Retrieves an external ipv4 address and converts into a libp2p formatted value. -func ipAddr() net.IP { - ip, err := network.ExternalIP() - if err != nil { - log.WithError(err).Fatal("Could not get IPv4 address") - } - return net.ParseIP(ip) -} - // Attempt to dial an address to verify its connectivity func verifyConnectivity(addr string, port uint, protocol string) { if addr != "" { diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 6e108dfcf..815108dba 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -48,6 +48,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync", visibility = [ "//beacon-chain:__subpackages__", + "//cmd:__subpackages__", "//testing:__subpackages__", ], deps = [ diff --git a/beacon-chain/sync/context.go b/beacon-chain/sync/context.go index 32c6abf55..47ef700a6 100644 --- a/beacon-chain/sync/context.go +++ b/beacon-chain/sync/context.go @@ -13,7 +13,7 @@ import ( const forkDigestLength = 4 // writes peer's current context for the expected payload to the stream. -func writeContextToStream(objCtx []byte, stream network.Stream, chain blockchain.ChainInfoFetcher) error { +func writeContextToStream(objCtx []byte, stream network.Stream, chain blockchain.ForkFetcher) error { // The rpc context for our v2 methods is the fork-digest of // the relevant payload. We write the associated fork-digest(context) // into the stream for the payload. @@ -34,7 +34,7 @@ func writeContextToStream(objCtx []byte, stream network.Stream, chain blockchain } // reads any attached context-bytes to the payload. -func readContextFromStream(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byte, error) { +func readContextFromStream(stream network.Stream, chain blockchain.ForkFetcher) ([]byte, error) { rpcCtx, err := rpcContext(stream, chain) if err != nil { return nil, err @@ -51,7 +51,7 @@ func readContextFromStream(stream network.Stream, chain blockchain.ChainInfoFetc } // retrieve expected context depending on rpc topic schema version. -func rpcContext(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byte, error) { +func rpcContext(stream network.Stream, chain blockchain.ForkFetcher) ([]byte, error) { _, _, version, err := p2p.TopicDeconstructor(string(stream.Protocol())) if err != nil { return nil, err diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index bbcdafbc8..c73d08805 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -64,7 +64,7 @@ func WriteBlockChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher // ReadChunkedBlock handles each response chunk that is sent by the // peer and converts it into a beacon block. -func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, isFirstChunk bool) (interfaces.SignedBeaconBlock, error) { +func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ForkFetcher, p2p p2p.EncodingProvider, isFirstChunk bool) (interfaces.SignedBeaconBlock, error) { // Handle deadlines differently for first chunk if isFirstChunk { return readFirstChunkedBlock(stream, chain, p2p) @@ -75,7 +75,7 @@ func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetche // readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to // it. -func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (interfaces.SignedBeaconBlock, error) { +func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ForkFetcher, p2p p2p.EncodingProvider) (interfaces.SignedBeaconBlock, error) { code, errMsg, err := ReadStatusCode(stream, p2p.Encoding()) if err != nil { return nil, err @@ -97,7 +97,7 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoF // readResponseChunk reads the response from the stream and decodes it into the // provided message type. -func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (interfaces.SignedBeaconBlock, error) { +func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ForkFetcher, p2p p2p.EncodingProvider) (interfaces.SignedBeaconBlock, error) { SetStreamReadDeadline(stream, respTimeout) code, errMsg, err := readStatusCodeNoDeadline(stream, p2p.Encoding()) if err != nil { @@ -119,7 +119,7 @@ func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetch return blk, err } -func extractBlockDataType(digest []byte, chain blockchain.ChainInfoFetcher) (interfaces.SignedBeaconBlock, error) { +func extractBlockDataType(digest []byte, chain blockchain.ForkFetcher) (interfaces.SignedBeaconBlock, error) { if len(digest) == 0 { bFunc, ok := types.BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] if !ok { diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 201bef1d3..8b80f49c8 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -25,7 +25,7 @@ type BeaconBlockProcessor func(block interfaces.SignedBeaconBlock) error // SendBeaconBlocksByRangeRequest sends BeaconBlocksByRange and returns fetched blocks, if any. func SendBeaconBlocksByRangeRequest( - ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID, + ctx context.Context, chain blockchain.ForkFetcher, p2pProvider p2p.SenderEncoder, pid peer.ID, req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor, ) ([]interfaces.SignedBeaconBlock, error) { topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRangeMessageName, slots.ToEpoch(chain.CurrentSlot())) diff --git a/cmd/prysmctl/BUILD.bazel b/cmd/prysmctl/BUILD.bazel index a9b560ecc..9629d53c7 100644 --- a/cmd/prysmctl/BUILD.bazel +++ b/cmd/prysmctl/BUILD.bazel @@ -1,5 +1,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_binary") load("@prysm//tools/go:def.bzl", "go_library") +load("@io_bazel_rules_docker//go:image.bzl", "go_image") +load("@io_bazel_rules_docker//container:container.bzl", "container_bundle", "container_image") +load("//tools:go_image.bzl", "go_image_alpine", "go_image_debug") +load("@io_bazel_rules_docker//contrib:push-all.bzl", "docker_push") go_library( name = "go_default_library", @@ -8,11 +12,103 @@ go_library( visibility = ["//visibility:private"], deps = [ "//cmd/prysmctl/checkpoint:go_default_library", + "//cmd/prysmctl/p2p:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_urfave_cli_v2//:go_default_library", ], ) +go_image( + name = "image", + base = select({ + "//tools:base_image_alpine": "//tools:alpine_cc_image", + "//tools:base_image_cc": "//tools:cc_image", + "//conditions:default": "//tools:cc_image", + }), + binary = ":prysmctl", + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +container_image( + name = "image_with_creation_time", + base = "image", + stamp = True, + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +container_bundle( + name = "image_bundle", + images = { + "gcr.io/prysmaticlabs/prysm/cmd/prysmctl:latest": ":image_with_creation_time", + "gcr.io/prysmaticlabs/prysm/cmd/prysmctl:{DOCKER_TAG}": ":image_with_creation_time", + "index.docker.io/prysmaticlabs/prysmctl:latest": ":image_with_creation_time", + "index.docker.io/prysmaticlabs/prysmctl:{DOCKER_TAG}": ":image_with_creation_time", + }, + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +go_image_debug( + name = "image_debug", + image = ":image", + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +container_bundle( + name = "image_bundle_debug", + images = { + "gcr.io/prysmaticlabs/prysm/cmd/prysmctl:latest-debug": ":image_debug", + "gcr.io/prysmaticlabs/prysm/cmd/prysmctl:{DOCKER_TAG}-debug": ":image_debug", + "index.docker.io/prysmaticlabs/prysmctl:latest-debug": ":image_debug", + "index.docker.io/prysmaticlabs/prysmctl:{DOCKER_TAG}-debug": ":image_debug", + }, + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +go_image_alpine( + name = "image_alpine", + image = ":image", + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +container_bundle( + name = "image_bundle_alpine", + images = { + "gcr.io/prysmaticlabs/prysm/cmd/prysmctl:latest-alpine": ":image_alpine", + "gcr.io/prysmaticlabs/prysm/cmd/prysmctl:{DOCKER_TAG}-alpine": ":image_alpine", + "index.docker.io/prysmaticlabs/prysmctl:latest-alpine": ":image_alpine", + "index.docker.io/prysmaticlabs/prysmctl:{DOCKER_TAG}-alpine": ":image_alpine", + }, + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +docker_push( + name = "push_images", + bundle = ":image_bundle", + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +docker_push( + name = "push_images_debug", + bundle = ":image_bundle_debug", + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + +docker_push( + name = "push_images_alpine", + bundle = ":image_bundle_alpine", + tags = ["manual"], + visibility = ["//cmd/prysmctl:__pkg__"], +) + go_binary( name = "prysmctl", embed = [":go_default_library"], diff --git a/cmd/prysmctl/main.go b/cmd/prysmctl/main.go index 5ab7fdb8e..4e5a134a1 100644 --- a/cmd/prysmctl/main.go +++ b/cmd/prysmctl/main.go @@ -4,6 +4,7 @@ import ( "os" "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/checkpoint" + "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/p2p" log "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" ) @@ -22,4 +23,5 @@ func main() { func init() { prysmctlCommands = append(prysmctlCommands, checkpoint.Commands...) + prysmctlCommands = append(prysmctlCommands, p2p.Commands...) } diff --git a/cmd/prysmctl/p2p/BUILD.bazel b/cmd/prysmctl/p2p/BUILD.bazel new file mode 100644 index 000000000..b51da93f6 --- /dev/null +++ b/cmd/prysmctl/p2p/BUILD.bazel @@ -0,0 +1,56 @@ +load("@prysm//tools/go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "client.go", + "handler.go", + "handshake.go", + "log.go", + "mock_chain.go", + "p2p.go", + "peers.go", + "request_blocks.go", + ], + importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/p2p", + visibility = ["//visibility:public"], + deps = [ + "//beacon-chain/forkchoice:go_default_library", + "//beacon-chain/p2p:go_default_library", + "//beacon-chain/p2p/encoder:go_default_library", + "//beacon-chain/p2p/types:go_default_library", + "//beacon-chain/sync:go_default_library", + "//cmd:go_default_library", + "//config/params:go_default_library", + "//consensus-types/blocks:go_default_library", + "//consensus-types/primitives:go_default_library", + "//consensus-types/wrapper:go_default_library", + "//crypto/ecdsa:go_default_library", + "//encoding/bytesutil:go_default_library", + "//monitoring/tracing:go_default_library", + "//network:go_default_library", + "//network/forks:go_default_library", + "//proto/prysm/v1alpha1:go_default_library", + "//proto/prysm/v1alpha1/metadata:go_default_library", + "//runtime/version:go_default_library", + "//time/slots:go_default_library", + "@com_github_libp2p_go_libp2p//:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/protocol/identify:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library", + "@com_github_libp2p_go_libp2p_core//:go_default_library", + "@com_github_libp2p_go_libp2p_core//crypto:go_default_library", + "@com_github_libp2p_go_libp2p_core//host:go_default_library", + "@com_github_libp2p_go_libp2p_core//network:go_default_library", + "@com_github_libp2p_go_libp2p_core//peer:go_default_library", + "@com_github_libp2p_go_libp2p_core//protocol:go_default_library", + "@com_github_pkg_errors//:go_default_library", + "@com_github_prysmaticlabs_fastssz//:go_default_library", + "@com_github_prysmaticlabs_go_bitfield//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_urfave_cli_v2//:go_default_library", + "@io_opencensus_go//trace:go_default_library", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_protobuf//types/known/emptypb:go_default_library", + ], +) diff --git a/cmd/prysmctl/p2p/client.go b/cmd/prysmctl/p2p/client.go new file mode 100644 index 000000000..54ea6bb87 --- /dev/null +++ b/cmd/prysmctl/p2p/client.go @@ -0,0 +1,230 @@ +package p2p + +import ( + "context" + "crypto/ecdsa" + "crypto/rand" + "net" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + corenet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-libp2p/p2p/security/noise" + "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/pkg/errors" + ssz "github.com/prysmaticlabs/fastssz" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder" + "github.com/prysmaticlabs/prysm/v3/consensus-types/wrapper" + ecdsaprysm "github.com/prysmaticlabs/prysm/v3/crypto/ecdsa" + "github.com/prysmaticlabs/prysm/v3/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v3/monitoring/tracing" + "github.com/prysmaticlabs/prysm/v3/network" + "github.com/prysmaticlabs/prysm/v3/network/forks" + pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata" + "github.com/prysmaticlabs/prysm/v3/runtime/version" + "github.com/prysmaticlabs/prysm/v3/time/slots" + "go.opencensus.io/trace" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +// A minimal client for peering with beacon nodes over libp2p and sending p2p RPC requests for data. +type client struct { + host host.Host + meta metadata.Metadata + beaconClient pb.BeaconChainClient + nodeClient pb.NodeClient +} + +func newClient(beaconEndpoints []string, clientPort uint) (*client, error) { + ipAdd := ipAddr() + priv, err := privKey() + if err != nil { + return nil, errors.Wrap(err, "could not set up p2p private key") + } + meta, err := readMetadata() + if err != nil { + return nil, errors.Wrap(err, "could not set up p2p metadata") + } + listen, err := p2p.MultiAddressBuilder(ipAdd.String(), clientPort) + if err != nil { + return nil, errors.Wrap(err, "could not set up listening multiaddr") + } + options := []libp2p.Option{ + privKeyOption(priv), + libp2p.ListenAddrs(listen), + libp2p.UserAgent(version.BuildData()), + libp2p.Transport(tcp.NewTCPTransport), + } + options = append(options, libp2p.Security(noise.ID, noise.New)) + options = append(options, libp2p.Ping(false)) + h, err := libp2p.New(options...) + if err != nil { + return nil, errors.Wrap(err, "could not start libp2p") + } + h.RemoveStreamHandler(identify.IDDelta) + if len(beaconEndpoints) == 0 { + return nil, errors.New("no specified beacon API endpoints") + } + conn, err := grpc.Dial(beaconEndpoints[0], grpc.WithInsecure()) + if err != nil { + return nil, err + } + beaconClient := pb.NewBeaconChainClient(conn) + nodeClient := pb.NewNodeClient(conn) + return &client{ + host: h, + meta: meta, + beaconClient: beaconClient, + nodeClient: nodeClient, + }, nil +} + +func (c *client) Close() { + if err := c.host.Close(); err != nil { + panic(err) + } +} + +func (c *client) Encoding() encoder.NetworkEncoding { + return &encoder.SszNetworkEncoder{} +} + +func (c *client) MetadataSeq() uint64 { + return c.meta.SequenceNumber() +} + +// Send a request to specific peer. The returned stream may be used for reading, +// but has been closed for writing. +// When done, the caller must Close() or Reset() on the stream. +func (c *client) Send( + ctx context.Context, + message interface{}, + baseTopic string, + pid peer.ID, +) (corenet.Stream, error) { + ctx, span := trace.StartSpan(ctx, "p2p.Send") + defer span.End() + topic := baseTopic + c.Encoding().ProtocolSuffix() + span.AddAttributes(trace.StringAttribute("topic", topic)) + + // Apply max dial timeout when opening a new stream. + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + + stream, err := c.host.NewStream(ctx, pid, protocol.ID(topic)) + if err != nil { + tracing.AnnotateError(span, err) + return nil, errors.Wrap(err, "could not open new stream") + } + // do not encode anything if we are sending a metadata request + if baseTopic != p2p.RPCMetaDataTopicV1 && baseTopic != p2p.RPCMetaDataTopicV2 { + castedMsg, ok := message.(ssz.Marshaler) + if !ok { + return nil, errors.Errorf("%T does not support the ssz marshaller interface", message) + } + if _, err := c.Encoding().EncodeWithMaxLength(stream, castedMsg); err != nil { + tracing.AnnotateError(span, err) + _err := stream.Reset() + _ = _err + return nil, err + } + } + // Close stream for writing. + if err := stream.CloseWrite(); err != nil { + tracing.AnnotateError(span, err) + _err := stream.Reset() + _ = _err + return nil, errors.Wrap(err, "could not close write") + } + + return stream, nil +} + +func (c *client) retrievePeerAddressesViaRPC(ctx context.Context, beaconEndpoints []string) ([]string, error) { + if len(beaconEndpoints) == 0 { + return nil, errors.New("no beacon RPC endpoints specified") + } + peers := make([]string, 0) + for i := 0; i < len(beaconEndpoints); i++ { + conn, err := grpc.Dial(beaconEndpoints[i], grpc.WithInsecure()) + if err != nil { + return nil, err + } + nodeClient := pb.NewNodeClient(conn) + hostData, err := nodeClient.GetHost(ctx, &emptypb.Empty{}) + if err != nil { + return nil, err + } + if len(hostData.Addresses) == 0 { + continue + } + peers = append(peers, hostData.Addresses[0]+"/p2p/"+hostData.PeerId) + } + return peers, nil +} + +func (c *client) initializeMockChainService(ctx context.Context) (*mockChain, error) { + genesisResp, err := c.nodeClient.GetGenesis(ctx, &emptypb.Empty{}) + if err != nil { + return nil, err + } + currEpoch := slots.ToEpoch(slots.SinceGenesis(genesisResp.GenesisTime.AsTime())) + currFork, err := forks.Fork(currEpoch) + if err != nil { + return nil, err + } + return &mockChain{ + genesisTime: genesisResp.GenesisTime.AsTime(), + currentFork: currFork, + genesisValsRoot: bytesutil.ToBytes32(genesisResp.GenesisValidatorsRoot), + }, nil +} + +// Retrieves an external ipv4 address and converts into a libp2p formatted value. +func ipAddr() net.IP { + ip, err := network.ExternalIP() + if err != nil { + panic(err) + } + return net.ParseIP(ip) +} + +// Determines a private key for p2p networking from the p2p service's +// configuration struct. If no key is found, it generates a new one. +func privKey() (*ecdsa.PrivateKey, error) { + priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) + if err != nil { + return nil, err + } + return ecdsaprysm.ConvertFromInterfacePrivKey(priv) +} + +// Adds a private key to the libp2p option if the option was provided. +// If the private key file is missing or cannot be read, or if the +// private key contents cannot be marshaled, an exception is thrown. +func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option { + return func(cfg *libp2p.Config) error { + ifaceKey, err := ecdsaprysm.ConvertToInterfacePrivkey(privkey) + if err != nil { + return err + } + return cfg.Apply(libp2p.Identity(ifaceKey)) + } +} + +func readMetadata() (metadata.Metadata, error) { + metaData := &pb.MetaDataV1{ + SeqNumber: 0, + Attnets: bitfield.NewBitvector64(), + } + return wrapper.WrappedMetadataV1(metaData), nil +} diff --git a/cmd/prysmctl/p2p/handler.go b/cmd/prysmctl/p2p/handler.go new file mode 100644 index 000000000..857a682c0 --- /dev/null +++ b/cmd/prysmctl/p2p/handler.go @@ -0,0 +1,102 @@ +package p2p + +import ( + "context" + "reflect" + "runtime/debug" + "strings" + + libp2pcore "github.com/libp2p/go-libp2p-core" + corenet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/protocol" + ssz "github.com/prysmaticlabs/fastssz" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" + p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" +) + +type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error + +// registerRPC for a given topic with an expected protobuf message type. +func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) { + topic := baseTopic + c.Encoding().ProtocolSuffix() + c.host.SetStreamHandler(protocol.ID(topic), func(stream corenet.Stream) { + defer func() { + if r := recover(); r != nil { + log.WithField("error", r).Error("Panic occurred") + log.Errorf("%s", debug.Stack()) + } + }() + // Resetting after closing is a no-op so defer a reset in case something goes wrong. + // It's up to the handler to Close the stream (send an EOF) if + // it successfully writes a response. We don't blindly call + // Close here because we may have only written a partial + // response. + defer func() { + _err := stream.Reset() + _ = _err + }() + + log.WithField("peer", stream.Conn().RemotePeer().Pretty()).WithField("topic", string(stream.Protocol())) + + base, ok := p2p.RPCTopicMappings[baseTopic] + if !ok { + log.Errorf("Could not retrieve base message for topic %s", baseTopic) + return + } + t := reflect.TypeOf(base) + // Copy Base + base = reflect.New(t) + + // since metadata requests do not have any data in the payload, we + // do not decode anything. + if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 { + if err := handle(context.Background(), base, stream); err != nil { + if err != p2ptypes.ErrWrongForkDigestVersion { + log.WithError(err).Debug("Could not handle p2p RPC") + } + } + return + } + + // Given we have an input argument that can be pointer or the actual object, this gives us + // a way to check for its reflect.Kind and based on the result, we can decode + // accordingly. + if t.Kind() == reflect.Ptr { + msg, ok := reflect.New(t.Elem()).Interface().(ssz.Unmarshaler) + if !ok { + log.Errorf("message of %T does not support marshaller interface", msg) + return + } + if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil { + // Debug logs for goodbye/status errors + if strings.Contains(topic, p2p.RPCGoodByeTopicV1) || strings.Contains(topic, p2p.RPCStatusTopicV1) { + log.WithError(err).Debug("Could not decode goodbye stream message") + return + } + log.WithError(err).Debug("Could not decode stream message") + return + } + if err := handle(context.Background(), msg, stream); err != nil { + if err != p2ptypes.ErrWrongForkDigestVersion { + log.WithError(err).Debug("Could not handle p2p RPC") + } + } + } else { + nTyp := reflect.New(t) + msg, ok := nTyp.Interface().(ssz.Unmarshaler) + if !ok { + log.Errorf("message of %T does not support marshaller interface", msg) + return + } + if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil { + log.WithError(err).Debug("Could not decode stream message") + return + } + if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil { + if err != p2ptypes.ErrWrongForkDigestVersion { + log.WithError(err).Debug("Could not handle p2p RPC") + } + } + } + }) +} diff --git a/cmd/prysmctl/p2p/handshake.go b/cmd/prysmctl/p2p/handshake.go new file mode 100644 index 000000000..1c7ae1177 --- /dev/null +++ b/cmd/prysmctl/p2p/handshake.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "context" + + libp2pcore "github.com/libp2p/go-libp2p-core" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v3/network/forks" + pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v3/time/slots" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/emptypb" +) + +var responseCodeSuccess = byte(0x00) + +func (c *client) registerHandshakeHandlers() { + c.registerRPCHandler(p2p.RPCPingTopicV1, c.pingHandler) + c.registerRPCHandler(p2p.RPCStatusTopicV1, c.statusRPCHandler) + c.registerRPCHandler(p2p.RPCGoodByeTopicV1, c.goodbyeHandler) +} + +// pingHandler reads the incoming ping rpc message from the peer. +func (c *client) pingHandler(_ context.Context, _ interface{}, stream libp2pcore.Stream) error { + defer closeStream(stream) + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + sq := types.SSZUint64(c.MetadataSeq()) + if _, err := c.Encoding().EncodeWithMaxLength(stream, &sq); err != nil { + return err + } + return nil +} + +func (c *client) goodbyeHandler(_ context.Context, _ interface{}, _ libp2pcore.Stream) error { + return nil +} + +// statusRPCHandler reads the incoming Status RPC from the peer and responds with our version of a status message. +// This handler will disconnect any peer that does not match our fork version. +func (c *client) statusRPCHandler(ctx context.Context, _ interface{}, stream libp2pcore.Stream) error { + defer closeStream(stream) + chainHead, err := c.beaconClient.GetChainHead(ctx, &emptypb.Empty{}) + if err != nil { + return err + } + resp, err := c.nodeClient.GetGenesis(ctx, &emptypb.Empty{}) + if err != nil { + return err + } + digest, err := forks.CreateForkDigest(resp.GenesisTime.AsTime(), resp.GenesisValidatorsRoot) + if err != nil { + return err + } + kindOfFork, err := forks.Fork(slots.ToEpoch(chainHead.HeadSlot)) + if err != nil { + return err + } + log.WithFields(logrus.Fields{ + "genesisTime": resp.GenesisTime.AsTime(), + "forkDigest": digest, + "currentFork": kindOfFork.CurrentVersion, + "previousFork": kindOfFork.PreviousVersion, + }).Info("Responding to status RPC handler") + status := &pb.Status{ + ForkDigest: digest[:], + FinalizedRoot: chainHead.FinalizedBlockRoot, + FinalizedEpoch: chainHead.FinalizedEpoch, + HeadRoot: chainHead.HeadBlockRoot, + HeadSlot: chainHead.HeadSlot, + } + + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + log.WithError(err).Debug("Could not write to stream") + return err + } + _, err = c.Encoding().EncodeWithMaxLength(stream, status) + return err +} diff --git a/cmd/prysmctl/p2p/log.go b/cmd/prysmctl/p2p/log.go new file mode 100644 index 000000000..4066b4477 --- /dev/null +++ b/cmd/prysmctl/p2p/log.go @@ -0,0 +1,5 @@ +package p2p + +import "github.com/sirupsen/logrus" + +var log = logrus.WithField("prefix", "prysmctl-p2p") diff --git a/cmd/prysmctl/p2p/mock_chain.go b/cmd/prysmctl/p2p/mock_chain.go new file mode 100644 index 000000000..ae10c2235 --- /dev/null +++ b/cmd/prysmctl/p2p/mock_chain.go @@ -0,0 +1,36 @@ +package p2p + +import ( + "time" + + "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v3/time/slots" +) + +type mockChain struct { + currentFork *ethpb.Fork + genesisValsRoot [32]byte + genesisTime time.Time +} + +func (m *mockChain) ForkChoicer() forkchoice.ForkChoicer { + return nil +} + +func (m *mockChain) CurrentFork() *ethpb.Fork { + return m.currentFork +} + +func (m *mockChain) GenesisValidatorsRoot() [32]byte { + return m.genesisValsRoot +} + +func (m *mockChain) GenesisTime() time.Time { + return m.genesisTime +} + +func (m *mockChain) CurrentSlot() types.Slot { + return slots.SinceGenesis(m.genesisTime) +} diff --git a/cmd/prysmctl/p2p/p2p.go b/cmd/prysmctl/p2p/p2p.go new file mode 100644 index 000000000..0eb66bd27 --- /dev/null +++ b/cmd/prysmctl/p2p/p2p.go @@ -0,0 +1,17 @@ +package p2p + +import "github.com/urfave/cli/v2" + +var Commands = []*cli.Command{ + { + Name: "p2p", + Usage: "commands for interacting with beacon nodes via p2p", + Subcommands: []*cli.Command{ + { + Name: "send", + Usage: "commands for sending p2p rpc requests to beacon nodes", + Subcommands: []*cli.Command{requestBlocksCmd}, + }, + }, + }, +} diff --git a/cmd/prysmctl/p2p/peers.go b/cmd/prysmctl/p2p/peers.go new file mode 100644 index 000000000..2c4e56dda --- /dev/null +++ b/cmd/prysmctl/p2p/peers.go @@ -0,0 +1,28 @@ +package p2p + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" +) + +func (c *client) connectToPeers(ctx context.Context, peerMultiaddrs ...string) error { + peers, err := p2p.PeersFromStringAddrs(peerMultiaddrs) + if err != nil { + return err + } + addrInfos, err := peer.AddrInfosFromP2pAddrs(peers...) + if err != nil { + panic(err) + } + for _, info := range addrInfos { + if info.ID == c.host.ID() { + continue + } + if err := c.host.Connect(ctx, info); err != nil { + return err + } + } + return nil +} diff --git a/cmd/prysmctl/p2p/request_blocks.go b/cmd/prysmctl/p2p/request_blocks.go new file mode 100644 index 000000000..b3eb4efae --- /dev/null +++ b/cmd/prysmctl/p2p/request_blocks.go @@ -0,0 +1,221 @@ +package p2p + +import ( + "context" + "strings" + "time" + + libp2pcore "github.com/libp2p/go-libp2p-core" + corenet "github.com/libp2p/go-libp2p-core/network" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" + p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v3/cmd" + "github.com/prysmaticlabs/prysm/v3/config/params" + consensusblocks "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks" + types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v3/time/slots" + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" + "google.golang.org/protobuf/types/known/emptypb" +) + +var requestBlocksFlags = struct { + Peers string + ClientPort uint + APIEndpoints string + StartSlot uint64 + Count uint64 + Step uint64 +}{} + +var requestBlocksCmd = &cli.Command{ + Name: "beacon-blocks-by-range", + Usage: "Request a range of blocks from a beacon node via a p2p connection", + Action: cliActionRequestBlocks, + Flags: []cli.Flag{ + cmd.ChainConfigFileFlag, + &cli.StringFlag{ + Name: "peer-multiaddrs", + Usage: "comma-separated, peer multiaddr(s) to connect to for p2p requests", + Destination: &requestBlocksFlags.Peers, + Value: "", + }, + &cli.UintFlag{ + Name: "client-port", + Usage: "port to use for the client as a libp2p host", + Destination: &requestBlocksFlags.ClientPort, + Value: 13001, + }, + &cli.StringFlag{ + Name: "prysm-api-endpoints", + Usage: "comma-separated, gRPC API endpoint(s) for Prysm beacon node(s)", + Destination: &requestBlocksFlags.APIEndpoints, + Value: "localhost:4000", + }, + &cli.Uint64Flag{ + Name: "start-slot", + Usage: "start slot for blocks by range request. If unset, will use start_slot(current_epoch-1)", + Destination: &requestBlocksFlags.StartSlot, + Value: 0, + }, + &cli.Uint64Flag{ + Name: "count", + Usage: "number of blocks to request, (default 32)", + Destination: &requestBlocksFlags.Count, + Value: 32, + }, + &cli.Uint64Flag{ + Name: "step", + Usage: "number of steps of blocks in the range request, (default 1)", + Destination: &requestBlocksFlags.Step, + Value: 1, + }, + }, +} + +func cliActionRequestBlocks(cliCtx *cli.Context) error { + if cliCtx.IsSet(cmd.ChainConfigFileFlag.Name) { + chainConfigFileName := cliCtx.String(cmd.ChainConfigFileFlag.Name) + if err := params.LoadChainConfigFile(chainConfigFileName, nil); err != nil { + return err + } + } + p2ptypes.InitializeDataMaps() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + allAPIEndpoints := make([]string, 0) + if requestBlocksFlags.APIEndpoints != "" { + allAPIEndpoints = strings.Split(requestBlocksFlags.APIEndpoints, ",") + } + var err error + c, err := newClient(allAPIEndpoints, requestBlocksFlags.ClientPort) + if err != nil { + return err + } + defer c.Close() + + allPeers := make([]string, 0) + if requestBlocksFlags.Peers != "" { + allPeers = strings.Split(requestBlocksFlags.Peers, ",") + } + if len(allPeers) == 0 { + allPeers, err = c.retrievePeerAddressesViaRPC(ctx, allAPIEndpoints) + if err != nil { + return err + } + } + if len(allPeers) == 0 { + return errors.New("no peers found") + } + log.WithField("peers", allPeers).Info("List of peers") + chain, err := c.initializeMockChainService(ctx) + if err != nil { + return err + } + c.registerHandshakeHandlers() + + c.registerRPCHandler(p2p.RPCBlocksByRangeTopicV1, func( + ctx context.Context, i interface{}, stream libp2pcore.Stream, + ) error { + return nil + }) + c.registerRPCHandler(p2p.RPCBlocksByRangeTopicV2, func( + ctx context.Context, i interface{}, stream libp2pcore.Stream, + ) error { + return nil + }) + + if err := c.connectToPeers(ctx, allPeers...); err != nil { + return err + } + + startSlot := types.Slot(requestBlocksFlags.StartSlot) + var headSlot *types.Slot + if startSlot == 0 { + headResp, err := c.beaconClient.GetChainHead(ctx, &emptypb.Empty{}) + if err != nil { + return err + } + startSlot, err = slots.EpochStart(headResp.HeadEpoch.Sub(1)) + if err != nil { + return err + } + headSlot = &headResp.HeadSlot + } + + // Submit requests. + for _, pr := range c.host.Peerstore().Peers() { + if pr.String() == c.host.ID().String() { + continue + } + req := &pb.BeaconBlocksByRangeRequest{ + StartSlot: startSlot, + Count: requestBlocksFlags.Count, + Step: requestBlocksFlags.Step, + } + fields := logrus.Fields{ + "startSlot": startSlot, + "count": requestBlocksFlags.Count, + "step": requestBlocksFlags.Step, + "peer": pr.String(), + } + if headSlot != nil { + fields["headSlot"] = *headSlot + } + log.WithFields(fields).Info("Sending blocks by range p2p request to peer") + start := time.Now() + blocks, err := sync.SendBeaconBlocksByRangeRequest( + ctx, + chain, + c, + pr, + req, + nil, /* no extra block processing */ + ) + if err != nil { + return err + } + end := time.Since(start) + totalExecutionBlocks := 0 + for _, blk := range blocks { + exec, err := blk.Block().Body().Execution() + switch { + case errors.Is(err, consensusblocks.ErrUnsupportedGetter): + continue + case err != nil: + log.WithError(err).Error("Could not read execution data from block body") + continue + default: + } + _, err = exec.Transactions() + switch { + case errors.Is(err, consensusblocks.ErrUnsupportedGetter): + continue + case err != nil: + log.WithError(err).Error("Could not read transactions block execution payload") + continue + default: + } + totalExecutionBlocks++ + } + log.WithFields(logrus.Fields{ + "numBlocks": len(blocks), + "peer": pr.String(), + "timeFromSendingToProcessingResponse": end, + "totalBlocksWithExecutionPayloads": totalExecutionBlocks, + }).Info("Received blocks from peer") + + } + return nil +} + +func closeStream(stream corenet.Stream) { + if err := stream.Close(); err != nil { + log.Println(err) + } +} diff --git a/network/external_ip.go b/network/external_ip.go index 3bfb98549..dfc52c3c9 100644 --- a/network/external_ip.go +++ b/network/external_ip.go @@ -6,6 +6,15 @@ import ( "sort" ) +// IPAddr gets the external ipv4 address and converts into a libp2p formatted value. +func IPAddr() net.IP { + ip, err := ExternalIP() + if err != nil { + panic(err) + } + return net.ParseIP(ip) +} + // ExternalIPv4 returns the first IPv4 available. func ExternalIPv4() (string, error) { ips, err := ipAddrs()