mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-17 15:28:45 +00:00
caf61bd824
* use gater * Merge branch 'master' into connGater * Update beacon-chain/p2p/testing/p2p.go * fmt * gaz
307 lines
8.3 KiB
Go
307 lines
8.3 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"net"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
iaddr "github.com/ipfs/go-ipfs-addr"
|
|
core "github.com/libp2p/go-libp2p-core"
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Listener defines the discovery V5 network interface that is used
|
|
// to communicate with other peers.
|
|
type Listener interface {
|
|
Self() *enode.Node
|
|
Close()
|
|
Lookup(enode.ID) []*enode.Node
|
|
Resolve(*enode.Node) *enode.Node
|
|
RandomNodes() enode.Iterator
|
|
Ping(*enode.Node) error
|
|
RequestENR(*enode.Node) (*enode.Node, error)
|
|
LocalNode() *enode.LocalNode
|
|
}
|
|
|
|
func (s *Service) createListener(
|
|
ipAddr net.IP,
|
|
privKey *ecdsa.PrivateKey,
|
|
) *discover.UDPv5 {
|
|
udpAddr := &net.UDPAddr{
|
|
IP: ipAddr,
|
|
Port: int(s.cfg.UDPPort),
|
|
}
|
|
// assume ip is either ipv4 or ipv6
|
|
networkVersion := ""
|
|
if ipAddr.To4() != nil {
|
|
networkVersion = "udp4"
|
|
} else {
|
|
networkVersion = "udp6"
|
|
}
|
|
conn, err := net.ListenUDP(networkVersion, udpAddr)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
localNode, err := s.createLocalNode(
|
|
privKey,
|
|
ipAddr,
|
|
int(s.cfg.UDPPort),
|
|
int(s.cfg.TCPPort),
|
|
)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
if s.cfg.HostAddress != "" {
|
|
hostIP := net.ParseIP(s.cfg.HostAddress)
|
|
if hostIP.To4() == nil && hostIP.To16() == nil {
|
|
log.Errorf("Invalid host address given: %s", hostIP.String())
|
|
} else {
|
|
localNode.SetFallbackIP(hostIP)
|
|
localNode.SetStaticIP(hostIP)
|
|
}
|
|
}
|
|
dv5Cfg := discover.Config{
|
|
PrivateKey: privKey,
|
|
}
|
|
dv5Cfg.Bootnodes = []*enode.Node{}
|
|
for _, addr := range s.cfg.Discv5BootStrapAddr {
|
|
bootNode, err := enode.Parse(enode.ValidSchemes, addr)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
dv5Cfg.Bootnodes = append(dv5Cfg.Bootnodes, bootNode)
|
|
}
|
|
|
|
network, err := discover.ListenV5(conn, localNode, dv5Cfg)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
return network
|
|
}
|
|
|
|
func (s *Service) createLocalNode(
|
|
privKey *ecdsa.PrivateKey,
|
|
ipAddr net.IP,
|
|
udpPort int,
|
|
tcpPort int,
|
|
) (*enode.LocalNode, error) {
|
|
db, err := enode.OpenDB("")
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not open node's peer database")
|
|
}
|
|
localNode := enode.NewLocalNode(db, privKey)
|
|
ipEntry := enr.IP(ipAddr)
|
|
udpEntry := enr.UDP(udpPort)
|
|
tcpEntry := enr.TCP(tcpPort)
|
|
localNode.Set(ipEntry)
|
|
localNode.Set(udpEntry)
|
|
localNode.Set(tcpEntry)
|
|
localNode.SetFallbackIP(ipAddr)
|
|
localNode.SetFallbackUDP(udpPort)
|
|
|
|
localNode, err = addForkEntry(localNode, s.genesisTime, s.genesisValidatorsRoot)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not add eth2 fork version entry to enr")
|
|
}
|
|
return intializeAttSubnets(localNode), nil
|
|
}
|
|
|
|
func (s *Service) startDiscoveryV5(
|
|
addr net.IP,
|
|
privKey *ecdsa.PrivateKey,
|
|
) (*discover.UDPv5, error) {
|
|
listener := s.createListener(addr, privKey)
|
|
record := listener.Self()
|
|
log.WithField("ENR", record.String()).Info("Started discovery v5")
|
|
return listener, nil
|
|
}
|
|
|
|
// filterPeer validates each node that we retrieve from our dht. We
|
|
// try to ascertain that the peer can be a valid protocol peer.
|
|
// Validity Conditions:
|
|
// 1) The local node is still actively looking for peers to
|
|
// connect to.
|
|
// 2) Peer has a valid IP and TCP port set in their enr.
|
|
// 3) Peer hasn't been marked as 'bad'
|
|
// 4) Peer is not currently active or connected.
|
|
// 5) Peer's fork digest in their ENR matches that of
|
|
// our localnodes.
|
|
func (s *Service) filterPeer(node *enode.Node) bool {
|
|
numOfConns := len(s.host.Network().Peers())
|
|
maxPeers := int(s.cfg.MaxPeers)
|
|
activePeers := len(s.Peers().Active())
|
|
if activePeers >= maxPeers || numOfConns >= maxPeers {
|
|
log.WithFields(logrus.Fields{"peer": node.String(),
|
|
"reason": "at peer limit"}).Trace("Not dialing peer")
|
|
return false
|
|
}
|
|
// ignore nodes with no ip address stored.
|
|
if node.IP() == nil {
|
|
return false
|
|
}
|
|
// do not dial nodes with their tcp ports not set
|
|
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
|
|
if !enr.IsNotFound(err) {
|
|
log.WithError(err).Debug("Could not retrieve tcp port")
|
|
}
|
|
return false
|
|
}
|
|
peerData, multiAddr, err := convertToAddrInfo(node)
|
|
if err != nil {
|
|
log.WithError(err).Debug("Could not convert to peer data")
|
|
return false
|
|
}
|
|
if s.peers.IsBad(peerData.ID) {
|
|
return false
|
|
}
|
|
if s.peers.IsActive(peerData.ID) {
|
|
return false
|
|
}
|
|
if s.host.Network().Connectedness(peerData.ID) == network.Connected {
|
|
return false
|
|
}
|
|
nodeENR := node.Record()
|
|
// Decide whether or not to connect to peer that does not
|
|
// match the proper fork ENR data with our local node.
|
|
if s.genesisValidatorsRoot != nil {
|
|
if err := s.compareForkENR(nodeENR); err != nil {
|
|
log.WithError(err).Trace("Fork ENR mismatches between peer and local node")
|
|
return false
|
|
}
|
|
}
|
|
// Add peer to peer handler.
|
|
s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown)
|
|
return true
|
|
}
|
|
|
|
// startDHTDiscovery supports discovery via DHT.
|
|
func startDHTDiscovery(host core.Host, bootstrapAddr string) error {
|
|
multiAddr, err := multiAddrFromString(bootstrapAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
peerInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = host.Connect(context.Background(), *peerInfo)
|
|
return err
|
|
}
|
|
|
|
func parseBootStrapAddrs(addrs []string) (discv5Nodes []string, kadDHTNodes []string) {
|
|
discv5Nodes, kadDHTNodes = parseGenericAddrs(addrs)
|
|
if len(discv5Nodes) == 0 && len(kadDHTNodes) == 0 {
|
|
log.Warn("No bootstrap addresses supplied")
|
|
}
|
|
return discv5Nodes, kadDHTNodes
|
|
}
|
|
|
|
func parseGenericAddrs(addrs []string) (enodeString []string, multiAddrString []string) {
|
|
for _, addr := range addrs {
|
|
if addr == "" {
|
|
// Ignore empty entries
|
|
continue
|
|
}
|
|
_, err := enode.Parse(enode.ValidSchemes, addr)
|
|
if err == nil {
|
|
enodeString = append(enodeString, addr)
|
|
continue
|
|
}
|
|
_, err = multiAddrFromString(addr)
|
|
if err == nil {
|
|
multiAddrString = append(multiAddrString, addr)
|
|
continue
|
|
}
|
|
log.Errorf("Invalid address of %s provided", addr)
|
|
}
|
|
return enodeString, multiAddrString
|
|
}
|
|
|
|
func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
|
|
var multiAddrs []ma.Multiaddr
|
|
for _, node := range nodes {
|
|
// ignore nodes with no ip address stored
|
|
if node.IP() == nil {
|
|
continue
|
|
}
|
|
multiAddr, err := convertToSingleMultiAddr(node)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not convert to multiAddr")
|
|
continue
|
|
}
|
|
multiAddrs = append(multiAddrs, multiAddr)
|
|
}
|
|
return multiAddrs
|
|
}
|
|
|
|
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) {
|
|
multiAddr, err := convertToSingleMultiAddr(node)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
info, err := peer.AddrInfoFromP2pAddr(multiAddr)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return info, multiAddr, nil
|
|
}
|
|
|
|
func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
|
|
ip4 := node.IP().To4()
|
|
if ip4 == nil {
|
|
return nil, errors.Errorf("node doesn't have an ip4 address, it's stated IP is %s", node.IP().String())
|
|
}
|
|
pubkey := node.Pubkey()
|
|
assertedKey := convertToInterfacePubkey(pubkey)
|
|
id, err := peer.IDFromPublicKey(assertedKey)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not get peer id")
|
|
}
|
|
multiAddrString := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip4.String(), node.TCP(), id)
|
|
multiAddr, err := ma.NewMultiaddr(multiAddrString)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not get multiaddr")
|
|
}
|
|
return multiAddr, nil
|
|
}
|
|
|
|
func peersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
|
|
var allAddrs []ma.Multiaddr
|
|
enodeString, multiAddrString := parseGenericAddrs(addrs)
|
|
for _, stringAddr := range multiAddrString {
|
|
addr, err := multiAddrFromString(stringAddr)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "Could not get multiaddr from string")
|
|
}
|
|
allAddrs = append(allAddrs, addr)
|
|
}
|
|
for _, stringAddr := range enodeString {
|
|
enodeAddr, err := enode.Parse(enode.ValidSchemes, stringAddr)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "Could not get enode from string")
|
|
}
|
|
addr, err := convertToSingleMultiAddr(enodeAddr)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "Could not get multiaddr")
|
|
}
|
|
allAddrs = append(allAddrs, addr)
|
|
}
|
|
return allAddrs, nil
|
|
}
|
|
|
|
func multiAddrFromString(address string) (ma.Multiaddr, error) {
|
|
addr, err := iaddr.ParseString(address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return addr.Multiaddr(), nil
|
|
}
|