diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 773a828b3..0c91c0dc9 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "@com_github_ethereum_go_ethereum//crypto:go_default_library", "@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_ipfs_go_ipfs_addr//:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p_core//crypto:go_default_library", "@com_github_libp2p_go_libp2p_core//host:go_default_library", @@ -64,7 +65,6 @@ go_test( "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p_core//host:go_default_library", - "@com_github_libp2p_go_libp2p_core//peer:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", "@com_github_multiformats_go_multiaddr//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 4c237fa3e..ce4acc859 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -7,8 +7,10 @@ import ( "time" "github.com/ethereum/go-ethereum/p2p/discv5" + iaddr "github.com/ipfs/go-ipfs-addr" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" + "github.com/pkg/errors" ) // Listener defines the discovery V5 network interface that is used @@ -83,3 +85,23 @@ func convertToMultiAddr(nodes []*discv5.Node) []ma.Multiaddr { } return multiAddrs } + +func manyMultiAddrsFromString(addrs []string) ([]ma.Multiaddr, error) { + var allAddrs []ma.Multiaddr + for _, stringAddr := range addrs { + addr, err := multiAddrFromString(stringAddr) + if err != nil { + return nil, errors.Wrapf(err, "Could not get multiaddr from string") + } + allAddrs = append(allAddrs, addr) + } + return allAddrs, nil +} + +func multiAddrFromString(address string) (ma.Multiaddr, error) { + addr, err := iaddr.ParseString(address) + if err != nil { + return nil, err + } + return addr.Multiaddr(), nil +} diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index e8ea78b6a..c1028d618 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -3,12 +3,14 @@ package p2p import ( "crypto/ecdsa" "crypto/rand" + "fmt" "net" "testing" "time" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/libp2p/go-libp2p-core/host" "github.com/prysmaticlabs/prysm/shared/iputils" "github.com/prysmaticlabs/prysm/shared/testutil" logTest "github.com/sirupsen/logrus/hooks/test" @@ -117,3 +119,40 @@ func TestMultiAddrConversion_OK(t *testing.T) { testutil.AssertLogsDoNotContain(t, hook, "Invalid port, the tcp port of the node is a reserved port") testutil.AssertLogsDoNotContain(t, hook, "Could not get multiaddr") } + +func TestStaticPeering_PeersAreAdded(t *testing.T) { + cfg := &Config{} + port := 3000 + var staticPeers []string + var hosts []host.Host + // setup other nodes + for i := 1; i <= 5; i++ { + h, _, ipaddr := createHost(t, port+i) + staticPeers = append(staticPeers, fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipaddr, port+i, h.ID())) + hosts = append(hosts, h) + } + + defer func() { + for _, h := range hosts { + _ = h.Close() + } + }() + + cfg.Port = 4000 + cfg.UDPPort = 4000 + cfg.StaticPeers = staticPeers + + s, err := NewService(cfg) + if err != nil { + t.Fatal(err) + } + + s.Start() + s.dv5Listener = &mockListener{} + defer s.Stop() + + peers := s.host.Network().Peers() + if len(peers) != 5 { + t.Errorf("Not all peers added to peerstore, wanted %d but got %d", 5, len(peers)) + } +} diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index e2125b761..8f9a45930 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -80,6 +80,14 @@ func (s *Service) Start() { go s.listenForNewNodes() } + if len(s.cfg.StaticPeers) > 0 { + addrs, err := manyMultiAddrsFromString(s.cfg.StaticPeers) + if err != nil { + log.Errorf("Could not connect to static peer: %v", err) + } + s.connectWithAllPeers(addrs) + } + // TODO(3147): Add gossip sub options gs, err := pubsub.NewGossipSub(s.ctx, s.host) if err != nil { @@ -91,6 +99,9 @@ func (s *Service) Start() { s.pubsub = gs s.started = true + + multiAddrs := s.host.Network().ListenAddresses() + log.Infof("Node currently listening at %s", multiAddrs[1].String()) } // Stop the p2p service and terminate all peer connections. diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index 8a40f4a1b..b1ebd4995 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "crypto/ecdsa" "fmt" "net" "testing" @@ -10,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" "github.com/prysmaticlabs/prysm/shared/testutil" logTest "github.com/sirupsen/logrus/hooks/test" @@ -51,13 +51,19 @@ func (m *mockListener) SearchTopic(discv5.Topic, <-chan time.Duration, chan<- *d } func createPeer(t *testing.T, cfg *Config, port int) (Listener, host.Host) { + h, pkey, ipAddr := createHost(t, port) + cfg.UDPPort = uint(port) + cfg.Port = uint(port) + listener, err := startDiscoveryV5(ipAddr, pkey, cfg) + if err != nil { + t.Errorf("Could not start discovery for node: %v", err) + } + return listener, h +} + +func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) { ipAddr, pkey := createAddrAndPrivKey(t) ipAddr = net.ParseIP("127.0.0.1") - convertedKey := convertToInterfacePrivkey(pkey) - _, err := peer.IDFromPrivateKey(convertedKey) - if err != nil { - t.Fatal(err) - } listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) if err != nil { t.Fatalf("Failed to p2p listen: %v", err) @@ -66,13 +72,7 @@ func createPeer(t *testing.T, cfg *Config, port int) (Listener, host.Host) { if err != nil { t.Fatal(err) } - cfg.UDPPort = uint(port) - cfg.Port = uint(port) - listener, err := startDiscoveryV5(ipAddr, pkey, cfg) - if err != nil { - t.Errorf("Could not start discovery for node: %v", err) - } - return listener, h + return h, pkey, ipAddr } func TestService_Stop_SetsStartedToFalse(t *testing.T) { @@ -126,12 +126,21 @@ func TestListenForNewNodes(t *testing.T) { BootstrapNodeAddr: bootNode.String(), } var listeners []*discv5.Network + var hosts []host.Host // setup other nodes for i := 1; i <= 5; i++ { - listener, _ := createPeer(t, cfg, port+i) + listener, h := createPeer(t, cfg, port+i) listeners = append(listeners, listener.(*discv5.Network)) + hosts = append(hosts, h) } + // close peers upon exit of test + defer func() { + for _, h := range hosts { + _ = h.Close() + } + }() + cfg.Port = 4000 cfg.UDPPort = 4000