prysm-pulse/beacon-chain/p2p/discovery.go
Nishant Das 69e0e302b3
Use Less Intensive Search (#6848)
* use less intensive search
* Add mock method to implement Listener
* Merge refs/heads/master into lessIntensiveSearch
* Merge refs/heads/master into lessIntensiveSearch
2020-08-03 18:10:23 +00:00

407 lines
11 KiB
Go

package p2p
import (
"bytes"
"crypto/ecdsa"
"fmt"
"net"
"strconv"
"time"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
iaddr "github.com/ipfs/go-ipfs-addr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
)
// Listener is the discovery v5 surface the p2p service depends on to
// find and communicate with other peers. Having it as an interface lets
// the concrete listener be swapped out (e.g. by a mock).
type Listener interface {
	// Accessors for the local node.
	Self() *enode.Node
	LocalNode() *enode.LocalNode

	// Queries for other nodes.
	Lookup(enode.ID) []*enode.Node
	Resolve(*enode.Node) *enode.Node
	RandomNodes() enode.Iterator
	AllNodes() []*enode.Node

	// Requests addressed to one specific node.
	Ping(*enode.Node) error
	RequestENR(*enode.Node) (*enode.Node, error)

	// Lifecycle.
	Close()
}
// RefreshENR rebuilds the subnet bitvector from every subnet id currently
// held in the subnet cache and compares it with the bitvector stored in our
// node record. When the two differ, the record is updated and all peers are
// pinged so that they learn about the new metadata. It does nothing when
// discv5 is not running.
func (s *Service) RefreshENR() {
	// Nothing to refresh without a running discv5 listener.
	if s.dv5Listener == nil {
		return
	}

	wanted := bitfield.NewBitvector64()
	for _, subnet := range cache.SubnetIDs.GetAllSubnets() {
		wanted.SetBitAt(subnet, true)
	}

	stored, err := retrieveBitvector(s.dv5Listener.Self().Record())
	switch {
	case err != nil:
		log.Errorf("Could not retrieve bitfield: %v", err)
	case bytes.Equal(wanted, stored):
		// The record already advertises the current subnets.
	default:
		s.updateSubnetRecordWithMetadata(wanted)
		// Ping all peers to inform them of the new metadata.
		s.pingPeers()
	}
}
// listenForNewNodes walks the discv5 random-node iterator, filtered through
// filterPeer, and dials every node it yields in its own goroutine.
//
// The loop ends when the service context is canceled or the iterator is
// exhausted. While the node is at its peer limit the search is paused for
// pollingPeriod at a time; the pause is abandoned as soon as the context is
// canceled so that shutdown is not delayed by a pending sleep.
func (s *Service) listenForNewNodes() {
	iterator := s.dv5Listener.RandomNodes()
	iterator = enode.Filter(iterator, s.filterPeer)
	defer iterator.Close()

	for {
		// Exit if service's context is canceled.
		if s.ctx.Err() != nil {
			return
		}
		if s.isPeerAtLimit() {
			// Pause the main loop for a period to stop looking
			// for new peers, but wake up early on cancellation.
			log.Trace("Not looking for peers, at peer limit")
			pause := time.NewTimer(pollingPeriod)
			select {
			case <-s.ctx.Done():
				pause.Stop()
				return
			case <-pause.C:
			}
			continue
		}
		if !iterator.Next() {
			return
		}
		peerInfo, _, err := convertToAddrInfo(iterator.Node())
		if err != nil {
			log.WithError(err).Error("Could not convert to peer info")
			continue
		}
		// Dial asynchronously so a slow peer does not stall the search.
		go func(info *peer.AddrInfo) {
			if err := s.connectWithPeer(*info); err != nil {
				log.WithError(err).Tracef("Could not connect with peer %s", info.String())
			}
		}(peerInfo)
	}
}
// createListener binds a UDP socket, builds the local node record and starts
// a discovery v5 listener on top of them.
//
// The bind address is resolved in this order: the address reported by
// localAddress for ipAddr, then s.cfg.LocalIP when it is set (an unparsable
// value is an error). When s.cfg.HostAddress parses as an IP it is installed
// as both fallback and static IP of the local node; an invalid value is only
// logged. Every entry of s.cfg.Discv5BootStrapAddr must parse as an enode
// and is passed to discv5 as a bootnode.
//
// The UDP socket is closed again on every failure that happens after it has
// been opened, so a failed start does not keep the port bound.
func (s *Service) createListener(
	ipAddr net.IP,
	privKey *ecdsa.PrivateKey,
) (*discover.UDPv5, error) {
	// Assume ip is either ipv4 or ipv6.
	udpVersionOf := func(ip net.IP) string {
		if ip.To4() != nil {
			return "udp4"
		}
		return "udp6"
	}
	networkVersion := udpVersionOf(ipAddr)

	// Check for the real local address which may
	// be different in the presence of virtual networks.
	ipAddr = s.localAddress(networkVersion, ipAddr)
	// If local ip is specified then use that instead.
	if s.cfg.LocalIP != "" {
		ipAddr = net.ParseIP(s.cfg.LocalIP)
		if ipAddr == nil {
			return nil, errors.New("invalid local ip provided")
		}
		// The override may belong to a different address family than the
		// original ip, so the socket family has to follow it.
		networkVersion = udpVersionOf(ipAddr)
	}

	udpAddr := &net.UDPAddr{
		IP:   ipAddr,
		Port: int(s.cfg.UDPPort),
	}
	conn, err := net.ListenUDP(networkVersion, udpAddr)
	if err != nil {
		return nil, errors.Wrap(err, "could not listen to UDP")
	}
	// Release the socket on any failure below; nothing else owns it until
	// the discv5 listener has been created successfully.
	closeConn := func() {
		if cerr := conn.Close(); cerr != nil {
			log.WithError(cerr).Debug("Could not close UDP connection")
		}
	}

	localNode, err := s.createLocalNode(
		privKey,
		ipAddr,
		int(s.cfg.UDPPort),
		int(s.cfg.TCPPort),
	)
	if err != nil {
		closeConn()
		return nil, errors.Wrap(err, "could not create local node")
	}
	if s.cfg.HostAddress != "" {
		hostIP := net.ParseIP(s.cfg.HostAddress)
		if hostIP == nil {
			// Log the configured string; a nil IP would only print "<nil>".
			log.Errorf("Invalid host address given: %s", s.cfg.HostAddress)
		} else {
			localNode.SetFallbackIP(hostIP)
			localNode.SetStaticIP(hostIP)
		}
	}

	dv5Cfg := discover.Config{
		PrivateKey:   privKey,
		ValidSchemes: enode.ValidSchemes,
	}
	dv5Cfg.Bootnodes = make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr))
	for _, addr := range s.cfg.Discv5BootStrapAddr {
		bootNode, err := enode.Parse(enode.ValidSchemes, addr)
		if err != nil {
			closeConn()
			return nil, errors.Wrap(err, "could not bootstrap addr")
		}
		dv5Cfg.Bootnodes = append(dv5Cfg.Bootnodes, bootNode)
	}

	listener, err := discover.ListenV5(conn, localNode, dv5Cfg)
	if err != nil {
		closeConn()
		return nil, errors.Wrap(err, "could not listen to discV5")
	}
	return listener, nil
}
// createLocalNode builds the local node record used by discovery.
//
// It opens a node database (an empty path is passed to enode.OpenDB; per the
// go-ethereum documentation that yields a temporary in-memory database),
// stores the ip, udp and tcp entries in the record, registers the ip and udp
// port as fallback endpoint, and then adds the fork entry derived from the
// service's genesis time and genesis validators root. The attestation
// subnets entry is initialized on the node that is returned.
//
// If adding the fork entry fails the database is closed before returning,
// since the local node that owned it is discarded.
func (s *Service) createLocalNode(
	privKey *ecdsa.PrivateKey,
	ipAddr net.IP,
	udpPort int,
	tcpPort int,
) (*enode.LocalNode, error) {
	db, err := enode.OpenDB("")
	if err != nil {
		return nil, errors.Wrap(err, "could not open node's peer database")
	}
	localNode := enode.NewLocalNode(db, privKey)
	localNode.Set(enr.IP(ipAddr))
	localNode.Set(enr.UDP(udpPort))
	localNode.Set(enr.TCP(tcpPort))
	localNode.SetFallbackIP(ipAddr)
	localNode.SetFallbackUDP(udpPort)

	localNode, err = addForkEntry(localNode, s.genesisTime, s.genesisValidatorsRoot)
	if err != nil {
		// Nobody will use the database once we bail out, so release it.
		db.Close()
		return nil, errors.Wrap(err, "could not add eth2 fork version entry to enr")
	}
	return intializeAttSubnets(localNode), nil
}
// startDiscoveryV5 creates the discovery v5 listener for the given address
// and private key, logs the ENR of the local node and hands the listener
// back to the caller.
func (s *Service) startDiscoveryV5(
	addr net.IP,
	privKey *ecdsa.PrivateKey,
) (*discover.UDPv5, error) {
	dv5, err := s.createListener(addr, privKey)
	if err != nil {
		return nil, errors.Wrap(err, "could not create listener")
	}
	log.WithField("ENR", dv5.Self().String()).Info("Started discovery v5")
	return dv5, nil
}
// filterPeer decides whether a node handed to us by discovery is worth
// dialing. A node is accepted only if all of the following hold:
//  1. It has an IP address and a "tcp" entry in its ENR.
//  2. It can be converted into libp2p peer info.
//  3. It is not marked as bad, and is neither active nor connected.
//  4. When our genesis validators root is known, its fork ENR entry is
//     compatible with that of the local node.
//
// Accepted nodes are registered with the peer handler before returning true.
func (s *Service) filterPeer(node *enode.Node) bool {
	// Ignore nodes with no ip address stored.
	if node.IP() == nil {
		return false
	}
	// Do not dial nodes with their tcp ports not set.
	var tcpEntry enr.TCP
	if err := node.Record().Load(enr.WithEntry("tcp", &tcpEntry)); err != nil {
		if !enr.IsNotFound(err) {
			log.WithError(err).Debug("Could not retrieve tcp port")
		}
		return false
	}

	info, addr, err := convertToAddrInfo(node)
	if err != nil {
		log.WithError(err).Debug("Could not convert to peer data")
		return false
	}
	pid := info.ID
	if s.peers.IsBad(pid) || s.peers.IsActive(pid) {
		return false
	}
	if s.host.Network().Connectedness(pid) == network.Connected {
		return false
	}

	record := node.Record()
	// Decide whether or not to connect to peer that does not
	// match the proper fork ENR data with our local node.
	if s.genesisValidatorsRoot != nil {
		if err := s.compareForkENR(record); err != nil {
			log.WithError(err).Trace("Fork ENR mismatches between peer and local node")
			return false
		}
	}

	// Add peer to peer handler.
	s.peers.Add(record, pid, addr, network.DirUnknown)
	return true
}
// isPeerAtLimit reports whether the configured maximum number of peers has
// been reached, judged by either the number of peers the host's network
// reports or the number of peers tracked as active.
func (s *Service) isPeerAtLimit() bool {
	connCount := len(s.host.Network().Peers())
	limit := int(s.cfg.MaxPeers)
	activeCount := len(s.Peers().Active())
	if activeCount >= limit {
		return true
	}
	return connCount >= limit
}
// localAddress tries to determine the local IP the node really uses for
// outbound traffic by dialing the first configured bootstrap node over the
// given network ("udp4"/"udp6" at the call site in this file) and reading
// the local end of that connection.
//
// The provided addr is returned unchanged whenever that is not possible: no
// bootstrap node is configured, its address cannot be parsed, the dial
// fails, or the local address of the connection cannot be parsed as an IP.
func (s *Service) localAddress(network string, addr net.IP) net.IP {
	if len(s.cfg.BootstrapNodeAddr) == 0 {
		return addr
	}
	// Dial the first bootnode to determine our 'real' local address.
	bootNode, err := enode.Parse(enode.ValidSchemes, s.cfg.BootstrapNodeAddr[0])
	if err != nil {
		log.WithError(err).Error("Could not parse bootnode address")
		return addr
	}
	remote := net.JoinHostPort(bootNode.IP().String(), strconv.Itoa(bootNode.UDP()))
	conn, err := net.DialTimeout(network, remote, dialTimeout)
	if err != nil {
		log.WithError(err).Error("Could not dial remote peer")
		return addr
	}
	defer func() {
		if err := conn.Close(); err != nil {
			log.Error(err)
		}
	}()
	// Determine the real address from which the initial connection was made.
	realAddr, _, err := net.SplitHostPort(conn.LocalAddr().String())
	if err != nil {
		log.WithError(err).Error("Could not determine local address")
		return addr
	}
	realIP := net.ParseIP(realAddr)
	if realIP == nil {
		// ParseIP rejects some valid host strings (for example IPv6
		// literals carrying a zone); never hand a nil IP to the caller.
		return addr
	}
	return realIP
}
// parseBootStrapAddrs returns the entries of addrs that parse as enode
// addresses, discarding everything else. A warning is logged when none of
// the entries qualifies.
func parseBootStrapAddrs(addrs []string) (discv5Nodes []string) {
	enodes, _ := parseGenericAddrs(addrs)
	if len(enodes) == 0 {
		log.Warn("No bootstrap addresses supplied")
	}
	return enodes
}
// parseGenericAddrs splits addrs into those that parse as enode addresses
// and those that parse as multiaddrs, preserving input order within each
// group. Empty strings are skipped silently; entries that are neither are
// logged as invalid and dropped.
func parseGenericAddrs(addrs []string) (enodeString []string, multiAddrString []string) {
	for _, candidate := range addrs {
		// Ignore empty entries.
		if candidate == "" {
			continue
		}
		// Enode format takes precedence over multiaddr format.
		if _, enodeErr := enode.Parse(enode.ValidSchemes, candidate); enodeErr == nil {
			enodeString = append(enodeString, candidate)
			continue
		}
		if _, maErr := multiAddrFromString(candidate); maErr != nil {
			log.Errorf("Invalid address of %s provided: %v", candidate, maErr)
		} else {
			multiAddrString = append(multiAddrString, candidate)
		}
	}
	return enodeString, multiAddrString
}
// convertToMultiAddr converts every node that has an IP address into a
// multiaddr. Nodes without an IP are skipped, and nodes that fail to convert
// are logged and skipped, so the result may be shorter than the input.
func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
	var converted []ma.Multiaddr
	for i := range nodes {
		candidate := nodes[i]
		// Ignore nodes with no ip address stored.
		if candidate.IP() == nil {
			continue
		}
		addr, err := convertToSingleMultiAddr(candidate)
		if err == nil {
			converted = append(converted, addr)
			continue
		}
		log.WithError(err).Error("Could not convert to multiAddr")
	}
	return converted
}
// convertToAddrInfo turns a node into libp2p peer address info and also
// returns the multiaddr that info was derived from. On any failure both
// results are nil and the error is returned unwrapped.
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) {
	var (
		addr ma.Multiaddr
		info *peer.AddrInfo
		err  error
	)
	if addr, err = convertToSingleMultiAddr(node); err != nil {
		return nil, nil, err
	}
	if info, err = peer.AddrInfoFromP2pAddr(addr); err != nil {
		return nil, nil, err
	}
	return info, addr, nil
}
// convertToSingleMultiAddr builds the "/ip4/<ip>/tcp/<port>/p2p/<id>"
// multiaddr of a node from its IP, its TCP port and the peer id derived
// from its public key. Only nodes with an IPv4 address are supported; any
// other node yields an error.
func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
	nodeIP := node.IP()
	v4 := nodeIP.To4()
	if v4 == nil {
		return nil, errors.Errorf("node doesn't have an ip4 address, it's stated IP is %s", nodeIP.String())
	}

	peerID, err := peer.IDFromPublicKey(convertToInterfacePubkey(node.Pubkey()))
	if err != nil {
		return nil, errors.Wrap(err, "could not get peer id")
	}

	raw := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", v4.String(), node.TCP(), peerID)
	addr, err := ma.NewMultiaddr(raw)
	if err != nil {
		return nil, errors.Wrap(err, "could not get multiaddr")
	}
	return addr, nil
}
// peersFromStringAddrs converts a mixed list of multiaddr and enode strings
// into multiaddrs. Multiaddr entries come first in the result, followed by
// the converted enode entries; entries that are neither are dropped by
// parseGenericAddrs. The first conversion failure aborts with an error.
func peersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
	enodes, multiAddrs := parseGenericAddrs(addrs)

	var resolved []ma.Multiaddr
	for _, raw := range multiAddrs {
		parsed, err := multiAddrFromString(raw)
		if err != nil {
			return nil, errors.Wrap(err, "Could not get multiaddr from string")
		}
		resolved = append(resolved, parsed)
	}
	for _, raw := range enodes {
		node, err := enode.Parse(enode.ValidSchemes, raw)
		if err != nil {
			return nil, errors.Wrap(err, "Could not get enode from string")
		}
		converted, err := convertToSingleMultiAddr(node)
		if err != nil {
			return nil, errors.Wrap(err, "Could not get multiaddr")
		}
		resolved = append(resolved, converted)
	}
	return resolved, nil
}
// multiAddrFromString parses address with the ipfs address parser and
// returns the multiaddr it contains, or the parser's error.
func multiAddrFromString(address string) (ma.Multiaddr, error) {
	parsed, err := iaddr.ParseString(address)
	if err == nil {
		return parsed.Multiaddr(), nil
	}
	return nil, err
}