Add Support for Static Peering (#3272)

* add test and support for static peering

* gaz

* remove delay

* add log

* handle all peers
This commit is contained in:
Nishant Das 2019-08-22 20:53:16 +05:30 committed by Raul Jordan
parent bb542d2032
commit e0d3e78746
5 changed files with 96 additions and 15 deletions

View File

@ -31,6 +31,7 @@ go_library(
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discv5:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p_core//crypto:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
@ -64,7 +65,6 @@ go_test(
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",

View File

@ -7,8 +7,10 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p/discv5"
iaddr "github.com/ipfs/go-ipfs-addr"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)
// Listener defines the discovery V5 network interface that is used
@ -83,3 +85,23 @@ func convertToMultiAddr(nodes []*discv5.Node) []ma.Multiaddr {
}
return multiAddrs
}
func manyMultiAddrsFromString(addrs []string) ([]ma.Multiaddr, error) {
var allAddrs []ma.Multiaddr
for _, stringAddr := range addrs {
addr, err := multiAddrFromString(stringAddr)
if err != nil {
return nil, errors.Wrapf(err, "Could not get multiaddr from string")
}
allAddrs = append(allAddrs, addr)
}
return allAddrs, nil
}
func multiAddrFromString(address string) (ma.Multiaddr, error) {
addr, err := iaddr.ParseString(address)
if err != nil {
return nil, err
}
return addr.Multiaddr(), nil
}

View File

@ -3,12 +3,14 @@ package p2p
import (
"crypto/ecdsa"
"crypto/rand"
"fmt"
"net"
"testing"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/libp2p/go-libp2p-core/host"
"github.com/prysmaticlabs/prysm/shared/iputils"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
@ -117,3 +119,40 @@ func TestMultiAddrConversion_OK(t *testing.T) {
testutil.AssertLogsDoNotContain(t, hook, "Invalid port, the tcp port of the node is a reserved port")
testutil.AssertLogsDoNotContain(t, hook, "Could not get multiaddr")
}
func TestStaticPeering_PeersAreAdded(t *testing.T) {
cfg := &Config{}
port := 3000
var staticPeers []string
var hosts []host.Host
// setup other nodes
for i := 1; i <= 5; i++ {
h, _, ipaddr := createHost(t, port+i)
staticPeers = append(staticPeers, fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ipaddr, port+i, h.ID()))
hosts = append(hosts, h)
}
defer func() {
for _, h := range hosts {
_ = h.Close()
}
}()
cfg.Port = 4000
cfg.UDPPort = 4000
cfg.StaticPeers = staticPeers
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
}
s.Start()
s.dv5Listener = &mockListener{}
defer s.Stop()
peers := s.host.Network().Peers()
if len(peers) != 5 {
t.Errorf("Not all peers added to peerstore, wanted %d but got %d", 5, len(peers))
}
}

View File

@ -80,6 +80,14 @@ func (s *Service) Start() {
go s.listenForNewNodes()
}
if len(s.cfg.StaticPeers) > 0 {
addrs, err := manyMultiAddrsFromString(s.cfg.StaticPeers)
if err != nil {
log.Errorf("Could not connect to static peer: %v", err)
}
s.connectWithAllPeers(addrs)
}
// TODO(3147): Add gossip sub options
gs, err := pubsub.NewGossipSub(s.ctx, s.host)
if err != nil {
@ -91,6 +99,9 @@ func (s *Service) Start() {
s.pubsub = gs
s.started = true
multiAddrs := s.host.Network().ListenAddresses()
log.Infof("Node currently listening at %s", multiAddrs[1].String())
}
// Stop the p2p service and terminate all peer connections.

View File

@ -2,6 +2,7 @@ package p2p
import (
"context"
"crypto/ecdsa"
"fmt"
"net"
"testing"
@ -10,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
@ -51,13 +51,19 @@ func (m *mockListener) SearchTopic(discv5.Topic, <-chan time.Duration, chan<- *d
}
func createPeer(t *testing.T, cfg *Config, port int) (Listener, host.Host) {
h, pkey, ipAddr := createHost(t, port)
cfg.UDPPort = uint(port)
cfg.Port = uint(port)
listener, err := startDiscoveryV5(ipAddr, pkey, cfg)
if err != nil {
t.Errorf("Could not start discovery for node: %v", err)
}
return listener, h
}
func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
ipAddr, pkey := createAddrAndPrivKey(t)
ipAddr = net.ParseIP("127.0.0.1")
convertedKey := convertToInterfacePrivkey(pkey)
_, err := peer.IDFromPrivateKey(convertedKey)
if err != nil {
t.Fatal(err)
}
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
if err != nil {
t.Fatalf("Failed to p2p listen: %v", err)
@ -66,13 +72,7 @@ func createPeer(t *testing.T, cfg *Config, port int) (Listener, host.Host) {
if err != nil {
t.Fatal(err)
}
cfg.UDPPort = uint(port)
cfg.Port = uint(port)
listener, err := startDiscoveryV5(ipAddr, pkey, cfg)
if err != nil {
t.Errorf("Could not start discovery for node: %v", err)
}
return listener, h
return h, pkey, ipAddr
}
func TestService_Stop_SetsStartedToFalse(t *testing.T) {
@ -126,12 +126,21 @@ func TestListenForNewNodes(t *testing.T) {
BootstrapNodeAddr: bootNode.String(),
}
var listeners []*discv5.Network
var hosts []host.Host
// setup other nodes
for i := 1; i <= 5; i++ {
listener, _ := createPeer(t, cfg, port+i)
listener, h := createPeer(t, cfg, port+i)
listeners = append(listeners, listener.(*discv5.Network))
hosts = append(hosts, h)
}
// close peers upon exit of test
defer func() {
for _, h := range hosts {
_ = h.Close()
}
}()
cfg.Port = 4000
cfg.UDPPort = 4000