prysm-pulse/beacon-chain/p2p/peers/status.go
Victor Farazdagi 6a2bb65fe2
Fix BestFinalized method (#7619)
* turn tests to use testing vector

* add regression test case

* complete tests
2020-10-23 02:49:42 +00:00

552 lines
17 KiB
Go

// Package peers provides information about peers at the eth2 protocol level.
//
// "Protocol level" is the level above the network level, so this layer never sees or interacts with
// (for example) hosts that are uncontactable due to being down, firewalled, etc. Instead, this works
// with peers that are contactable but may or may not be of the correct fork version, not currently
// required due to the number of current connections, etc.
//
// A peer can have one of a number of states:
//
// - connected if we are able to talk to the remote peer
// - connecting if we are attempting to be able to talk to the remote peer
// - disconnecting if we are attempting to stop being able to talk to the remote peer
// - disconnected if we are not able to talk to the remote peer
//
// For convenience, there are two aggregate states expressed in functions:
//
// - active if we are connecting or connected
// - inactive if we are disconnecting or disconnected
//
// Peer information is persistent for the run of the service. This allows for collection of useful
// long-term statistics such as number of bad responses obtained from the peer, giving the basis for
// decisions to not talk to known-bad peers (by de-scoring them).
package peers
import (
"context"
"sort"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
)
// Connection states a peer can be in. The values are assigned by iota in declaration order,
// so PeerDisconnected (0) is also the zero value of peerdata.PeerConnectionState.
const (
// PeerDisconnected means there is no connection to the peer.
PeerDisconnected peerdata.PeerConnectionState = iota
// PeerDisconnecting means there is an on-going attempt to disconnect from the peer.
PeerDisconnecting
// PeerConnected means the peer has an active connection.
PeerConnected
// PeerConnecting means there is an on-going attempt to connect to the peer.
PeerConnecting
)
// maxLimitBuffer is the additional buffer beyond the current peer limit, from which we can
// store the relevant peer statuses. NewStatus adds it to the configured PeerLimit to size
// the peer store.
const maxLimitBuffer = 150
// Status is the structure holding the peer status information.
// It is created by NewStatus; all peer data lives in the store, and the methods of Status
// take the store's lock before reading or modifying it.
type Status struct {
// ctx is the context that was supplied to NewStatus.
ctx context.Context
// scorers is the peer scoring service, constructed over the same store.
scorers *scorers.Service
// store holds the per-peer data together with the lock guarding it.
store *peerdata.Store
}
// StatusConfig represents peer status service params.
type StatusConfig struct {
// PeerLimit specifies the maximum number of concurrent peers that are expected to be connected to the node.
// NewStatus adds maxLimitBuffer on top of this value when sizing the peer store.
PeerLimit int
// ScorerParams holds peer scorer configuration params; it is passed unchanged to scorers.NewService.
ScorerParams *scorers.Config
}
// NewStatus builds a Status backed by a freshly created peer data store.
//
// The store is sized to config.PeerLimit plus maxLimitBuffer, and the very same store is handed
// to the scoring service, so both operate on one set of peer data.
func NewStatus(ctx context.Context, config *StatusConfig) *Status {
	storeCfg := &peerdata.StoreConfig{
		MaxPeers: maxLimitBuffer + config.PeerLimit,
	}
	status := &Status{
		ctx:   ctx,
		store: peerdata.NewStore(ctx, storeCfg),
	}
	status.scorers = scorers.NewService(ctx, status.store, config.ScorerParams)
	return status
}
// Scorers returns the peer scoring service that was created together with this Status.
func (p *Status) Scorers() *scorers.Service {
	svc := p.scorers
	return svc
}
// MaxPeerLimit reports the MaxPeers value from the configuration of the underlying peer store.
func (p *Status) MaxPeerLimit() int {
	cfg := p.store.Config()
	return cfg.MaxPeers
}
// Add registers a peer with the store.
//
// When the peer ID is already known, its address and direction are overwritten with the
// supplied values, and its ENR is replaced only if record is non-nil. Otherwise a new entry is
// created in the PeerDisconnected state.
func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) {
	p.store.Lock()
	defer p.store.Unlock()

	existing, known := p.store.PeerData(pid)
	if !known {
		// Peers start disconnected; state will be updated when the handshake process begins.
		// A nil record simply leaves the Enr field at its zero value.
		p.store.SetPeerData(pid, &peerdata.PeerData{
			Address:   address,
			Direction: direction,
			ConnState: PeerDisconnected,
			Enr:       record,
		})
		return
	}

	// Known peer: refresh the address info, keeping the old ENR when none is supplied.
	existing.Address = address
	existing.Direction = direction
	if record != nil {
		existing.Enr = record
	}
}
// Address looks up the multiaddress recorded for the given remote peer.
// It returns peerdata.ErrPeerUnknown (and a nil address) when the peer is not in the store.
func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return nil, peerdata.ErrPeerUnknown
	}
	return entry.Address, nil
}
// Direction looks up the connection direction recorded for the given remote peer.
// It returns network.DirUnknown together with peerdata.ErrPeerUnknown when the peer is not in
// the store.
func (p *Status) Direction(pid peer.ID) (network.Direction, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return network.DirUnknown, peerdata.ErrPeerUnknown
	}
	return entry.Direction, nil
}
// ENR looks up the ENR record stored for the given peer ID.
// The record may be nil if none was supplied for the peer. It returns peerdata.ErrPeerUnknown
// when the peer is not in the store.
func (p *Status) ENR(pid peer.ID) (*enr.Record, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return nil, peerdata.ErrPeerUnknown
	}
	return entry.Enr, nil
}
// SetChainState records the chain state reported by the given remote peer and stamps the
// update time. The peer's entry is fetched via PeerDataGetOrCreate, so the peer does not need
// to have been added beforehand.
func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) {
	p.store.Lock()
	defer p.store.Unlock()

	entry := p.store.PeerDataGetOrCreate(pid)
	entry.ChainState, entry.ChainStateLastUpdated = chainState, timeutils.Now()
}
// ChainState looks up the last chain state recorded for the given remote peer.
// The returned state is nil when the peer is known but no chain state has been set for it.
// It returns peerdata.ErrPeerUnknown when the peer is not in the store.
func (p *Status) ChainState(pid peer.ID) (*pb.Status, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return nil, peerdata.ErrPeerUnknown
	}
	return entry.ChainState, nil
}
// IsActive reports whether the given peer is active, i.e. known to the store and either
// connected or connecting. Unknown peers are reported as inactive.
func (p *Status) IsActive(pid peer.ID) bool {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return false
	}
	switch entry.ConnState {
	case PeerConnected, PeerConnecting:
		return true
	default:
		return false
	}
}
// SetMetadata stores the supplied metadata for the given remote peer. The peer's entry is
// fetched via PeerDataGetOrCreate, so the peer does not need to have been added beforehand.
func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
	p.store.Lock()
	defer p.store.Unlock()

	p.store.PeerDataGetOrCreate(pid).MetaData = metaData
}
// Metadata returns a clone of the metadata stored for the provided peer ID, so callers cannot
// modify the stored value through the result.
// It returns peerdata.ErrPeerUnknown when the peer is not in the store.
func (p *Status) Metadata(pid peer.ID) (*pb.MetaData, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return nil, peerdata.ErrPeerUnknown
	}
	cloned := proto.Clone(entry.MetaData)
	return cloned.(*pb.MetaData), nil
}
// CommitteeIndices lists the committee subnets the peer is subscribed to, as derived from the
// Attnets bitfield of its metadata.
// A known peer that has no ENR or no metadata yields an empty, non-nil slice.
// It returns peerdata.ErrPeerUnknown when the peer is not in the store.
func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return nil, peerdata.ErrPeerUnknown
	}
	if entry.Enr != nil && entry.MetaData != nil {
		return retrieveIndicesFromBitfield(entry.MetaData.Attnets), nil
	}
	return []uint64{}, nil
}
// SubscribedToSubnet retrieves the active (connecting or connected) peers whose metadata marks
// them as subscribed to the given committee subnet.
//
// Peers without metadata or without an Attnets bitfield are skipped. The result is always a
// non-nil slice; it is empty when no peer matches or when index is outside the 64 subnets
// tracked by the bitfield.
func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
	// Number of subnet bits inspected; matches the range scanned by retrieveIndicesFromBitfield.
	const attnetsBits = 64

	p.store.RLock()
	defer p.store.RUnlock()

	peers := make([]peer.ID, 0)
	if index >= attnetsBits {
		return peers
	}
	for pid, peerData := range p.store.Peers() {
		// Look at active peers only.
		if peerData.ConnState != PeerConnecting && peerData.ConnState != PeerConnected {
			continue
		}
		if peerData.MetaData == nil || peerData.MetaData.Attnets == nil {
			continue
		}
		// Test the requested bit directly rather than materialising every set index
		// and scanning the resulting slice for a single value.
		if bitfield.Bitvector64(peerData.MetaData.Attnets).BitAt(index) {
			peers = append(peers, pid)
		}
	}
	return peers
}
// SetConnectionState records the connection state of the given remote peer. The peer's entry
// is fetched via PeerDataGetOrCreate, so the peer does not need to have been added beforehand.
func (p *Status) SetConnectionState(pid peer.ID, state peerdata.PeerConnectionState) {
	p.store.Lock()
	defer p.store.Unlock()

	p.store.PeerDataGetOrCreate(pid).ConnState = state
}
// ConnectionState looks up the connection state recorded for the given remote peer.
// It returns PeerDisconnected together with peerdata.ErrPeerUnknown when the peer is not in
// the store.
func (p *Status) ConnectionState(pid peer.ID) (peerdata.PeerConnectionState, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return PeerDisconnected, peerdata.ErrPeerUnknown
	}
	return entry.ConnState, nil
}
// ChainStateLastUpdated reports when the chain state of the given remote peer was last updated.
// When the peer is not in the store it returns the current time together with
// peerdata.ErrPeerUnknown.
func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
	p.store.RLock()
	defer p.store.RUnlock()

	entry, found := p.store.PeerData(pid)
	if !found {
		return timeutils.Now(), peerdata.ErrPeerUnknown
	}
	return entry.ChainStateLastUpdated, nil
}
// IsBad reports whether the peer is to be considered bad, as judged by the bad responses scorer.
// An unknown peer yields `false` rather than an error, which keeps call sites simple.
func (p *Status) IsBad(pid peer.ID) bool {
	badResponses := p.scorers.BadResponsesScorer()
	return badResponses.IsBadPeer(pid)
}
// Connecting lists the IDs of all peers whose connection state is PeerConnecting.
// The result is a non-nil slice in no particular order.
func (p *Status) Connecting() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	pids := make([]peer.ID, 0)
	for id, entry := range p.store.Peers() {
		if entry.ConnState != PeerConnecting {
			continue
		}
		pids = append(pids, id)
	}
	return pids
}
// Connected lists the IDs of all peers whose connection state is PeerConnected.
// The result is a non-nil slice in no particular order.
func (p *Status) Connected() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	pids := make([]peer.ID, 0)
	for id, entry := range p.store.Peers() {
		if entry.ConnState != PeerConnected {
			continue
		}
		pids = append(pids, id)
	}
	return pids
}
// Active lists the IDs of all peers that are either connecting or connected.
// The result is a non-nil slice in no particular order.
func (p *Status) Active() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	pids := make([]peer.ID, 0)
	for id, entry := range p.store.Peers() {
		switch entry.ConnState {
		case PeerConnecting, PeerConnected:
			pids = append(pids, id)
		}
	}
	return pids
}
// Disconnecting lists the IDs of all peers whose connection state is PeerDisconnecting.
// The result is a non-nil slice in no particular order.
func (p *Status) Disconnecting() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	pids := make([]peer.ID, 0)
	for id, entry := range p.store.Peers() {
		if entry.ConnState != PeerDisconnecting {
			continue
		}
		pids = append(pids, id)
	}
	return pids
}
// Disconnected lists the IDs of all peers whose connection state is PeerDisconnected.
// The result is a non-nil slice in no particular order.
func (p *Status) Disconnected() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	pids := make([]peer.ID, 0)
	for id, entry := range p.store.Peers() {
		if entry.ConnState != PeerDisconnected {
			continue
		}
		pids = append(pids, id)
	}
	return pids
}
// Inactive lists the IDs of all peers that are either disconnecting or disconnected.
// The result is a non-nil slice in no particular order.
func (p *Status) Inactive() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	pids := make([]peer.ID, 0)
	for id, entry := range p.store.Peers() {
		switch entry.ConnState {
		case PeerDisconnecting, PeerDisconnected:
			pids = append(pids, id)
		}
	}
	return pids
}
// Bad lists the peers that the bad responses scorer currently considers bad.
func (p *Status) Bad() []peer.ID {
	badResponses := p.scorers.BadResponsesScorer()
	return badResponses.BadPeers()
}
// All lists the IDs of every peer in the store, whatever its connection state.
// The result is a non-nil slice in no particular order.
func (p *Status) All() []peer.ID {
	p.store.RLock()
	defer p.store.RUnlock()

	known := p.store.Peers()
	pids := make([]peer.ID, 0, len(known))
	for id := range known {
		pids = append(pids, id)
	}
	return pids
}
// Prune removes disconnected peers from the store once it holds more than MaxPeers entries.
//
// Only peers that are PeerDisconnected and whose bad response count is below the bad responses
// scorer's threshold are eligible. At most as many peers are removed as the store exceeds its
// limit by, so the store may remain above the limit if too few peers are eligible.
func (p *Status) Prune() {
	p.store.Lock()
	defer p.store.Unlock()

	// Exit early if there is nothing to prune.
	excess := len(p.store.Peers()) - p.store.Config().MaxPeers
	if excess <= 0 {
		return
	}

	type candidate struct {
		pid     peer.ID
		badResp int
	}
	candidates := make([]candidate, 0)
	for pid, peerData := range p.store.Peers() {
		if peerData.ConnState != PeerDisconnected {
			continue
		}
		// Peers at or above the threshold are bad and are deliberately kept.
		if peerData.BadResponses >= p.scorers.BadResponsesScorer().Params().Threshold {
			continue
		}
		candidates = append(candidates, candidate{pid: pid, badResp: peerData.BadResponses})
	}

	// Order candidates by ascending bad response count so the best-behaved peers are dropped
	// first. The record of malicious/lousy peers is thereby retained for as long as possible,
	// which protects the node from them.
	sort.Slice(candidates, func(a, b int) bool {
		return candidates[a].badResp < candidates[b].badResp
	})

	// Never try to remove more peers than are eligible.
	if excess > len(candidates) {
		excess = len(candidates)
	}
	for _, c := range candidates[:excess] {
		p.store.DeletePeerData(c.pid)
	}
}
// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed
// upon by the majority of peers. This method may not return the absolute highest finalized, but
// the finalized epoch in which most peers can serve blocks (plurality voting).
// Ideally, all peers would be reporting the same finalized epoch but some may be behind due to their
// own latency, or because of their finalized epoch at the time we queried them.
//
// Only connected peers with a known chain state whose finalized epoch is at least
// ourFinalizedEpoch take part in the vote. When several epochs receive the same number of
// votes, the highest of them is selected, so the result does not depend on map iteration order.
//
// Returns the epoch number and the list of peers that are at or beyond that epoch, ordered by
// finalized epoch and then head slot (both decreasing) and truncated to at most maxPeers
// entries. If no peer qualifies, the epoch is 0 and the list is empty.
func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch uint64) (uint64, []peer.ID) {
	connected := p.Connected()
	finalizedEpochVotes := make(map[uint64]uint64)
	pidEpoch := make(map[peer.ID]uint64, len(connected))
	pidHead := make(map[peer.ID]uint64, len(connected))
	potentialPIDs := make([]peer.ID, 0, len(connected))
	for _, pid := range connected {
		peerChainState, err := p.ChainState(pid)
		if err != nil || peerChainState == nil || peerChainState.FinalizedEpoch < ourFinalizedEpoch {
			continue
		}
		finalizedEpochVotes[peerChainState.FinalizedEpoch]++
		pidEpoch[pid] = peerChainState.FinalizedEpoch
		pidHead[pid] = peerChainState.HeadSlot
		potentialPIDs = append(potentialPIDs, pid)
	}

	// Select the target epoch, which is the epoch most peers agree upon.
	// Ties are broken in favour of the higher epoch: map iteration order is unspecified, so
	// without a tie-break the outcome would vary from call to call.
	var targetEpoch, mostVotes uint64
	for epoch, count := range finalizedEpochVotes {
		if count > mostVotes || (count == mostVotes && epoch > targetEpoch) {
			mostVotes = count
			targetEpoch = epoch
		}
	}

	// Sort PIDs by finalized epoch, in decreasing order; peers on the same epoch are ordered
	// by head slot, also decreasing.
	sort.Slice(potentialPIDs, func(i, j int) bool {
		if pidEpoch[potentialPIDs[i]] == pidEpoch[potentialPIDs[j]] {
			return pidHead[potentialPIDs[i]] > pidHead[potentialPIDs[j]]
		}
		return pidEpoch[potentialPIDs[i]] > pidEpoch[potentialPIDs[j]]
	})

	// Trim potential peers to those on or after target epoch.
	for i, pid := range potentialPIDs {
		if pidEpoch[pid] < targetEpoch {
			potentialPIDs = potentialPIDs[:i]
			break
		}
	}

	// Trim potential peers to at most maxPeers.
	if len(potentialPIDs) > maxPeers {
		potentialPIDs = potentialPIDs[:maxPeers]
	}

	return targetEpoch, potentialPIDs
}
// BestNonFinalized returns the highest known epoch, which is higher than ours, and is shared
// by at least minPeers.
//
// Only connected peers with a known chain state whose head slot lies beyond the first slot of
// ourFinalizedEpoch are considered; each votes for the epoch of its head slot. The returned
// peers are those at or beyond the selected epoch, ordered by decreasing head slot. If no epoch
// gathers enough votes, the epoch is 0 and every considered peer is returned.
func (p *Status) BestNonFinalized(minPeers int, ourFinalizedEpoch uint64) (uint64, []peer.ID) {
	connected := p.Connected()

	votesByEpoch := make(map[uint64]uint64)
	epochOf := make(map[peer.ID]uint64, len(connected))
	headOf := make(map[peer.ID]uint64, len(connected))
	candidates := make([]peer.ID, 0, len(connected))

	ourFinalizedSlot := ourFinalizedEpoch * params.BeaconConfig().SlotsPerEpoch
	for _, pid := range connected {
		chainState, err := p.ChainState(pid)
		if err != nil || chainState == nil || chainState.HeadSlot <= ourFinalizedSlot {
			continue
		}
		headEpoch := helpers.SlotToEpoch(chainState.HeadSlot)
		votesByEpoch[headEpoch]++
		epochOf[pid] = headEpoch
		headOf[pid] = chainState.HeadSlot
		candidates = append(candidates, pid)
	}

	// Select the target epoch, which has enough peers' votes (>= minPeers).
	var targetEpoch uint64
	for epoch, votes := range votesByEpoch {
		if votes < uint64(minPeers) || epoch <= targetEpoch {
			continue
		}
		targetEpoch = epoch
	}

	// Sort PIDs by head slot, in decreasing order.
	sort.Slice(candidates, func(a, b int) bool {
		return headOf[candidates[a]] > headOf[candidates[b]]
	})

	// Keep only the leading peers that are on or after the target epoch.
	keep := len(candidates)
	for i, pid := range candidates {
		if epochOf[pid] < targetEpoch {
			keep = i
			break
		}
	}
	return targetEpoch, candidates[:keep]
}
// HighestEpoch returns the highest epoch reported amongst all peers in the store, derived from
// the largest head slot found in their chain states. Peers without a chain state are ignored;
// if none has one, the epoch of slot 0 is returned.
func (p *Status) HighestEpoch() uint64 {
	p.store.RLock()
	defer p.store.RUnlock()

	var maxHeadSlot uint64
	for _, entry := range p.store.Peers() {
		if entry == nil || entry.ChainState == nil {
			continue
		}
		if slot := entry.ChainState.HeadSlot; slot > maxHeadSlot {
			maxHeadSlot = slot
		}
	}
	return helpers.SlotToEpoch(maxHeadSlot)
}
// retrieveIndicesFromBitfield returns, in ascending order, the positions (0..63) of the bits
// that are set in bitV. The result is non-nil and pre-sized using bitV.Count().
func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 {
	const totalBits uint64 = 64

	setBits := make([]uint64, 0, bitV.Count())
	for pos := uint64(0); pos < totalBits; pos++ {
		if !bitV.BitAt(pos) {
			continue
		}
		setBits = append(setBits, pos)
	}
	return setBits
}