mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-24 12:27:18 +00:00
Implement Discv5 in Prysm (#3211)
* add discovery * gaz * add build options * add udpPort * add more changes * refactor private key * added discovery loop * add ttl * add ttl * use ip type instead of string * tests pass * gaz and new test file * add test * add more tests * add one more test * adding multiAddr tests * adding new protocol , listener * fix keys * more fixes * more changes dialing peers works now * gaz * add more changes * add more changes * gaz * add new test helpers * new test * fixed all tests * gaz * reduce sleep * lint * new changes * change formats * fix all this stuff * remove discv5 protocol * remove protocol * remove port condition,too restrictive * preston's feedback * preston's feedback * close all peers * gaz * remove unused func * Update beacon-chain/p2p/service.go Co-Authored-By: Preston Van Loon <preston@prysmaticlabs.com> * remove build options * refactor tests
This commit is contained in:
parent
8fc3c55199
commit
3cb32c3792
@ -6,13 +6,16 @@ go_library(
|
||||
"broadcaster.go",
|
||||
"config.go",
|
||||
"deprecated.go",
|
||||
"discovery.go",
|
||||
"doc.go",
|
||||
"gossip_topic_mappings.go",
|
||||
"handshake.go",
|
||||
"interfaces.go",
|
||||
"log.go",
|
||||
"options.go",
|
||||
"sender.go",
|
||||
"service.go",
|
||||
"utils.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
@ -23,13 +26,19 @@ go_library(
|
||||
"//shared:go_default_library",
|
||||
"//shared/deprecated-p2p:go_default_library",
|
||||
"//shared/event:go_default_library",
|
||||
"//shared/iputils:go_default_library",
|
||||
"@com_github_btcsuite_btcd//btcec:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library",
|
||||
"@com_github_gogo_protobuf//proto:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//crypto:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
@ -39,6 +48,8 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"broadcaster_test.go",
|
||||
"discovery_test.go",
|
||||
"options_test.go",
|
||||
"parameter_test.go",
|
||||
"service_test.go",
|
||||
],
|
||||
@ -46,9 +57,16 @@ go_test(
|
||||
deps = [
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//proto/testing:go_default_library",
|
||||
"//shared/iputils:go_default_library",
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library",
|
||||
"@com_github_gogo_protobuf//proto:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
|
||||
"@com_github_multiformats_go_multiaddr//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -10,6 +10,7 @@ type Config struct {
|
||||
HostAddress string
|
||||
PrivateKey string
|
||||
Port uint
|
||||
UDPPort uint
|
||||
MaxPeers uint
|
||||
WhitelistCIDR string
|
||||
EnableUPnP bool
|
||||
|
85
beacon-chain/p2p/discovery.go
Normal file
85
beacon-chain/p2p/discovery.go
Normal file
@ -0,0 +1,85 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// Listener defines the discovery V5 network interface that is used
|
||||
// to communicate with other peers.
|
||||
type Listener interface {
|
||||
Self() *discv5.Node
|
||||
Close()
|
||||
Lookup(discv5.NodeID) []*discv5.Node
|
||||
ReadRandomNodes([]*discv5.Node) int
|
||||
SetFallbackNodes([]*discv5.Node) error
|
||||
Resolve(discv5.NodeID) *discv5.Node
|
||||
RegisterTopic(discv5.Topic, <-chan struct{})
|
||||
SearchTopic(discv5.Topic, <-chan time.Duration, chan<- *discv5.Node, chan<- bool)
|
||||
}
|
||||
|
||||
func createListener(ipAddr net.IP, port int, privKey *ecdsa.PrivateKey) *discv5.Network {
|
||||
udpAddr := &net.UDPAddr{
|
||||
IP: ipAddr,
|
||||
Port: port,
|
||||
}
|
||||
conn, err := net.ListenUDP("udp4", udpAddr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
network, err := discv5.ListenUDP(privKey, conn, "", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return network
|
||||
}
|
||||
|
||||
func startDiscoveryV5(addr net.IP, privKey *ecdsa.PrivateKey, cfg *Config) (*discv5.Network, error) {
|
||||
listener := createListener(addr, int(cfg.UDPPort), privKey)
|
||||
bootNode, err := discv5.ParseNode(cfg.BootstrapNodeAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := listener.SetFallbackNodes([]*discv5.Node{bootNode}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
node := listener.Self()
|
||||
log.Infof("Started Discovery: %s", node.String())
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
func convertToMultiAddr(nodes []*discv5.Node) []ma.Multiaddr {
|
||||
var multiAddrs []ma.Multiaddr
|
||||
for _, node := range nodes {
|
||||
ip4 := node.IP.To4()
|
||||
if ip4 == nil {
|
||||
log.Error("Node doesn't have an ip4 address")
|
||||
continue
|
||||
}
|
||||
pubkey, err := node.ID.Pubkey()
|
||||
if err != nil {
|
||||
log.Errorf("Could not get pubkey from node ID: %v", err)
|
||||
continue
|
||||
}
|
||||
assertedKey := convertToInterfacePubkey(pubkey)
|
||||
id, err := peer.IDFromPublicKey(assertedKey)
|
||||
if err != nil {
|
||||
log.Errorf("Could not get peer id: %v", err)
|
||||
}
|
||||
multiAddrString := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip4.String(), node.TCP, id)
|
||||
multiAddr, err := ma.NewMultiaddr(multiAddrString)
|
||||
if err != nil {
|
||||
log.Errorf("Could not get multiaddr:%v", err)
|
||||
continue
|
||||
}
|
||||
multiAddrs = append(multiAddrs, multiAddr)
|
||||
}
|
||||
return multiAddrs
|
||||
}
|
119
beacon-chain/p2p/discovery_test.go
Normal file
119
beacon-chain/p2p/discovery_test.go
Normal file
@ -0,0 +1,119 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/prysmaticlabs/prysm/shared/iputils"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func createAddrAndPrivKey(t *testing.T) (net.IP, *ecdsa.PrivateKey) {
|
||||
ip, err := iputils.ExternalIPv4()
|
||||
if err != nil {
|
||||
t.Fatalf("Could not get ip: %v", err)
|
||||
}
|
||||
ipAddr := net.ParseIP(ip)
|
||||
pkey, err := privKey(&Config{})
|
||||
if err != nil {
|
||||
t.Fatalf("Could not get private key: %v", err)
|
||||
}
|
||||
return ipAddr, pkey
|
||||
}
|
||||
|
||||
func TestCreateListener(t *testing.T) {
|
||||
port := 1024
|
||||
ipAddr, pkey := createAddrAndPrivKey(t)
|
||||
listener := createListener(ipAddr, port, pkey)
|
||||
defer listener.Close()
|
||||
|
||||
if !listener.Self().IP.Equal(ipAddr) {
|
||||
t.Errorf("Ip address is not the expected type, wanted %s but got %s", ipAddr.String(), listener.Self().IP.String())
|
||||
}
|
||||
|
||||
if port != int(listener.Self().UDP) {
|
||||
t.Errorf("In correct port number, wanted %d but got %d", port, listener.Self().UDP)
|
||||
}
|
||||
pubkey, err := listener.Self().ID.Pubkey()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
XisSame := pkey.PublicKey.X.Cmp(pubkey.X) == 0
|
||||
YisSame := pkey.PublicKey.Y.Cmp(pubkey.Y) == 0
|
||||
|
||||
if !(XisSame && YisSame) {
|
||||
t.Error("Pubkey is different from what was used to create the listener")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
|
||||
port := 2000
|
||||
ipAddr, pkey := createAddrAndPrivKey(t)
|
||||
bootListener := createListener(ipAddr, port, pkey)
|
||||
defer bootListener.Close()
|
||||
|
||||
bootNode := bootListener.Self()
|
||||
|
||||
cfg := &Config{
|
||||
BootstrapNodeAddr: bootNode.String(),
|
||||
}
|
||||
|
||||
var listeners []*discv5.Network
|
||||
for i := 1; i <= 10; i++ {
|
||||
port = 2000 + i
|
||||
cfg.UDPPort = uint(port)
|
||||
ipAddr, pkey := createAddrAndPrivKey(t)
|
||||
listener, err := startDiscoveryV5(ipAddr, pkey, cfg)
|
||||
if err != nil {
|
||||
t.Errorf("Could not start discovery for node: %v", err)
|
||||
}
|
||||
listeners = append(listeners, listener)
|
||||
}
|
||||
|
||||
// Wait for the nodes to have their local routing tables to be populated with the other nodes
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
lastListener := listeners[len(listeners)-1]
|
||||
nodes := lastListener.Lookup(bootNode.ID)
|
||||
if len(nodes) != 11 {
|
||||
t.Errorf("The node's local table doesn't have the expected number of nodes. "+
|
||||
"Expected %d but got %d", 11, len(nodes))
|
||||
}
|
||||
|
||||
// Close all ports
|
||||
for _, listener := range listeners {
|
||||
listener.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ipAddr := net.IPv6zero
|
||||
pkey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not generate key %v", err)
|
||||
}
|
||||
nodeID := discv5.PubkeyID(&pkey.PublicKey)
|
||||
node := discv5.NewNode(nodeID, ipAddr, 0, 0)
|
||||
_ = convertToMultiAddr([]*discv5.Node{node})
|
||||
|
||||
testutil.AssertLogsContain(t, hook, "Node doesn't have an ip4 address")
|
||||
}
|
||||
|
||||
func TestMultiAddrConversion_OK(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
port := 1024
|
||||
ipAddr, pkey := createAddrAndPrivKey(t)
|
||||
listener := createListener(ipAddr, port, pkey)
|
||||
|
||||
_ = convertToMultiAddr([]*discv5.Node{listener.Self()})
|
||||
testutil.AssertLogsDoNotContain(t, hook, "Node doesn't have an ip4 address")
|
||||
testutil.AssertLogsDoNotContain(t, hook, "Invalid port, the tcp port of the node is a reserved port")
|
||||
testutil.AssertLogsDoNotContain(t, hook, "Could not get multiaddr")
|
||||
}
|
42
beacon-chain/p2p/options.go
Normal file
42
beacon-chain/p2p/options.go
Normal file
@ -0,0 +1,42 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// buildOptions for the libp2p host.
|
||||
func buildOptions(cfg *Config, ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Option {
|
||||
listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ip, cfg.Port))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to p2p listen: %v", err)
|
||||
}
|
||||
options := []libp2p.Option{
|
||||
privKeyOption(priKey),
|
||||
libp2p.ListenAddrs(listen),
|
||||
}
|
||||
if cfg.EnableUPnP {
|
||||
options = append(options, libp2p.NATPortMap()) //Allow to use UPnP
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
// Adds a private key to the libp2p option if the option was provided.
|
||||
// If the private key file is missing or cannot be read, or if the
|
||||
// private key contents cannot be marshaled, an exception is thrown.
|
||||
func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option {
|
||||
return func(cfg *libp2p.Config) error {
|
||||
convertedKey := convertToInterfacePrivkey(privkey)
|
||||
id, err := peer.IDFromPrivateKey(convertedKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.WithField("peer id", id.Pretty()).Info("Private key generated. Announcing peer id")
|
||||
return cfg.Apply(libp2p.Identity(convertedKey))
|
||||
}
|
||||
}
|
42
beacon-chain/p2p/options_test.go
Normal file
42
beacon-chain/p2p/options_test.go
Normal file
@ -0,0 +1,42 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
curve "github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
)
|
||||
|
||||
func TestPrivateKeyLoading(t *testing.T) {
|
||||
file, err := ioutil.TempFile(testutil.TempDir(), "key")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer os.Remove(file.Name())
|
||||
key, err := ecdsa.GenerateKey(curve.S256(), rand.Reader)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not generate key: %v", err)
|
||||
}
|
||||
keyStr := hex.EncodeToString(curve.FromECDSA(key))
|
||||
err = ioutil.WriteFile(file.Name(), []byte(keyStr), 0600)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not write key to file: %v", err)
|
||||
}
|
||||
log.WithField("file", file.Name()).WithField("key", keyStr).Info("Wrote key to file")
|
||||
cfg := &Config{
|
||||
PrivateKey: file.Name(),
|
||||
}
|
||||
pKey, err := privKey(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not apply option: %v", err)
|
||||
}
|
||||
newEncoded := hex.EncodeToString(curve.FromECDSA(pKey))
|
||||
if newEncoded != keyStr {
|
||||
t.Error("Private keys do not match")
|
||||
}
|
||||
}
|
@ -2,7 +2,9 @@ package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
@ -10,6 +12,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
||||
"github.com/prysmaticlabs/prysm/shared"
|
||||
@ -18,18 +21,18 @@ import (
|
||||
)
|
||||
|
||||
var _ = shared.Service(&Service{})
|
||||
var pollingPeriod = 1 * time.Second
|
||||
|
||||
// Service for managing peer to peer (p2p) networking.
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
started bool
|
||||
cfg *Config
|
||||
startupErr error
|
||||
|
||||
host host.Host
|
||||
pubsub *pubsub.PubSub
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
started bool
|
||||
cfg *Config
|
||||
startupErr error
|
||||
dv5Listener Listener
|
||||
host host.Host
|
||||
pubsub *pubsub.PubSub
|
||||
}
|
||||
|
||||
// NewService initializes a new p2p service compatible with shared.Service interface. No
|
||||
@ -51,13 +54,29 @@ func (s *Service) Start() {
|
||||
}
|
||||
s.started = true
|
||||
|
||||
ipAddr := ipAddr(s.cfg)
|
||||
privKey, err := privKey(s.cfg)
|
||||
if err != nil {
|
||||
s.startupErr = err
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(3147): Add host options
|
||||
h, err := libp2p.New(s.ctx)
|
||||
opts := buildOptions(s.cfg, ipAddr, privKey)
|
||||
h, err := libp2p.New(s.ctx, opts...)
|
||||
if err != nil {
|
||||
s.startupErr = err
|
||||
return
|
||||
}
|
||||
s.host = h
|
||||
listener, err := startDiscoveryV5(ipAddr, privKey, s.cfg)
|
||||
if err != nil {
|
||||
s.startupErr = err
|
||||
return
|
||||
}
|
||||
s.dv5Listener = listener
|
||||
|
||||
go s.listenForNewNodes()
|
||||
|
||||
// TODO(3147): Add gossip sub options
|
||||
gs, err := pubsub.NewGossipSub(s.ctx, s.host)
|
||||
@ -71,6 +90,7 @@ func (s *Service) Start() {
|
||||
// Stop the p2p service and terminate all peer connections.
|
||||
func (s *Service) Stop() error {
|
||||
s.started = false
|
||||
s.dv5Listener.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -106,6 +126,46 @@ func (s *Service) Disconnect(pid peer.ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
|
||||
func (s *Service) listenForNewNodes() {
|
||||
node, err := discv5.ParseNode(s.cfg.BootstrapNodeAddr)
|
||||
if err != nil {
|
||||
log.Fatalf("could not parse bootstrap address: %v", err)
|
||||
}
|
||||
nodeID := node.ID
|
||||
ticker := time.NewTicker(pollingPeriod)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
nodes := s.dv5Listener.Lookup(nodeID)
|
||||
multiAddresses := convertToMultiAddr(nodes)
|
||||
s.connectWithAllPeers(multiAddresses)
|
||||
// store furthest node as the next to lookup
|
||||
nodeID = nodes[len(nodes)-1].ID
|
||||
case <-s.ctx.Done():
|
||||
log.Debug("p2p context is closed, exiting routine")
|
||||
break
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
|
||||
addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
|
||||
if err != nil {
|
||||
log.Errorf("Could not convert to peer address info's from multiaddresses: %v", err)
|
||||
return
|
||||
}
|
||||
for _, info := range addrInfos {
|
||||
if info.ID == s.host.ID() {
|
||||
continue
|
||||
}
|
||||
if err := s.host.Connect(s.ctx, info); err != nil {
|
||||
log.Errorf("Could not connect with peer: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to some topic.
|
||||
// TODO(3147): Remove
|
||||
// DEPRECATED: Do not use.
|
||||
|
@ -1,16 +1,86 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
type mockListener struct{}
|
||||
|
||||
func (m *mockListener) Self() *discv5.Node {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockListener) Close() {
|
||||
//no-op
|
||||
}
|
||||
|
||||
func (m *mockListener) Lookup(discv5.NodeID) []*discv5.Node {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockListener) ReadRandomNodes([]*discv5.Node) int {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockListener) SetFallbackNodes([]*discv5.Node) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockListener) Resolve(discv5.NodeID) *discv5.Node {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockListener) RegisterTopic(discv5.Topic, <-chan struct{}) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mockListener) SearchTopic(discv5.Topic, <-chan time.Duration, chan<- *discv5.Node, chan<- bool) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func createPeer(t *testing.T, cfg *Config, port int) (Listener, host.Host) {
|
||||
ipAddr, pkey := createAddrAndPrivKey(t)
|
||||
ipAddr = net.ParseIP("127.0.0.1")
|
||||
convertedKey := convertToInterfacePrivkey(pkey)
|
||||
_, err := peer.IDFromPrivateKey(convertedKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to p2p listen: %v", err)
|
||||
}
|
||||
h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen)}...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg.UDPPort = uint(port)
|
||||
cfg.Port = uint(port)
|
||||
listener, err := startDiscoveryV5(ipAddr, pkey, cfg)
|
||||
if err != nil {
|
||||
t.Errorf("Could not start discovery for node: %v", err)
|
||||
}
|
||||
return listener, h
|
||||
}
|
||||
|
||||
func TestService_Stop_SetsStartedToFalse(t *testing.T) {
|
||||
s, _ := NewService(nil)
|
||||
s.started = true
|
||||
s.dv5Listener = &mockListener{}
|
||||
_ = s.Stop()
|
||||
|
||||
if s.started != false {
|
||||
t.Error("Expected Service.started to be false, got true")
|
||||
}
|
||||
@ -19,7 +89,12 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) {
|
||||
func TestService_Start_OnlyStartsOnce(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
|
||||
s, _ := NewService(nil)
|
||||
cfg := &Config{
|
||||
Port: 2000,
|
||||
UDPPort: 2000,
|
||||
}
|
||||
s, _ := NewService(cfg)
|
||||
s.dv5Listener = &mockListener{}
|
||||
defer s.Stop()
|
||||
s.Start()
|
||||
if s.started != true {
|
||||
@ -31,7 +106,50 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
|
||||
|
||||
func TestService_Status_NotRunning(t *testing.T) {
|
||||
s := &Service{started: false}
|
||||
s.dv5Listener = &mockListener{}
|
||||
if s.Status().Error() != "not running" {
|
||||
t.Errorf("Status returned wrong error, got %v", s.Status())
|
||||
}
|
||||
}
|
||||
|
||||
func TestListenForNewNodes(t *testing.T) {
|
||||
// setup bootnode
|
||||
port := 2000
|
||||
_, pkey := createAddrAndPrivKey(t)
|
||||
ipAddr := net.ParseIP("127.0.0.1")
|
||||
bootListener := createListener(ipAddr, port, pkey)
|
||||
defer bootListener.Close()
|
||||
|
||||
bootNode := bootListener.Self()
|
||||
|
||||
cfg := &Config{
|
||||
BootstrapNodeAddr: bootNode.String(),
|
||||
}
|
||||
var listeners []*discv5.Network
|
||||
// setup other nodes
|
||||
for i := 1; i <= 5; i++ {
|
||||
listener, _ := createPeer(t, cfg, port+i)
|
||||
listeners = append(listeners, listener.(*discv5.Network))
|
||||
}
|
||||
|
||||
cfg.Port = 4000
|
||||
cfg.UDPPort = 4000
|
||||
|
||||
s, err := NewService(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.Start()
|
||||
defer s.Stop()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
peers := s.host.Network().Peers()
|
||||
if len(peers) != 5 {
|
||||
t.Errorf("Not all peers added to peerstore, wanted %d but got %d", 5, len(peers))
|
||||
}
|
||||
// close down all peers
|
||||
for _, listener := range listeners {
|
||||
listener.Close()
|
||||
}
|
||||
}
|
||||
|
@ -22,10 +22,9 @@ import (
|
||||
|
||||
// TestP2P represents a p2p implementation that can be used for testing.
|
||||
type TestP2P struct {
|
||||
t *testing.T
|
||||
Host host.Host
|
||||
pubsub *pubsub.PubSub
|
||||
|
||||
t *testing.T
|
||||
Host host.Host
|
||||
pubsub *pubsub.PubSub
|
||||
BroadcastCalled bool
|
||||
}
|
||||
|
||||
@ -56,8 +55,8 @@ func (p *TestP2P) Connect(b *TestP2P) {
|
||||
}
|
||||
|
||||
func connect(a, b host.Host) error {
|
||||
pinfo := a.Peerstore().PeerInfo(a.ID())
|
||||
return b.Connect(context.Background(), pinfo)
|
||||
pinfo := b.Peerstore().PeerInfo(b.ID())
|
||||
return a.Connect(context.Background(), pinfo)
|
||||
}
|
||||
|
||||
// ReceiveRPC simulates an incoming RPC.
|
||||
|
65
beacon-chain/p2p/utils.go
Normal file
65
beacon-chain/p2p/utils.go
Normal file
@ -0,0 +1,65 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
curve "github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/prysmaticlabs/prysm/shared/iputils"
|
||||
)
|
||||
|
||||
func convertFromInterfacePrivKey(privkey crypto.PrivKey) *ecdsa.PrivateKey {
|
||||
typeAssertedKey := (*ecdsa.PrivateKey)((*btcec.PrivateKey)(privkey.(*crypto.Secp256k1PrivateKey)))
|
||||
return typeAssertedKey
|
||||
}
|
||||
|
||||
func convertToInterfacePrivkey(privkey *ecdsa.PrivateKey) crypto.PrivKey {
|
||||
typeAssertedKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)((*btcec.PrivateKey)(privkey)))
|
||||
return typeAssertedKey
|
||||
}
|
||||
|
||||
func convertFromInterfacePubKey(pubkey crypto.PubKey) *ecdsa.PublicKey {
|
||||
typeAssertedKey := (*ecdsa.PublicKey)((*btcec.PublicKey)(pubkey.(*crypto.Secp256k1PublicKey)))
|
||||
return typeAssertedKey
|
||||
}
|
||||
|
||||
func convertToInterfacePubkey(pubkey *ecdsa.PublicKey) crypto.PubKey {
|
||||
typeAssertedKey := crypto.PubKey((*crypto.Secp256k1PublicKey)((*btcec.PublicKey)(pubkey)))
|
||||
return typeAssertedKey
|
||||
}
|
||||
|
||||
func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
|
||||
if cfg.PrivateKey == "" {
|
||||
priv, _, err := crypto.GenerateSecp256k1Key(rand.Reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
convertedKey := convertFromInterfacePrivKey(priv)
|
||||
return convertedKey, nil
|
||||
}
|
||||
if _, err := os.Stat(cfg.PrivateKey); os.IsNotExist(err) {
|
||||
log.WithField("private key file", cfg.PrivateKey).Warn("Could not read private key, file is missing or unreadable")
|
||||
return nil, err
|
||||
}
|
||||
priv, err := curve.LoadECDSA(cfg.PrivateKey)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error reading private key from file")
|
||||
return nil, err
|
||||
}
|
||||
return priv, nil
|
||||
}
|
||||
|
||||
func ipAddr(cfg *Config) net.IP {
|
||||
if cfg.HostAddress == "" {
|
||||
ip, err := iputils.ExternalIPv4()
|
||||
if err != nil {
|
||||
log.Fatalf("Could not get IPv4 address: %v", err)
|
||||
}
|
||||
return net.ParseIP(ip)
|
||||
}
|
||||
return net.ParseIP(cfg.HostAddress)
|
||||
}
|
Loading…
Reference in New Issue
Block a user