mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-14 22:18:20 +00:00
719e99ffd9
* remove roughtime * change all references * rename Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
559 lines
17 KiB
Go
559 lines
17 KiB
Go
// Package peers provides information about peers at the eth2 protocol level.
|
|
// "Protocol level" is the level above the network level, so this layer never sees or interacts with (for example) hosts that are
|
|
// uncontactable due to being down, firewalled, etc. Instead, this works with peers that are contactable but may or may not be of
|
|
// the correct fork version, not currently required due to the number of current connections, etc.
|
|
//
|
|
// A peer can have one of a number of states:
|
|
//
|
|
// - connected if we are able to talk to the remote peer
|
|
// - connecting if we are attempting to be able to talk to the remote peer
|
|
// - disconnecting if we are attempting to stop being able to talk to the remote peer
|
|
// - disconnected if we are not able to talk to the remote peer
|
|
//
|
|
// For convenience, there are two aggregate states expressed in functions:
|
|
//
|
|
// - active if we are connecting or connected
|
|
// - inactive if we are disconnecting or disconnected
|
|
//
|
|
// Peer information is persistent for the run of the service. This allows for collection of useful long-term statistics such as
|
|
// number of bad responses obtained from the peer, giving the basis for decisions to not talk to known-bad peers.
|
|
package peers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/go-bitfield"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/prysmaticlabs/prysm/shared/timeutils"
|
|
)
|
|
|
|
// PeerConnectionState is the state of the connection.
|
|
type PeerConnectionState ethpb.ConnectionState
|
|
|
|
const (
|
|
// PeerDisconnected means there is no connection to the peer.
|
|
PeerDisconnected PeerConnectionState = iota
|
|
// PeerDisconnecting means there is an on-going attempt to disconnect from the peer.
|
|
PeerDisconnecting
|
|
// PeerConnected means the peer has an active connection.
|
|
PeerConnected
|
|
// PeerConnecting means there is an on-going attempt to connect to the peer.
|
|
PeerConnecting
|
|
)
|
|
|
|
// maxLimitBuffer is the headroom added on top of the configured peer limit
// when sizing the peer store, so that statuses of extra peers can be retained.
const maxLimitBuffer = 150
|
|
|
|
// ErrPeerUnknown is returned when data is requested for a peer that is not in the store.
var ErrPeerUnknown = errors.New("peer unknown")
|
|
|
|
// Status is the structure holding the peer status information.
|
|
type Status struct {
|
|
ctx context.Context
|
|
scorers *PeerScorerManager
|
|
store *peerDataStore
|
|
}
|
|
|
|
// StatusConfig represents peer status service params.
|
|
type StatusConfig struct {
|
|
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
|
|
PeerLimit int
|
|
// ScorerParams holds peer scorer configuration params.
|
|
ScorerParams *PeerScorerConfig
|
|
}
|
|
|
|
// NewStatus creates a new status entity.
|
|
func NewStatus(ctx context.Context, config *StatusConfig) *Status {
|
|
store := newPeerDataStore(ctx, &peerDataStoreConfig{
|
|
maxPeers: maxLimitBuffer + config.PeerLimit,
|
|
})
|
|
return &Status{
|
|
ctx: ctx,
|
|
store: store,
|
|
scorers: newPeerScorerManager(ctx, store, config.ScorerParams),
|
|
}
|
|
}
|
|
|
|
// Scorers exposes peer scoring management service.
|
|
func (p *Status) Scorers() *PeerScorerManager {
|
|
return p.scorers
|
|
}
|
|
|
|
// MaxPeerLimit returns the max peer limit stored in the current peer store.
|
|
func (p *Status) MaxPeerLimit() int {
|
|
return p.store.config.maxPeers
|
|
}
|
|
|
|
// Add adds a peer.
|
|
// If a peer already exists with this ID its address and direction are updated with the supplied data.
|
|
func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) {
|
|
p.store.Lock()
|
|
defer p.store.Unlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
// Peer already exists, just update its address info.
|
|
peerData.address = address
|
|
peerData.direction = direction
|
|
if record != nil {
|
|
peerData.enr = record
|
|
}
|
|
return
|
|
}
|
|
peerData := &peerData{
|
|
address: address,
|
|
direction: direction,
|
|
// Peers start disconnected; state will be updated when the handshake process begins.
|
|
connState: PeerDisconnected,
|
|
}
|
|
if record != nil {
|
|
peerData.enr = record
|
|
}
|
|
p.store.peers[pid] = peerData
|
|
}
|
|
|
|
// Address returns the multiaddress of the given remote peer.
|
|
// This will error if the peer does not exist.
|
|
func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return peerData.address, nil
|
|
}
|
|
return nil, ErrPeerUnknown
|
|
}
|
|
|
|
// Direction returns the direction of the given remote peer.
|
|
// This will error if the peer does not exist.
|
|
func (p *Status) Direction(pid peer.ID) (network.Direction, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return peerData.direction, nil
|
|
}
|
|
return network.DirUnknown, ErrPeerUnknown
|
|
}
|
|
|
|
// ENR returns the enr for the corresponding peer id.
|
|
func (p *Status) ENR(pid peer.ID) (*enr.Record, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return peerData.enr, nil
|
|
}
|
|
return nil, ErrPeerUnknown
|
|
}
|
|
|
|
// SetChainState sets the chain state of the given remote peer.
|
|
func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) {
|
|
p.store.Lock()
|
|
defer p.store.Unlock()
|
|
|
|
peerData := p.fetch(pid)
|
|
peerData.chainState = chainState
|
|
peerData.chainStateLastUpdated = timeutils.Now()
|
|
}
|
|
|
|
// ChainState gets the chain state of the given remote peer.
|
|
// This can return nil if there is no known chain state for the peer.
|
|
// This will error if the peer does not exist.
|
|
func (p *Status) ChainState(pid peer.ID) (*pb.Status, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return peerData.chainState, nil
|
|
}
|
|
return nil, ErrPeerUnknown
|
|
}
|
|
|
|
// IsActive checks if a peers is active and returns the result appropriately.
|
|
func (p *Status) IsActive(pid peer.ID) bool {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
peerData, ok := p.store.peers[pid]
|
|
return ok && (peerData.connState == PeerConnected || peerData.connState == PeerConnecting)
|
|
}
|
|
|
|
// SetMetadata sets the metadata of the given remote peer.
|
|
func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
|
|
p.store.Lock()
|
|
defer p.store.Unlock()
|
|
|
|
peerData := p.fetch(pid)
|
|
peerData.metaData = metaData
|
|
}
|
|
|
|
// Metadata returns a copy of the metadata corresponding to the provided
|
|
// peer id.
|
|
func (p *Status) Metadata(pid peer.ID) (*pb.MetaData, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return proto.Clone(peerData.metaData).(*pb.MetaData), nil
|
|
}
|
|
return nil, ErrPeerUnknown
|
|
}
|
|
|
|
// CommitteeIndices retrieves the committee subnets the peer is subscribed to.
|
|
func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
if peerData.enr == nil || peerData.metaData == nil {
|
|
return []uint64{}, nil
|
|
}
|
|
return retrieveIndicesFromBitfield(peerData.metaData.Attnets), nil
|
|
}
|
|
return nil, ErrPeerUnknown
|
|
}
|
|
|
|
// SubscribedToSubnet retrieves the peers subscribed to the given
|
|
// committee subnet.
|
|
func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
// look at active peers
|
|
connectedStatus := peerData.connState == PeerConnecting || peerData.connState == PeerConnected
|
|
if connectedStatus && peerData.metaData != nil && peerData.metaData.Attnets != nil {
|
|
indices := retrieveIndicesFromBitfield(peerData.metaData.Attnets)
|
|
for _, idx := range indices {
|
|
if idx == index {
|
|
peers = append(peers, pid)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// SetConnectionState sets the connection state of the given remote peer.
|
|
func (p *Status) SetConnectionState(pid peer.ID, state PeerConnectionState) {
|
|
p.store.Lock()
|
|
defer p.store.Unlock()
|
|
|
|
peerData := p.fetch(pid)
|
|
peerData.connState = state
|
|
}
|
|
|
|
// ConnectionState gets the connection state of the given remote peer.
|
|
// This will error if the peer does not exist.
|
|
func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return peerData.connState, nil
|
|
}
|
|
return PeerDisconnected, ErrPeerUnknown
|
|
}
|
|
|
|
// ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated.
|
|
// This will error if the peer does not exist.
|
|
func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
|
|
if peerData, ok := p.store.peers[pid]; ok {
|
|
return peerData.chainStateLastUpdated, nil
|
|
}
|
|
return timeutils.Now(), ErrPeerUnknown
|
|
}
|
|
|
|
// IsBad states if the peer is to be considered bad.
|
|
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
|
|
func (p *Status) IsBad(pid peer.ID) bool {
|
|
return p.scorers.BadResponsesScorer().IsBadPeer(pid)
|
|
}
|
|
|
|
// Connecting returns the peers that are connecting.
|
|
func (p *Status) Connecting() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerConnecting {
|
|
peers = append(peers, pid)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Connected returns the peers that are connected.
|
|
func (p *Status) Connected() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerConnected {
|
|
peers = append(peers, pid)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Active returns the peers that are connecting or connected.
|
|
func (p *Status) Active() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerConnecting || peerData.connState == PeerConnected {
|
|
peers = append(peers, pid)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Disconnecting returns the peers that are disconnecting.
|
|
func (p *Status) Disconnecting() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerDisconnecting {
|
|
peers = append(peers, pid)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Disconnected returns the peers that are disconnected.
|
|
func (p *Status) Disconnected() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerDisconnected {
|
|
peers = append(peers, pid)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Inactive returns the peers that are disconnecting or disconnected.
|
|
func (p *Status) Inactive() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
peers := make([]peer.ID, 0)
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerDisconnecting || peerData.connState == PeerDisconnected {
|
|
peers = append(peers, pid)
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Bad returns the peers that are bad.
|
|
func (p *Status) Bad() []peer.ID {
|
|
return p.scorers.BadResponsesScorer().BadPeers()
|
|
}
|
|
|
|
// All returns all the peers regardless of state.
|
|
func (p *Status) All() []peer.ID {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
pids := make([]peer.ID, 0, len(p.store.peers))
|
|
for pid := range p.store.peers {
|
|
pids = append(pids, pid)
|
|
}
|
|
return pids
|
|
}
|
|
|
|
// Prune clears out and removes outdated and disconnected peers.
|
|
func (p *Status) Prune() {
|
|
p.store.Lock()
|
|
defer p.store.Unlock()
|
|
|
|
// Exit early if there is nothing to prune.
|
|
if len(p.store.peers) <= p.store.config.maxPeers {
|
|
return
|
|
}
|
|
|
|
type peerResp struct {
|
|
pid peer.ID
|
|
badResp int
|
|
}
|
|
peersToPrune := make([]*peerResp, 0)
|
|
// Select disconnected peers with a smaller bad response count.
|
|
for pid, peerData := range p.store.peers {
|
|
if peerData.connState == PeerDisconnected && !p.scorers.BadResponsesScorer().isBadPeer(pid) {
|
|
peersToPrune = append(peersToPrune, &peerResp{
|
|
pid: pid,
|
|
badResp: p.store.peers[pid].badResponses,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Sort peers in ascending order, so the peers with the
|
|
// least amount of bad responses are pruned first. This
|
|
// is to protect the node from malicious/lousy peers so
|
|
// that their memory is still kept.
|
|
sort.Slice(peersToPrune, func(i, j int) bool {
|
|
return peersToPrune[i].badResp < peersToPrune[j].badResp
|
|
})
|
|
|
|
limitDiff := len(p.store.peers) - p.store.config.maxPeers
|
|
if limitDiff > len(peersToPrune) {
|
|
limitDiff = len(peersToPrune)
|
|
}
|
|
|
|
peersToPrune = peersToPrune[:limitDiff]
|
|
|
|
// Delete peers from map.
|
|
for _, peerData := range peersToPrune {
|
|
delete(p.store.peers, peerData.pid)
|
|
}
|
|
}
|
|
|
|
// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed upon by the majority of peers.
|
|
// This method may not return the absolute highest finalized, but the finalized epoch in which most peers can serve blocks.
|
|
// Ideally, all peers would be reporting the same finalized epoch but some may be behind due to their own latency, or because of
|
|
// their finalized epoch at the time we queried them.
|
|
// Returns the best finalized root, epoch number, and list of peers that are at or beyond that epoch.
|
|
func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch uint64) (uint64, []peer.ID) {
|
|
connected := p.Connected()
|
|
finalizedEpochVotes := make(map[uint64]uint64)
|
|
pidEpoch := make(map[peer.ID]uint64, len(connected))
|
|
pidHead := make(map[peer.ID]uint64, len(connected))
|
|
potentialPIDs := make([]peer.ID, 0, len(connected))
|
|
for _, pid := range connected {
|
|
peerChainState, err := p.ChainState(pid)
|
|
if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch >= ourFinalizedEpoch {
|
|
finalizedEpochVotes[peerChainState.FinalizedEpoch]++
|
|
pidEpoch[pid] = peerChainState.FinalizedEpoch
|
|
potentialPIDs = append(potentialPIDs, pid)
|
|
pidHead[pid] = peerChainState.HeadSlot
|
|
}
|
|
}
|
|
|
|
// Select the target epoch, which is the epoch most peers agree upon.
|
|
var targetEpoch uint64
|
|
var mostVotes uint64
|
|
for epoch, count := range finalizedEpochVotes {
|
|
if count > mostVotes {
|
|
mostVotes = count
|
|
targetEpoch = epoch
|
|
}
|
|
}
|
|
|
|
// Sort PIDs by finalized epoch, in decreasing order.
|
|
sort.Slice(potentialPIDs, func(i, j int) bool {
|
|
return pidEpoch[potentialPIDs[i]] > pidEpoch[potentialPIDs[j]] &&
|
|
pidHead[potentialPIDs[i]] > pidHead[potentialPIDs[j]]
|
|
})
|
|
|
|
// Trim potential peers to those on or after target epoch.
|
|
for i, pid := range potentialPIDs {
|
|
if pidEpoch[pid] < targetEpoch {
|
|
potentialPIDs = potentialPIDs[:i]
|
|
break
|
|
}
|
|
}
|
|
|
|
// Trim potential peers to at most maxPeers.
|
|
if len(potentialPIDs) > maxPeers {
|
|
potentialPIDs = potentialPIDs[:maxPeers]
|
|
}
|
|
|
|
return targetEpoch, potentialPIDs
|
|
}
|
|
|
|
// BestNonFinalized returns the highest known epoch, which is higher than ours, and is shared
|
|
// by at least minPeers.
|
|
func (p *Status) BestNonFinalized(minPeers int, ourFinalizedEpoch uint64) (uint64, []peer.ID) {
|
|
connected := p.Connected()
|
|
epochVotes := make(map[uint64]uint64)
|
|
pidEpoch := make(map[peer.ID]uint64, len(connected))
|
|
pidHead := make(map[peer.ID]uint64, len(connected))
|
|
potentialPIDs := make([]peer.ID, 0, len(connected))
|
|
|
|
ourFinalizedSlot := ourFinalizedEpoch * params.BeaconConfig().SlotsPerEpoch
|
|
for _, pid := range connected {
|
|
peerChainState, err := p.ChainState(pid)
|
|
if err == nil && peerChainState != nil && peerChainState.HeadSlot > ourFinalizedSlot {
|
|
epoch := helpers.SlotToEpoch(peerChainState.HeadSlot)
|
|
epochVotes[epoch]++
|
|
pidEpoch[pid] = epoch
|
|
pidHead[pid] = peerChainState.HeadSlot
|
|
potentialPIDs = append(potentialPIDs, pid)
|
|
}
|
|
}
|
|
|
|
// Select the target epoch, which has enough peers' votes (>= minPeers).
|
|
var targetEpoch uint64
|
|
for epoch, votes := range epochVotes {
|
|
if votes >= uint64(minPeers) && targetEpoch < epoch {
|
|
targetEpoch = epoch
|
|
}
|
|
}
|
|
|
|
// Sort PIDs by head slot, in decreasing order.
|
|
sort.Slice(potentialPIDs, func(i, j int) bool {
|
|
return pidHead[potentialPIDs[i]] > pidHead[potentialPIDs[j]]
|
|
})
|
|
|
|
// Trim potential peers to those on or after target epoch.
|
|
for i, pid := range potentialPIDs {
|
|
if pidEpoch[pid] < targetEpoch {
|
|
potentialPIDs = potentialPIDs[:i]
|
|
break
|
|
}
|
|
}
|
|
|
|
return targetEpoch, potentialPIDs
|
|
}
|
|
|
|
// fetch is a helper function that fetches a peer status, possibly creating it.
|
|
func (p *Status) fetch(pid peer.ID) *peerData {
|
|
if _, ok := p.store.peers[pid]; !ok {
|
|
p.store.peers[pid] = &peerData{}
|
|
}
|
|
return p.store.peers[pid]
|
|
}
|
|
|
|
// HighestEpoch returns the highest epoch reported epoch amongst peers.
|
|
func (p *Status) HighestEpoch() uint64 {
|
|
p.store.RLock()
|
|
defer p.store.RUnlock()
|
|
var highestSlot uint64
|
|
for _, peerData := range p.store.peers {
|
|
if peerData != nil && peerData.chainState != nil && peerData.chainState.HeadSlot > highestSlot {
|
|
highestSlot = peerData.chainState.HeadSlot
|
|
}
|
|
}
|
|
return helpers.SlotToEpoch(highestSlot)
|
|
}
|
|
|
|
func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 {
|
|
committeeIdxs := make([]uint64, 0, bitV.Count())
|
|
for i := uint64(0); i < 64; i++ {
|
|
if bitV.BitAt(i) {
|
|
committeeIdxs = append(committeeIdxs, i)
|
|
}
|
|
}
|
|
return committeeIdxs
|
|
}
|