mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 21:07:18 +00:00
d281ef9c56
* clean up * cleanup * fix * fix tests * change * deepsource * fix test
181 lines
6.1 KiB
Go
181 lines
6.1 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
|
|
"github.com/prysmaticlabs/prysm/shared/timeutils"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// timeForStatus is the time to wait for a status request from a peer.
const timeForStatus = 10 * time.Second
|
|
|
|
func peerMultiaddrString(conn network.Conn) string {
|
|
return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String())
|
|
}
|
|
|
|
// AddConnectionHandler adds a callback function which handles the connection with a
|
|
// newly added peer. It performs a handshake with that peer by sending a hello request
|
|
// and validating the response from the peer.
|
|
func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error) {
|
|
// Peer map and lock to keep track of current connection attempts.
|
|
peerMap := make(map[peer.ID]bool)
|
|
peerLock := new(sync.Mutex)
|
|
|
|
// This is run at the start of each connection attempt, to ensure
|
|
// that there aren't multiple inflight connection requests for the
|
|
// same peer at once.
|
|
peerHandshaking := func(id peer.ID) bool {
|
|
peerLock.Lock()
|
|
defer peerLock.Unlock()
|
|
|
|
if peerMap[id] {
|
|
repeatPeerConnections.Inc()
|
|
return true
|
|
}
|
|
|
|
peerMap[id] = true
|
|
return false
|
|
}
|
|
|
|
peerFinished := func(id peer.ID) {
|
|
peerLock.Lock()
|
|
defer peerLock.Unlock()
|
|
|
|
delete(peerMap, id)
|
|
}
|
|
|
|
s.host.Network().Notify(&network.NotifyBundle{
|
|
ConnectedF: func(net network.Network, conn network.Conn) {
|
|
remotePeer := conn.RemotePeer()
|
|
disconnectFromPeer := func() {
|
|
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting)
|
|
// Only attempt a goodbye if we are still connected to the peer.
|
|
if s.host.Network().Connectedness(remotePeer) == network.Connected {
|
|
if err := goodByeFunc(context.TODO(), remotePeer); err != nil {
|
|
log.WithError(err).Error("Unable to disconnect from peer")
|
|
}
|
|
}
|
|
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnected)
|
|
}
|
|
// Connection handler must be non-blocking as part of libp2p design.
|
|
go func() {
|
|
if peerHandshaking(remotePeer) {
|
|
// Exit this if there is already another connection
|
|
// request in flight.
|
|
return
|
|
}
|
|
defer peerFinished(remotePeer)
|
|
// Handle the various pre-existing conditions that will result in us not handshaking.
|
|
peerConnectionState, err := s.peers.ConnectionState(remotePeer)
|
|
if err == nil && (peerConnectionState == peers.PeerConnected || peerConnectionState == peers.PeerConnecting) {
|
|
log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request")
|
|
return
|
|
}
|
|
s.peers.Add(nil /* ENR */, remotePeer, conn.RemoteMultiaddr(), conn.Stat().Direction)
|
|
// Defensive check in the event we still get a bad peer.
|
|
if s.peers.IsBad(remotePeer) {
|
|
log.WithField("reason", "bad peer").Trace("Ignoring connection request")
|
|
disconnectFromPeer()
|
|
return
|
|
}
|
|
validPeerConnection := func() {
|
|
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
|
|
// Go through the handshake process.
|
|
log.WithFields(logrus.Fields{
|
|
"direction": conn.Stat().Direction,
|
|
"multiAddr": peerMultiaddrString(conn),
|
|
"activePeers": len(s.peers.Active()),
|
|
}).Debug("Peer connected")
|
|
}
|
|
|
|
// Do not perform handshake on inbound dials.
|
|
if conn.Stat().Direction == network.DirInbound {
|
|
_, err := s.peers.ChainState(remotePeer)
|
|
peerExists := err == nil
|
|
currentTime := timeutils.Now()
|
|
|
|
// Wait for peer to initiate handshake
|
|
time.Sleep(timeForStatus)
|
|
|
|
// Exit if we are disconnected with the peer.
|
|
if s.host.Network().Connectedness(remotePeer) != network.Connected {
|
|
return
|
|
}
|
|
|
|
// If peer hasn't sent a status request, we disconnect with them
|
|
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) {
|
|
disconnectFromPeer()
|
|
return
|
|
}
|
|
if peerExists {
|
|
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
|
|
if err != nil {
|
|
disconnectFromPeer()
|
|
return
|
|
}
|
|
// exit if we don't receive any current status messages from
|
|
// peer.
|
|
if updated.IsZero() || !updated.After(currentTime) {
|
|
disconnectFromPeer()
|
|
return
|
|
}
|
|
}
|
|
validPeerConnection()
|
|
return
|
|
}
|
|
|
|
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
|
|
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && err != io.EOF {
|
|
log.WithError(err).Trace("Handshake failed")
|
|
disconnectFromPeer()
|
|
return
|
|
}
|
|
validPeerConnection()
|
|
}()
|
|
},
|
|
})
|
|
}
|
|
|
|
// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
|
|
// This also calls the handler responsible for maintaining other parts of the sync or p2p system.
|
|
func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
|
|
s.host.Network().Notify(&network.NotifyBundle{
|
|
DisconnectedF: func(net network.Network, conn network.Conn) {
|
|
log := log.WithField("multiAddr", peerMultiaddrString(conn))
|
|
// Must be handled in a goroutine as this callback cannot be blocking.
|
|
go func() {
|
|
// Exit early if we are still connected to the peer.
|
|
if net.Connectedness(conn.RemotePeer()) == network.Connected {
|
|
return
|
|
}
|
|
priorState, err := s.peers.ConnectionState(conn.RemotePeer())
|
|
if err != nil {
|
|
// Can happen if the peer has already disconnected, so...
|
|
priorState = peers.PeerDisconnected
|
|
}
|
|
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
|
|
if err := handler(context.TODO(), conn.RemotePeer()); err != nil {
|
|
log.WithError(err).Error("Disconnect handler failed")
|
|
}
|
|
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
|
|
// Only log disconnections if we were fully connected.
|
|
if priorState == peers.PeerConnected {
|
|
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
|
|
}
|
|
}()
|
|
},
|
|
})
|
|
}
|