From 3cb32c379220a5aa62932fd0d59d474d57b0a07e Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Wed, 21 Aug 2019 11:38:30 +0530 Subject: [PATCH] Implement Discv5 in Prysm (#3211) * add discovery * gaz * add build options * add udpPort * add more changes * refactor private key * added discovery loop * add ttl * add ttl * use ip type instead of string * tests pass * gaz and new test file * add test * add more tests * add one more test * adding multiAddr tests * adding new protocol , listener * fix keys * more fixes * more changes dialing peers works now * gaz * add more changes * add more changes * gaz * add new test helpers * new test * fixed all tests * gaz * reduce sleep * lint * new changes * change formats * fix all this stuff * remove discv5 protocol * remove protocol * remove port condition,too restrictive * preston's feedback * preston's feedback * close all peers * gaz * remove unused func * Update beacon-chain/p2p/service.go Co-Authored-By: Preston Van Loon * remove build options * refactor tests --- beacon-chain/p2p/BUILD.bazel | 18 +++++ beacon-chain/p2p/config.go | 1 + beacon-chain/p2p/discovery.go | 85 ++++++++++++++++++++ beacon-chain/p2p/discovery_test.go | 119 ++++++++++++++++++++++++++++ beacon-chain/p2p/options.go | 42 ++++++++++ beacon-chain/p2p/options_test.go | 42 ++++++++++ beacon-chain/p2p/service.go | 80 ++++++++++++++++--- beacon-chain/p2p/service_test.go | 120 ++++++++++++++++++++++++++++- beacon-chain/p2p/testing/p2p.go | 11 ++- beacon-chain/p2p/utils.go | 65 ++++++++++++++++ 10 files changed, 566 insertions(+), 17 deletions(-) create mode 100644 beacon-chain/p2p/discovery.go create mode 100644 beacon-chain/p2p/discovery_test.go create mode 100644 beacon-chain/p2p/options.go create mode 100644 beacon-chain/p2p/options_test.go create mode 100644 beacon-chain/p2p/utils.go diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 32ed08469..773a828b3 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -6,13 +6,16 @@ go_library( "broadcaster.go", "config.go", "deprecated.go", + "discovery.go", "doc.go", "gossip_topic_mappings.go", "handshake.go", "interfaces.go", "log.go", + "options.go", "sender.go", "service.go", + "utils.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p", visibility = ["//beacon-chain:__subpackages__"], @@ -23,13 +26,19 @@ go_library( "//shared:go_default_library", "//shared/deprecated-p2p:go_default_library", "//shared/event:go_default_library", + "//shared/iputils:go_default_library", + "@com_github_btcsuite_btcd//btcec:go_default_library", + "@com_github_ethereum_go_ethereum//crypto:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", + "@com_github_libp2p_go_libp2p_core//crypto:go_default_library", "@com_github_libp2p_go_libp2p_core//host:go_default_library", "@com_github_libp2p_go_libp2p_core//network:go_default_library", "@com_github_libp2p_go_libp2p_core//peer:go_default_library", "@com_github_libp2p_go_libp2p_core//protocol:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", + "@com_github_multiformats_go_multiaddr//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], @@ -39,6 +48,8 @@ go_test( name = "go_default_test", srcs = [ "broadcaster_test.go", + "discovery_test.go", + "options_test.go", "parameter_test.go", "service_test.go", ], @@ -46,9 +57,16 @@ go_test( deps = [ "//beacon-chain/p2p/testing:go_default_library", "//proto/testing:go_default_library", + "//shared/iputils:go_default_library", "//shared/testutil:go_default_library", + "@com_github_ethereum_go_ethereum//crypto:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_libp2p_go_libp2p//:go_default_library", + "@com_github_libp2p_go_libp2p_core//host:go_default_library", + "@com_github_libp2p_go_libp2p_core//peer:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", + "@com_github_multiformats_go_multiaddr//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", ], ) diff --git a/beacon-chain/p2p/config.go b/beacon-chain/p2p/config.go index a15898b78..70b5d0f97 100644 --- a/beacon-chain/p2p/config.go +++ b/beacon-chain/p2p/config.go @@ -10,6 +10,7 @@ type Config struct { HostAddress string PrivateKey string Port uint + UDPPort uint MaxPeers uint WhitelistCIDR string EnableUPnP bool diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go new file mode 100644 index 000000000..4c237fa3e --- /dev/null +++ b/beacon-chain/p2p/discovery.go @@ -0,0 +1,85 @@ +package p2p + +import ( + "crypto/ecdsa" + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +// Listener defines the discovery V5 network interface that is used +// to communicate with other peers. +type Listener interface { + Self() *discv5.Node + Close() + Lookup(discv5.NodeID) []*discv5.Node + ReadRandomNodes([]*discv5.Node) int + SetFallbackNodes([]*discv5.Node) error + Resolve(discv5.NodeID) *discv5.Node + RegisterTopic(discv5.Topic, <-chan struct{}) + SearchTopic(discv5.Topic, <-chan time.Duration, chan<- *discv5.Node, chan<- bool) +} + +func createListener(ipAddr net.IP, port int, privKey *ecdsa.PrivateKey) *discv5.Network { + udpAddr := &net.UDPAddr{ + IP: ipAddr, + Port: port, + } + conn, err := net.ListenUDP("udp4", udpAddr) + if err != nil { + log.Fatal(err) + } + + network, err := discv5.ListenUDP(privKey, conn, "", nil) + if err != nil { + log.Fatal(err) + } + return network +} + +func startDiscoveryV5(addr net.IP, privKey *ecdsa.PrivateKey, cfg *Config) (*discv5.Network, error) { + listener := createListener(addr, int(cfg.UDPPort), privKey) + bootNode, err := discv5.ParseNode(cfg.BootstrapNodeAddr) + if err != nil { + return nil, err + } + if err := listener.SetFallbackNodes([]*discv5.Node{bootNode}); err != nil { + return nil, err + } + node := listener.Self() + log.Infof("Started Discovery: %s", node.String()) + return listener, nil +} + +func convertToMultiAddr(nodes []*discv5.Node) []ma.Multiaddr { + var multiAddrs []ma.Multiaddr + for _, node := range nodes { + ip4 := node.IP.To4() + if ip4 == nil { + log.Error("Node doesn't have an ip4 address") + continue + } + pubkey, err := node.ID.Pubkey() + if err != nil { + log.Errorf("Could not get pubkey from node ID: %v", err) + continue + } + assertedKey := convertToInterfacePubkey(pubkey) + id, err := peer.IDFromPublicKey(assertedKey) + if err != nil { + log.Errorf("Could not get peer id: %v", err) + } + multiAddrString := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip4.String(), node.TCP, id) + multiAddr, err := ma.NewMultiaddr(multiAddrString) + if err != nil { + log.Errorf("Could not get multiaddr:%v", err) + continue + } + multiAddrs = append(multiAddrs, multiAddr) + } + return multiAddrs +} diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go new file mode 100644 index 000000000..e8ea78b6a --- /dev/null +++ b/beacon-chain/p2p/discovery_test.go @@ -0,0 +1,119 @@ +package p2p + +import ( + "crypto/ecdsa" + "crypto/rand" + "net" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/prysmaticlabs/prysm/shared/iputils" + "github.com/prysmaticlabs/prysm/shared/testutil" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func createAddrAndPrivKey(t *testing.T) (net.IP, *ecdsa.PrivateKey) { + ip, err := iputils.ExternalIPv4() + if err != nil { + t.Fatalf("Could not get ip: %v", err) + } + ipAddr := net.ParseIP(ip) + pkey, err := privKey(&Config{}) + if err != nil { + t.Fatalf("Could not get private key: %v", err) + } + return ipAddr, pkey +} + +func TestCreateListener(t *testing.T) { + port := 1024 + ipAddr, pkey := createAddrAndPrivKey(t) + listener := createListener(ipAddr, port, pkey) + defer listener.Close() + + if !listener.Self().IP.Equal(ipAddr) { + t.Errorf("Ip address is not the expected type, wanted %s but got %s", ipAddr.String(), listener.Self().IP.String()) + } + + if port != int(listener.Self().UDP) { + t.Errorf("In correct port number, wanted %d but got %d", port, listener.Self().UDP) + } + pubkey, err := listener.Self().ID.Pubkey() + if err != nil { + t.Error(err) + } + XisSame := pkey.PublicKey.X.Cmp(pubkey.X) == 0 + YisSame := pkey.PublicKey.Y.Cmp(pubkey.Y) == 0 + + if !(XisSame && YisSame) { + t.Error("Pubkey is different from what was used to create the listener") + } +} + +func TestStartDiscV5_DiscoverAllPeers(t *testing.T) { + port := 2000 + ipAddr, pkey := createAddrAndPrivKey(t) + bootListener := createListener(ipAddr, port, pkey) + defer bootListener.Close() + + bootNode := bootListener.Self() + + cfg := &Config{ + BootstrapNodeAddr: bootNode.String(), + } + + var listeners []*discv5.Network + for i := 1; i <= 10; i++ { + port = 2000 + i + cfg.UDPPort = uint(port) + ipAddr, pkey := createAddrAndPrivKey(t) + listener, err := startDiscoveryV5(ipAddr, pkey, cfg) + if err != nil { + t.Errorf("Could not start discovery for node: %v", err) + } + listeners = append(listeners, listener) + } + + // Wait for the nodes to have their local routing tables to be populated with the other nodes + time.Sleep(100 * time.Millisecond) + + lastListener := listeners[len(listeners)-1] + nodes := lastListener.Lookup(bootNode.ID) + if len(nodes) != 11 { + t.Errorf("The node's local table doesn't have the expected number of nodes. "+ + "Expected %d but got %d", 11, len(nodes)) + } + + // Close all ports + for _, listener := range listeners { + listener.Close() + } +} + +func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) { + hook := logTest.NewGlobal() + ipAddr := net.IPv6zero + pkey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + if err != nil { + t.Fatalf("Could not generate key %v", err) + } + nodeID := discv5.PubkeyID(&pkey.PublicKey) + node := discv5.NewNode(nodeID, ipAddr, 0, 0) + _ = convertToMultiAddr([]*discv5.Node{node}) + + testutil.AssertLogsContain(t, hook, "Node doesn't have an ip4 address") +} + +func TestMultiAddrConversion_OK(t *testing.T) { + hook := logTest.NewGlobal() + port := 1024 + ipAddr, pkey := createAddrAndPrivKey(t) + listener := createListener(ipAddr, port, pkey) + + _ = convertToMultiAddr([]*discv5.Node{listener.Self()}) + testutil.AssertLogsDoNotContain(t, hook, "Node doesn't have an ip4 address") + testutil.AssertLogsDoNotContain(t, hook, "Invalid port, the tcp port of the node is a reserved port") + testutil.AssertLogsDoNotContain(t, hook, "Could not get multiaddr") +} diff --git a/beacon-chain/p2p/options.go b/beacon-chain/p2p/options.go new file mode 100644 index 000000000..ffc8aef2c --- /dev/null +++ b/beacon-chain/p2p/options.go @@ -0,0 +1,42 @@ +package p2p + +import ( + "crypto/ecdsa" + "fmt" + "net" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +// buildOptions for the libp2p host. +func buildOptions(cfg *Config, ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Option { + listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, cfg.Port)) + if err != nil { + log.Fatalf("Failed to p2p listen: %v", err) + } + options := []libp2p.Option{ + privKeyOption(priKey), + libp2p.ListenAddrs(listen), + } + if cfg.EnableUPnP { + options = append(options, libp2p.NATPortMap()) //Allow to use UPnP + } + return options +} + +// Adds a private key to the libp2p option if the option was provided. +// If the private key file is missing or cannot be read, or if the +// private key contents cannot be marshaled, an exception is thrown. +func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option { + return func(cfg *libp2p.Config) error { + convertedKey := convertToInterfacePrivkey(privkey) + id, err := peer.IDFromPrivateKey(convertedKey) + if err != nil { + return err + } + log.WithField("peer id", id.Pretty()).Info("Private key generated. Announcing peer id") + return cfg.Apply(libp2p.Identity(convertedKey)) + } +} diff --git a/beacon-chain/p2p/options_test.go b/beacon-chain/p2p/options_test.go new file mode 100644 index 000000000..f4e6fec1f --- /dev/null +++ b/beacon-chain/p2p/options_test.go @@ -0,0 +1,42 @@ +package p2p + +import ( + "crypto/ecdsa" + "crypto/rand" + "encoding/hex" + "io/ioutil" + "os" + "testing" + + curve "github.com/ethereum/go-ethereum/crypto" + "github.com/prysmaticlabs/prysm/shared/testutil" +) + +func TestPrivateKeyLoading(t *testing.T) { + file, err := ioutil.TempFile(testutil.TempDir(), "key") + if err != nil { + log.Fatal(err) + } + defer os.Remove(file.Name()) + key, err := ecdsa.GenerateKey(curve.S256(), rand.Reader) + if err != nil { + t.Fatalf("Could not generate key: %v", err) + } + keyStr := hex.EncodeToString(curve.FromECDSA(key)) + err = ioutil.WriteFile(file.Name(), []byte(keyStr), 0600) + if err != nil { + t.Fatalf("Could not write key to file: %v", err) + } + log.WithField("file", file.Name()).WithField("key", keyStr).Info("Wrote key to file") + cfg := &Config{ + PrivateKey: file.Name(), + } + pKey, err := privKey(cfg) + if err != nil { + t.Fatalf("Could not apply option: %v", err) + } + newEncoded := hex.EncodeToString(curve.FromECDSA(pKey)) + if newEncoded != keyStr { + t.Error("Private keys do not match") + } +} diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index e4fb71a74..4f0e34132 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -2,7 +2,9 @@ package p2p import ( "context" + "time" + "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/gogo/protobuf/proto" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/host" @@ -10,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" + ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/shared" @@ -18,18 +21,18 @@ import ( ) var _ = shared.Service(&Service{}) +var pollingPeriod = 1 * time.Second // Service for managing peer to peer (p2p) networking. type Service struct { - ctx context.Context - cancel context.CancelFunc - - started bool - cfg *Config - startupErr error - - host host.Host - pubsub *pubsub.PubSub + ctx context.Context + cancel context.CancelFunc + started bool + cfg *Config + startupErr error + dv5Listener Listener + host host.Host + pubsub *pubsub.PubSub } // NewService initializes a new p2p service compatible with shared.Service interface. No @@ -51,13 +54,29 @@ func (s *Service) Start() { } s.started = true + ipAddr := ipAddr(s.cfg) + privKey, err := privKey(s.cfg) + if err != nil { + s.startupErr = err + return + } + // TODO(3147): Add host options - h, err := libp2p.New(s.ctx) + opts := buildOptions(s.cfg, ipAddr, privKey) + h, err := libp2p.New(s.ctx, opts...) if err != nil { s.startupErr = err return } s.host = h + listener, err := startDiscoveryV5(ipAddr, privKey, s.cfg) + if err != nil { + s.startupErr = err + return + } + s.dv5Listener = listener + + go s.listenForNewNodes() // TODO(3147): Add gossip sub options gs, err := pubsub.NewGossipSub(s.ctx, s.host) @@ -71,6 +90,7 @@ func (s *Service) Start() { // Stop the p2p service and terminate all peer connections. func (s *Service) Stop() error { s.started = false + s.dv5Listener.Close() return nil } @@ -106,6 +126,46 @@ func (s *Service) Disconnect(pid peer.ID) error { return nil } +// listen for new nodes watches for new nodes in the network and adds them to the peerstore. +func (s *Service) listenForNewNodes() { + node, err := discv5.ParseNode(s.cfg.BootstrapNodeAddr) + if err != nil { + log.Fatalf("could not parse bootstrap address: %v", err) + } + nodeID := node.ID + ticker := time.NewTicker(pollingPeriod) + for { + select { + case <-ticker.C: + nodes := s.dv5Listener.Lookup(nodeID) + multiAddresses := convertToMultiAddr(nodes) + s.connectWithAllPeers(multiAddresses) + // store furthest node as the next to lookup + nodeID = nodes[len(nodes)-1].ID + case <-s.ctx.Done(): + log.Debug("p2p context is closed, exiting routine") + break + + } + } +} + +func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) { + addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...) + if err != nil { + log.Errorf("Could not convert to peer address info's from multiaddresses: %v", err) + return + } + for _, info := range addrInfos { + if info.ID == s.host.ID() { + continue + } + if err := s.host.Connect(s.ctx, info); err != nil { + log.Errorf("Could not connect with peer: %v", err) + } + } +} + // Subscribe to some topic. // TODO(3147): Remove // DEPRECATED: Do not use. diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index 02d5eed87..8a40f4a1b 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -1,16 +1,86 @@ package p2p import ( + "context" + "fmt" + "net" "testing" + "time" + "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" "github.com/prysmaticlabs/prysm/shared/testutil" logTest "github.com/sirupsen/logrus/hooks/test" ) +type mockListener struct{} + +func (m *mockListener) Self() *discv5.Node { + panic("implement me") +} + +func (m *mockListener) Close() { + //no-op +} + +func (m *mockListener) Lookup(discv5.NodeID) []*discv5.Node { + panic("implement me") +} + +func (m *mockListener) ReadRandomNodes([]*discv5.Node) int { + panic("implement me") +} + +func (m *mockListener) SetFallbackNodes([]*discv5.Node) error { + panic("implement me") +} + +func (m *mockListener) Resolve(discv5.NodeID) *discv5.Node { + panic("implement me") +} + +func (m *mockListener) RegisterTopic(discv5.Topic, <-chan struct{}) { + panic("implement me") +} + +func (m *mockListener) SearchTopic(discv5.Topic, <-chan time.Duration, chan<- *discv5.Node, chan<- bool) { + panic("implement me") +} + +func createPeer(t *testing.T, cfg *Config, port int) (Listener, host.Host) { + ipAddr, pkey := createAddrAndPrivKey(t) + ipAddr = net.ParseIP("127.0.0.1") + convertedKey := convertToInterfacePrivkey(pkey) + _, err := peer.IDFromPrivateKey(convertedKey) + if err != nil { + t.Fatal(err) + } + listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port)) + if err != nil { + t.Fatalf("Failed to p2p listen: %v", err) + } + h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen)}...) + if err != nil { + t.Fatal(err) + } + cfg.UDPPort = uint(port) + cfg.Port = uint(port) + listener, err := startDiscoveryV5(ipAddr, pkey, cfg) + if err != nil { + t.Errorf("Could not start discovery for node: %v", err) + } + return listener, h +} + func TestService_Stop_SetsStartedToFalse(t *testing.T) { s, _ := NewService(nil) s.started = true + s.dv5Listener = &mockListener{} _ = s.Stop() + if s.started != false { t.Error("Expected Service.started to be false, got true") } @@ -19,7 +89,12 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) { func TestService_Start_OnlyStartsOnce(t *testing.T) { hook := logTest.NewGlobal() - s, _ := NewService(nil) + cfg := &Config{ + Port: 2000, + UDPPort: 2000, + } + s, _ := NewService(cfg) + s.dv5Listener = &mockListener{} defer s.Stop() s.Start() if s.started != true { @@ -31,7 +106,50 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) { func TestService_Status_NotRunning(t *testing.T) { s := &Service{started: false} + s.dv5Listener = &mockListener{} if s.Status().Error() != "not running" { t.Errorf("Status returned wrong error, got %v", s.Status()) } } + +func TestListenForNewNodes(t *testing.T) { + // setup bootnode + port := 2000 + _, pkey := createAddrAndPrivKey(t) + ipAddr := net.ParseIP("127.0.0.1") + bootListener := createListener(ipAddr, port, pkey) + defer bootListener.Close() + + bootNode := bootListener.Self() + + cfg := &Config{ + BootstrapNodeAddr: bootNode.String(), + } + var listeners []*discv5.Network + // setup other nodes + for i := 1; i <= 5; i++ { + listener, _ := createPeer(t, cfg, port+i) + listeners = append(listeners, listener.(*discv5.Network)) + } + + cfg.Port = 4000 + cfg.UDPPort = 4000 + + s, err := NewService(cfg) + if err != nil { + t.Fatal(err) + } + + s.Start() + defer s.Stop() + + time.Sleep(2 * time.Second) + peers := s.host.Network().Peers() + if len(peers) != 5 { + t.Errorf("Not all peers added to peerstore, wanted %d but got %d", 5, len(peers)) + } + // close down all peers + for _, listener := range listeners { + listener.Close() + } +} diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 582b97e3e..2b7d0d2c2 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -22,10 +22,9 @@ import ( // TestP2P represents a p2p implementation that can be used for testing. type TestP2P struct { - t *testing.T - Host host.Host - pubsub *pubsub.PubSub - + t *testing.T + Host host.Host + pubsub *pubsub.PubSub BroadcastCalled bool } @@ -56,8 +55,8 @@ func (p *TestP2P) Connect(b *TestP2P) { } func connect(a, b host.Host) error { - pinfo := a.Peerstore().PeerInfo(a.ID()) - return b.Connect(context.Background(), pinfo) + pinfo := b.Peerstore().PeerInfo(b.ID()) + return a.Connect(context.Background(), pinfo) } // ReceiveRPC simulates an incoming RPC. diff --git a/beacon-chain/p2p/utils.go b/beacon-chain/p2p/utils.go new file mode 100644 index 000000000..8baf6e9b5 --- /dev/null +++ b/beacon-chain/p2p/utils.go @@ -0,0 +1,65 @@ +package p2p + +import ( + "crypto/ecdsa" + "crypto/rand" + "net" + "os" + + "github.com/btcsuite/btcd/btcec" + curve "github.com/ethereum/go-ethereum/crypto" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/prysmaticlabs/prysm/shared/iputils" +) + +func convertFromInterfacePrivKey(privkey crypto.PrivKey) *ecdsa.PrivateKey { + typeAssertedKey := (*ecdsa.PrivateKey)((*btcec.PrivateKey)(privkey.(*crypto.Secp256k1PrivateKey))) + return typeAssertedKey +} + +func convertToInterfacePrivkey(privkey *ecdsa.PrivateKey) crypto.PrivKey { + typeAssertedKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)((*btcec.PrivateKey)(privkey))) + return typeAssertedKey +} + +func convertFromInterfacePubKey(pubkey crypto.PubKey) *ecdsa.PublicKey { + typeAssertedKey := (*ecdsa.PublicKey)((*btcec.PublicKey)(pubkey.(*crypto.Secp256k1PublicKey))) + return typeAssertedKey +} + +func convertToInterfacePubkey(pubkey *ecdsa.PublicKey) crypto.PubKey { + typeAssertedKey := crypto.PubKey((*crypto.Secp256k1PublicKey)((*btcec.PublicKey)(pubkey))) + return typeAssertedKey +} + +func privKey(cfg *Config) (*ecdsa.PrivateKey, error) { + if cfg.PrivateKey == "" { + priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader) + if err != nil { + return nil, err + } + convertedKey := convertFromInterfacePrivKey(priv) + return convertedKey, nil + } + if _, err := os.Stat(cfg.PrivateKey); os.IsNotExist(err) { + log.WithField("private key file", cfg.PrivateKey).Warn("Could not read private key, file is missing or unreadable") + return nil, err + } + priv, err := curve.LoadECDSA(cfg.PrivateKey) + if err != nil { + log.WithError(err).Error("Error reading private key from file") + return nil, err + } + return priv, nil +} + +func ipAddr(cfg *Config) net.IP { + if cfg.HostAddress == "" { + ip, err := iputils.ExternalIPv4() + if err != nil { + log.Fatalf("Could not get IPv4 address: %v", err) + } + return net.ParseIP(ip) + } + return net.ParseIP(cfg.HostAddress) +}