Clean Up Discovery Filtering of Peers (#6128)

* clean up

* fix test

* rename

* context

* preston's review

* remove dep

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das 2020-06-06 11:46:36 +08:00 committed by GitHub
parent a1e3fc9500
commit 0b70c3ea85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 78 deletions

View File

@ -11,9 +11,11 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
iaddr "github.com/ipfs/go-ipfs-addr"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Listener defines the discovery V5 network interface that is used
@ -122,6 +124,61 @@ func (s *Service) startDiscoveryV5(
return listener, nil
}
// filterPeer decides whether a node discovered via discv5 is a
// suitable connection candidate for this node. A node is accepted
// only when all of the following hold:
//  1. We are still below our configured peer limit.
//  2. The node advertises an IP and a TCP port in its ENR.
//  3. The node has not been scored as 'bad'.
//  4. The node is not already active or connected.
//  5. When a genesis validators root is set, the node's fork ENR
//     entry matches our local node's.
//
// As a side effect, every accepted peer is registered with the peer
// handler before true is returned.
func (s *Service) filterPeer(node *enode.Node) bool {
	// Stop dialing entirely once the active set is full.
	if len(s.Peers().Active()) >= int(s.cfg.MaxPeers) {
		log.WithFields(logrus.Fields{"peer": node.String(),
			"reason": "at peer limit"}).Trace("Not dialing peer")
		return false
	}
	// Reject nodes that carry no IP address in their record.
	if node.IP() == nil {
		return false
	}
	// Reject nodes whose ENR lacks a TCP port entry; any error other
	// than "not found" is surfaced for debugging.
	if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
		if !enr.IsNotFound(err) {
			log.WithError(err).Debug("Could not retrieve tcp port")
		}
		return false
	}
	info, addr, err := convertToAddrInfo(node)
	if err != nil {
		log.WithError(err).Debug("Could not convert to peer data")
		return false
	}
	// Skip peers we have scored as bad or are already tracking.
	if s.peers.IsBad(info.ID) || s.peers.IsActive(info.ID) {
		return false
	}
	if s.host.Network().Connectedness(info.ID) == network.Connected {
		return false
	}
	record := node.Record()
	// Decide whether or not to connect to peer that does not
	// match the proper fork ENR data with our local node.
	if s.genesisValidatorsRoot != nil {
		if err := s.compareForkENR(record); err != nil {
			log.WithError(err).Trace("Fork ENR mismatches between peer and local node")
			return false
		}
	}
	// Add peer to peer handler.
	s.peers.Add(record, info.ID, addr, network.DirUnknown)
	return true
}
// startDHTDiscovery supports discovery via DHT.
func startDHTDiscovery(host core.Host, bootstrapAddr string) error {
multiAddr, err := multiAddrFromString(bootstrapAddr)
@ -182,6 +239,18 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
return multiAddrs
}
// convertToAddrInfo resolves a discovery node into both its libp2p
// AddrInfo and the multiaddr it was derived from. An error from
// either conversion step is returned unchanged to the caller.
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) {
	maddr, err := convertToSingleMultiAddr(node)
	if err != nil {
		return nil, nil, err
	}
	addrInfo, err := peer.AddrInfoFromP2pAddr(maddr)
	if err != nil {
		return nil, nil, err
	}
	return addrInfo, maddr, nil
}
func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
ip4 := node.IP().To4()
if ip4 == nil {

View File

@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2putils"
@ -84,6 +85,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
// bootnode given all nodes provided by discv5 will have different fork digests.
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
@ -91,11 +93,21 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
s.genesisTime = genesisTime
s.genesisValidatorsRoot = make([]byte, 32)
s.dv5Listener = lastListener
multiAddrs := s.processPeers(nodes)
addrs := []ma.Multiaddr{}
for _, n := range nodes {
if s.filterPeer(n) {
addr, err := convertToSingleMultiAddr(n)
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, addr)
}
}
// We should not have valid peers if the fork digest mismatched.
if len(multiAddrs) != 0 {
t.Errorf("Expected 0 valid peers, got %d", len(multiAddrs))
if len(addrs) != 0 {
t.Errorf("Expected 0 valid peers, got %d", len(addrs))
}
if err := s.Stop(); err != nil {
t.Fatal(err)
@ -104,7 +116,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
hook := logTest.NewGlobal()
logrus.SetLevel(logrus.DebugLevel)
logrus.SetLevel(logrus.TraceLevel)
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
genesisTime := time.Now()
@ -171,6 +183,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
// bootnode given all nodes provided by discv5 will have different fork digests.
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
@ -179,8 +192,18 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
s.genesisTime = genesisTime
s.genesisValidatorsRoot = make([]byte, 32)
s.dv5Listener = lastListener
multiAddrs := s.processPeers(nodes)
if len(multiAddrs) == 0 {
addrs := []ma.Multiaddr{}
for _, n := range nodes {
if s.filterPeer(n) {
addr, err := convertToSingleMultiAddr(n)
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, addr)
}
}
if len(addrs) == 0 {
t.Error("Expected to have valid peers, got 0")
}

View File

@ -39,7 +39,6 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
)
var _ = shared.Service(&Service{})
@ -438,11 +437,7 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
}
for _, comIdx := range subnets {
if comIdx == index {
multiAddr, err := convertToSingleMultiAddr(node)
if err != nil {
return false, err
}
info, err := peer.AddrInfoFromP2pAddr(multiAddr)
info, multiAddr, err := convertToAddrInfo(node)
if err != nil {
return false, err
}
@ -510,17 +505,30 @@ func (s *Service) awaitStateInitialized() {
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
runutil.RunEvery(s.ctx, pollingPeriod, func() {
iterator := s.dv5Listener.RandomNodes()
nodes := enode.ReadNodes(iterator, lookupLimit)
iterator.Close()
multiAddresses := s.processPeers(nodes)
// do not process more peers than required.
if len(multiAddresses) > lookupLimit {
multiAddresses = multiAddresses[:lookupLimit]
iterator := s.dv5Listener.RandomNodes()
iterator = enode.Filter(iterator, s.filterPeer)
defer iterator.Close()
for {
// Exit if service's context is canceled
if s.ctx.Err() != nil {
break
}
s.connectWithAllPeers(multiAddresses)
})
exists := iterator.Next()
if !exists {
break
}
node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(*info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
}(peerInfo)
}
}
func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
@ -540,11 +548,6 @@ func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
}
func (s *Service) connectWithPeer(info peer.AddrInfo) error {
if len(s.Peers().Active()) >= int(s.cfg.MaxPeers) {
log.WithFields(logrus.Fields{"peer": info.ID.String(),
"reason": "at peer limit"}).Trace("Not dialing peer")
return nil
}
if info.ID == s.host.ID() {
return nil
}
@ -558,57 +561,6 @@ func (s *Service) connectWithPeer(info peer.AddrInfo) error {
return nil
}
// processPeers inspects a batch of nodes returned by our dht and
// returns the multiaddrs of those considered dialable. As a side
// effect, every accepted peer is registered with the peer handler.
//
// A node is skipped when it has no IP stored, has no tcp port in its
// ENR, fails multiaddr/AddrInfo conversion, has been marked bad, is
// already active or connected, or (when a genesis validators root is
// set) advertises a mismatching fork ENR entry.
func (s *Service) processPeers(nodes []*enode.Node) []ma.Multiaddr {
	// Pre-size to the upper bound; most nodes usually pass the filters.
	multiAddrs := make([]ma.Multiaddr, 0, len(nodes))
	for _, node := range nodes {
		// ignore nodes with no ip address stored.
		if node.IP() == nil {
			continue
		}
		// do not dial nodes with their tcp ports not set
		if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
			if !enr.IsNotFound(err) {
				log.WithError(err).Error("Could not retrieve tcp port")
			}
			continue
		}
		// Use the shared helper rather than duplicating the
		// multiaddr + AddrInfo derivation inline.
		peerData, multiAddr, err := convertToAddrInfo(node)
		if err != nil {
			log.WithError(err).Error("Could not convert to peer data")
			continue
		}
		if s.peers.IsBad(peerData.ID) {
			continue
		}
		if s.peers.IsActive(peerData.ID) {
			continue
		}
		if s.host.Network().Connectedness(peerData.ID) == network.Connected {
			continue
		}
		nodeENR := node.Record()
		// Decide whether or not to connect to peer that does not
		// match the proper fork ENR data with our local node.
		if s.genesisValidatorsRoot != nil {
			if err := s.compareForkENR(nodeENR); err != nil {
				log.WithError(err).Debug("Fork ENR mismatches between peer and local node")
				continue
			}
		}
		// Add peer to peer handler.
		s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown)
		multiAddrs = append(multiAddrs, multiAddr)
	}
	return multiAddrs
}
func (s *Service) connectToBootnodes() error {
nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr))
for _, addr := range s.cfg.Discv5BootStrapAddr {