From 0b70c3ea856262934780b13fa28ceff0a987e50f Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 6 Jun 2020 11:46:36 +0800 Subject: [PATCH] Clean Up Discovery Filtering of Peers (#6128) * clean up * fix test * rename * context * preston's review * remove dep Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/p2p/discovery.go | 69 +++++++++++++++++++++++++ beacon-chain/p2p/fork_test.go | 35 ++++++++++--- beacon-chain/p2p/service.go | 96 +++++++++-------------------------- 3 files changed, 122 insertions(+), 78 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index b976bb1fe..e16f5240b 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -11,9 +11,11 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" iaddr "github.com/ipfs/go-ipfs-addr" core "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) // Listener defines the discovery V5 network interface that is used @@ -122,6 +124,61 @@ func (s *Service) startDiscoveryV5( return listener, nil } +// filterPeer validates each node that we retrieve from our dht. We +// try to ascertain that the peer can be a valid protocol peer. +// Validity Conditions: +// 1) The local node is still actively looking for peers to +// connect to. +// 2) Peer has a valid IP and TCP port set in their enr. +// 3) Peer hasn't been marked as 'bad' +// 4) Peer is not currently active or connected. +// 5) Peer's fork digest in their ENR matches that of +// our localnodes. +func (s *Service) filterPeer(node *enode.Node) bool { + if len(s.Peers().Active()) >= int(s.cfg.MaxPeers) { + log.WithFields(logrus.Fields{"peer": node.String(), + "reason": "at peer limit"}).Trace("Not dialing peer") + return false + } + // ignore nodes with no ip address stored. + if node.IP() == nil { + return false + } + // do not dial nodes with their tcp ports not set + if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil { + if !enr.IsNotFound(err) { + log.WithError(err).Debug("Could not retrieve tcp port") + } + return false + } + peerData, multiAddr, err := convertToAddrInfo(node) + if err != nil { + log.WithError(err).Debug("Could not convert to peer data") + return false + } + if s.peers.IsBad(peerData.ID) { + return false + } + if s.peers.IsActive(peerData.ID) { + return false + } + if s.host.Network().Connectedness(peerData.ID) == network.Connected { + return false + } + nodeENR := node.Record() + // Decide whether or not to connect to peer that does not + // match the proper fork ENR data with our local node. + if s.genesisValidatorsRoot != nil { + if err := s.compareForkENR(nodeENR); err != nil { + log.WithError(err).Trace("Fork ENR mismatches between peer and local node") + return false + } + } + // Add peer to peer handler. + s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown) + return true +} + // startDHTDiscovery supports discovery via DHT. func startDHTDiscovery(host core.Host, bootstrapAddr string) error { multiAddr, err := multiAddrFromString(bootstrapAddr) @@ -182,6 +239,18 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr { return multiAddrs } +func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) { + multiAddr, err := convertToSingleMultiAddr(node) + if err != nil { + return nil, nil, err + } + info, err := peer.AddrInfoFromP2pAddr(multiAddr) + if err != nil { + return nil, nil, err + } + return info, multiAddr, nil +} + func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) { ip4 := node.IP().To4() if ip4 == nil { diff --git a/beacon-chain/p2p/fork_test.go b/beacon-chain/p2p/fork_test.go index 808409c34..999f27ee5 100644 --- a/beacon-chain/p2p/fork_test.go +++ b/beacon-chain/p2p/fork_test.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" + ma "github.com/multiformats/go-multiaddr" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/p2putils" @@ -84,6 +85,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { // bootnode given all nodes provided by discv5 will have different fork digests. cfg.UDPPort = 14000 cfg.TCPPort = 14001 + cfg.MaxPeers = 30 s, err := NewService(cfg) if err != nil { t.Fatal(err) @@ -91,11 +93,21 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { s.genesisTime = genesisTime s.genesisValidatorsRoot = make([]byte, 32) s.dv5Listener = lastListener - multiAddrs := s.processPeers(nodes) + addrs := []ma.Multiaddr{} + + for _, n := range nodes { + if s.filterPeer(n) { + addr, err := convertToSingleMultiAddr(n) + if err != nil { + t.Fatal(err) + } + addrs = append(addrs, addr) + } + } // We should not have valid peers if the fork digest mismatched. - if len(multiAddrs) != 0 { - t.Errorf("Expected 0 valid peers, got %d", len(multiAddrs)) + if len(addrs) != 0 { + t.Errorf("Expected 0 valid peers, got %d", len(addrs)) } if err := s.Stop(); err != nil { t.Fatal(err) @@ -104,7 +116,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { hook := logTest.NewGlobal() - logrus.SetLevel(logrus.DebugLevel) + logrus.SetLevel(logrus.TraceLevel) port := 2000 ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() @@ -171,6 +183,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { // bootnode given all nodes provided by discv5 will have different fork digests. cfg.UDPPort = 14000 cfg.TCPPort = 14001 + cfg.MaxPeers = 30 s, err := NewService(cfg) if err != nil { t.Fatal(err) @@ -179,8 +192,18 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { s.genesisTime = genesisTime s.genesisValidatorsRoot = make([]byte, 32) s.dv5Listener = lastListener - multiAddrs := s.processPeers(nodes) - if len(multiAddrs) == 0 { + addrs := []ma.Multiaddr{} + + for _, n := range nodes { + if s.filterPeer(n) { + addr, err := convertToSingleMultiAddr(n) + if err != nil { + t.Fatal(err) + } + addrs = append(addrs, addr) + } + } + if len(addrs) == 0 { t.Error("Expected to have valid peers, got 0") } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 4e2066b7b..036eea6e3 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -39,7 +39,6 @@ import ( "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/runutil" "github.com/prysmaticlabs/prysm/shared/slotutil" - "github.com/sirupsen/logrus" ) var _ = shared.Service(&Service{}) @@ -438,11 +437,7 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) { } for _, comIdx := range subnets { if comIdx == index { - multiAddr, err := convertToSingleMultiAddr(node) - if err != nil { - return false, err - } - info, err := peer.AddrInfoFromP2pAddr(multiAddr) + info, multiAddr, err := convertToAddrInfo(node) if err != nil { return false, err } @@ -510,17 +505,30 @@ func (s *Service) awaitStateInitialized() { // listen for new nodes watches for new nodes in the network and adds them to the peerstore. func (s *Service) listenForNewNodes() { - runutil.RunEvery(s.ctx, pollingPeriod, func() { - iterator := s.dv5Listener.RandomNodes() - nodes := enode.ReadNodes(iterator, lookupLimit) - iterator.Close() - multiAddresses := s.processPeers(nodes) - // do not process a large amount than required peers. - if len(multiAddresses) > lookupLimit { - multiAddresses = multiAddresses[:lookupLimit] + iterator := s.dv5Listener.RandomNodes() + iterator = enode.Filter(iterator, s.filterPeer) + defer iterator.Close() + for { + // Exit if service's context is canceled + if s.ctx.Err() != nil { + break } - s.connectWithAllPeers(multiAddresses) - }) + exists := iterator.Next() + if !exists { + break + } + node := iterator.Node() + peerInfo, _, err := convertToAddrInfo(node) + if err != nil { + log.WithError(err).Error("Could not convert to peer info") + continue + } + go func(info *peer.AddrInfo) { + if err := s.connectWithPeer(*info); err != nil { + log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + } + }(peerInfo) + } } func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) { @@ -540,11 +548,6 @@ func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) { } func (s *Service) connectWithPeer(info peer.AddrInfo) error { - if len(s.Peers().Active()) >= int(s.cfg.MaxPeers) { - log.WithFields(logrus.Fields{"peer": info.ID.String(), - "reason": "at peer limit"}).Trace("Not dialing peer") - return nil - } if info.ID == s.host.ID() { return nil } @@ -558,57 +561,6 @@ func (s *Service) connectWithPeer(info peer.AddrInfo) error { return nil } -// process new peers that come in from our dht. -func (s *Service) processPeers(nodes []*enode.Node) []ma.Multiaddr { - var multiAddrs []ma.Multiaddr - for _, node := range nodes { - // ignore nodes with no ip address stored. - if node.IP() == nil { - continue - } - // do not dial nodes with their tcp ports not set - if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil { - if !enr.IsNotFound(err) { - log.WithError(err).Error("Could not retrieve tcp port") - } - continue - } - multiAddr, err := convertToSingleMultiAddr(node) - if err != nil { - log.WithError(err).Error("Could not convert to multiAddr") - continue - } - peerData, err := peer.AddrInfoFromP2pAddr(multiAddr) - if err != nil { - log.WithError(err).Error("Could not get peer id") - continue - } - if s.peers.IsBad(peerData.ID) { - continue - } - if s.peers.IsActive(peerData.ID) { - continue - } - if s.host.Network().Connectedness(peerData.ID) == network.Connected { - continue - } - nodeENR := node.Record() - // Decide whether or not to connect to peer that does not - // match the proper fork ENR data with our local node. - if s.genesisValidatorsRoot != nil { - if err := s.compareForkENR(nodeENR); err != nil { - log.WithError(err).Debug("Fork ENR mismatches between peer and local node") - continue - } - } - - // Add peer to peer handler. - s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown) - multiAddrs = append(multiAddrs, multiAddr) - } - return multiAddrs -} - func (s *Service) connectToBootnodes() error { nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr)) for _, addr := range s.cfg.Discv5BootStrapAddr {