mirror of https://gitlab.com/pulsechaincom/prysm-pulse.git, synced 2024-12-24 20:37:17 +00:00
Extracts peer data store in p2p/peers status (#6639)
* introduces peer data store to p2p/peers
* Merge refs/heads/master into p2p-extract-data-store
* Merge refs/heads/master into p2p-extract-data-store
* refactors status service
* Merge branch 'p2p-extract-data-store' of github.com:prysmaticlabs/prysm into p2p-extract-data-store
* updates tests
* gofmt
* revert decay test
This commit is contained in:
parent c419e4ed8f
commit 568238009e
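For callers, the change is mechanical: the old positional constructor peers.NewStatus(maxBadResponses, peerLimit) becomes a context plus an explicit StatusConfig. A before/after sketch distilled from the hunks below:

// Before: positional arguments (maxBadResponses, then peerLimit).
p := peers.NewStatus(5, 30)

// After: a context and a named config struct.
p = peers.NewStatus(context.Background(), &peers.StatusConfig{
    PeerLimit:       30,
    MaxBadResponses: 5,
})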
@@ -21,7 +21,10 @@ func TestPeer_AtMaxLimit(t *testing.T) {
 	listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
 	require.NoError(t, err, "Failed to p2p listen")
 	s := &Service{}
-	s.peers = peers.NewStatus(3, 0)
+	s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       0,
+		MaxBadResponses: 3,
+	})
 	s.cfg = &Config{MaxPeers: 0}
 	s.addrFilter, err = configureFilter(&Config{})
 	require.NoError(t, err)
@@ -57,7 +60,10 @@ func TestPeer_BelowMaxLimit(t *testing.T) {
 	listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
 	require.NoError(t, err, "Failed to p2p listen")
 	s := &Service{}
-	s.peers = peers.NewStatus(3, 1)
+	s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       1,
+		MaxBadResponses: 3,
+	})
 	s.cfg = &Config{MaxPeers: 1}
 	s.addrFilter, err = configureFilter(&Config{})
 	require.NoError(t, err)
@@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

 go_library(
     name = "go_default_library",
-    srcs = ["status.go"],
+    srcs = [
+        "status.go",
+        "store.go",
+    ],
     importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers",
     visibility = ["//beacon-chain:__subpackages__"],
     deps = [
@@ -20,9 +20,9 @@
 package peers

 import (
+	"context"
 	"errors"
 	"sort"
-	"sync"
 	"time"

 	"github.com/ethereum/go-ethereum/p2p/enr"
@@ -62,79 +62,76 @@ var (

 // Status is the structure holding the peer status information.
 type Status struct {
-	lock            sync.RWMutex
-	maxBadResponses int
-	status          map[peer.ID]*peerStatus
-	maxLimit        int
+	ctx    context.Context
+	store  *peerDataStore
+	config *StatusConfig
 }

-// peerStatus is the status of an individual peer at the protocol level.
-type peerStatus struct {
-	address               ma.Multiaddr
-	direction             network.Direction
-	peerState             PeerConnectionState
-	chainState            *pb.Status
-	enr                   *enr.Record
-	metaData              *pb.MetaData
-	chainStateLastUpdated time.Time
-	badResponses          int
+// StatusConfig represents peer status service params.
+type StatusConfig struct {
+	// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
+	PeerLimit int
+	// MaxBadResponses specifies number of bad responses tolerated, before peer is banned.
+	MaxBadResponses int
 }

 // NewStatus creates a new status entity.
-func NewStatus(maxBadResponses int, peerLimit int) *Status {
+func NewStatus(ctx context.Context, config *StatusConfig) *Status {
+	store := newPeerDataStore(ctx, &peerDataStoreConfig{
+		maxPeers: maxLimitBuffer + config.PeerLimit,
+	})
 	return &Status{
-		maxBadResponses: maxBadResponses,
-		status:          make(map[peer.ID]*peerStatus),
-		maxLimit:        maxLimitBuffer + peerLimit,
+		ctx:    ctx,
+		store:  store,
+		config: config,
 	}
 }

 // MaxBadResponses returns the maximum number of bad responses a peer can provide before it is considered bad.
 func (p *Status) MaxBadResponses() int {
-	return p.maxBadResponses
+	return p.config.MaxBadResponses
 }

-// MaxPeerLimit returns the max peer limit stored in
-// the current peer store.
+// MaxPeerLimit returns the max peer limit stored in the current peer store.
 func (p *Status) MaxPeerLimit() int {
-	return p.maxLimit
+	return p.store.config.maxPeers
 }

 // Add adds a peer.
 // If a peer already exists with this ID its address and direction are updated with the supplied data.
 func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.store.Lock()
+	defer p.store.Unlock()

-	if status, ok := p.status[pid]; ok {
+	if peerData, ok := p.store.peers[pid]; ok {
 		// Peer already exists, just update its address info.
-		status.address = address
-		status.direction = direction
+		peerData.address = address
+		peerData.direction = direction
 		if record != nil {
-			status.enr = record
+			peerData.enr = record
 		}
 		return
 	}
-	status := &peerStatus{
+	peerData := &peerData{
 		address:   address,
 		direction: direction,
 		// Peers start disconnected; state will be updated when the handshake process begins.
-		peerState: PeerDisconnected,
+		connState: PeerDisconnected,
 	}
 	if record != nil {
-		status.enr = record
+		peerData.enr = record
 	}
-	p.status[pid] = status
+	p.store.peers[pid] = peerData
 }

 // Address returns the multiaddress of the given remote peer.
 // This will error if the peer does not exist.
 func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.address, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.address, nil
 	}
 	return nil, ErrPeerUnknown
 }
@@ -142,89 +139,89 @@ func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {
 // Direction returns the direction of the given remote peer.
 // This will error if the peer does not exist.
 func (p *Status) Direction(pid peer.ID) (network.Direction, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.direction, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.direction, nil
 	}
 	return network.DirUnknown, ErrPeerUnknown
 }

 // ENR returns the enr for the corresponding peer id.
 func (p *Status) ENR(pid peer.ID) (*enr.Record, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.enr, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.enr, nil
 	}
 	return nil, ErrPeerUnknown
 }

 // SetChainState sets the chain state of the given remote peer.
 func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.store.Lock()
+	defer p.store.Unlock()

-	status := p.fetch(pid)
-	status.chainState = chainState
-	status.chainStateLastUpdated = roughtime.Now()
+	peerData := p.fetch(pid)
+	peerData.chainState = chainState
+	peerData.chainStateLastUpdated = roughtime.Now()
 }

 // ChainState gets the chain state of the given remote peer.
 // This can return nil if there is no known chain state for the peer.
 // This will error if the peer does not exist.
 func (p *Status) ChainState(pid peer.ID) (*pb.Status, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.chainState, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.chainState, nil
 	}
 	return nil, ErrPeerUnknown
 }

 // IsActive checks if a peers is active and returns the result appropriately.
 func (p *Status) IsActive(pid peer.ID) bool {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	status, ok := p.status[pid]
-	return ok && (status.peerState == PeerConnected || status.peerState == PeerConnecting)
+	peerData, ok := p.store.peers[pid]
+	return ok && (peerData.connState == PeerConnected || peerData.connState == PeerConnecting)
 }

 // SetMetadata sets the metadata of the given remote peer.
 func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.store.Lock()
+	defer p.store.Unlock()

-	status := p.fetch(pid)
-	status.metaData = metaData
+	peerData := p.fetch(pid)
+	peerData.metaData = metaData
 }

 // Metadata returns a copy of the metadata corresponding to the provided
 // peer id.
 func (p *Status) Metadata(pid peer.ID) (*pb.MetaData, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return proto.Clone(status.metaData).(*pb.MetaData), nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return proto.Clone(peerData.metaData).(*pb.MetaData), nil
 	}
 	return nil, ErrPeerUnknown
 }

 // CommitteeIndices retrieves the committee subnets the peer is subscribed to.
 func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		if status.enr == nil || status.metaData == nil {
+	if peerData, ok := p.store.peers[pid]; ok {
+		if peerData.enr == nil || peerData.metaData == nil {
 			return []uint64{}, nil
 		}
-		return retrieveIndicesFromBitfield(status.metaData.Attnets), nil
+		return retrieveIndicesFromBitfield(peerData.metaData.Attnets), nil
 	}
 	return nil, ErrPeerUnknown
 }
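CommitteeIndices and SubscribedToSubnet both decode a peer's Attnets bitfield through retrieveIndicesFromBitfield (shown at the end of this file's diff). A small sketch of what that decoding amounts to, using the prysmaticlabs/go-bitfield types this package already imports (illustrative only, not part of the commit):

// Mark subnets 1 and 7 in an attnets bitvector, then recover the
// indices the same way retrieveIndicesFromBitfield does.
attnets := bitfield.NewBitvector64()
attnets.SetBitAt(1, true) // peer subscribed to subnet 1
attnets.SetBitAt(7, true) // and to subnet 7

indices := make([]uint64, 0, attnets.Count())
for i := uint64(0); i < 64; i++ {
    if attnets.BitAt(i) {
        indices = append(indices, i)
    }
}
// indices == []uint64{1, 7}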
@@ -232,15 +229,15 @@ func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) {
 // SubscribedToSubnet retrieves the peers subscribed to the given
 // committee subnet.
 func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
+	for pid, peerData := range p.store.peers {
 		// look at active peers
-		connectedStatus := status.peerState == PeerConnecting || status.peerState == PeerConnected
-		if connectedStatus && status.metaData != nil && status.metaData.Attnets != nil {
-			indices := retrieveIndicesFromBitfield(status.metaData.Attnets)
+		connectedStatus := peerData.connState == PeerConnecting || peerData.connState == PeerConnected
+		if connectedStatus && peerData.metaData != nil && peerData.metaData.Attnets != nil {
+			indices := retrieveIndicesFromBitfield(peerData.metaData.Attnets)
 			for _, idx := range indices {
 				if idx == index {
 					peers = append(peers, pid)
@@ -254,21 +251,21 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {

 // SetConnectionState sets the connection state of the given remote peer.
 func (p *Status) SetConnectionState(pid peer.ID, state PeerConnectionState) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.store.Lock()
+	defer p.store.Unlock()

-	status := p.fetch(pid)
-	status.peerState = state
+	peerData := p.fetch(pid)
+	peerData.connState = state
 }

 // ConnectionState gets the connection state of the given remote peer.
 // This will error if the peer does not exist.
 func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.peerState, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.connState, nil
 	}
 	return PeerDisconnected, ErrPeerUnknown
 }
@@ -276,32 +273,32 @@ func (p *Status) ConnectionState(pid peer.ID) (PeerConnectionState, error) {
 // ChainStateLastUpdated gets the last time the chain state of the given remote peer was updated.
 // This will error if the peer does not exist.
 func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.chainStateLastUpdated, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.chainStateLastUpdated, nil
 	}
 	return roughtime.Now(), ErrPeerUnknown
 }

 // IncrementBadResponses increments the number of bad responses we have received from the given remote peer.
 func (p *Status) IncrementBadResponses(pid peer.ID) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.store.Lock()
+	defer p.store.Unlock()

-	status := p.fetch(pid)
-	status.badResponses++
+	peerData := p.fetch(pid)
+	peerData.badResponsesCount++
 }

 // BadResponses obtains the number of bad responses we have received from the given remote peer.
 // This will error if the peer does not exist.
 func (p *Status) BadResponses(pid peer.ID) (int, error) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.badResponses, nil
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.badResponsesCount, nil
 	}
 	return -1, ErrPeerUnknown
 }
@@ -309,22 +306,22 @@ func (p *Status) BadResponses(pid peer.ID) (int, error) {
 // IsBad states if the peer is to be considered bad.
 // If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
 func (p *Status) IsBad(pid peer.ID) bool {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()

-	if status, ok := p.status[pid]; ok {
-		return status.badResponses >= p.maxBadResponses
+	if peerData, ok := p.store.peers[pid]; ok {
+		return peerData.badResponsesCount >= p.config.MaxBadResponses
 	}
 	return false
 }

 // Connecting returns the peers that are connecting.
 func (p *Status) Connecting() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.peerState == PeerConnecting {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerConnecting {
 			peers = append(peers, pid)
 		}
 	}
@@ -333,11 +330,11 @@ func (p *Status) Connecting() []peer.ID {

 // Connected returns the peers that are connected.
 func (p *Status) Connected() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.peerState == PeerConnected {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerConnected {
 			peers = append(peers, pid)
 		}
 	}
@@ -346,11 +343,11 @@ func (p *Status) Connected() []peer.ID {

 // Active returns the peers that are connecting or connected.
 func (p *Status) Active() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.peerState == PeerConnecting || status.peerState == PeerConnected {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerConnecting || peerData.connState == PeerConnected {
 			peers = append(peers, pid)
 		}
 	}
@@ -359,11 +356,11 @@ func (p *Status) Active() []peer.ID {

 // Disconnecting returns the peers that are disconnecting.
 func (p *Status) Disconnecting() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.peerState == PeerDisconnecting {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerDisconnecting {
 			peers = append(peers, pid)
 		}
 	}
@@ -372,11 +369,11 @@ func (p *Status) Disconnecting() []peer.ID {

 // Disconnected returns the peers that are disconnected.
 func (p *Status) Disconnected() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.peerState == PeerDisconnected {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerDisconnected {
 			peers = append(peers, pid)
 		}
 	}
@@ -385,11 +382,11 @@ func (p *Status) Disconnected() []peer.ID {

 // Inactive returns the peers that are disconnecting or disconnected.
 func (p *Status) Inactive() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.peerState == PeerDisconnecting || status.peerState == PeerDisconnected {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerDisconnecting || peerData.connState == PeerDisconnected {
 			peers = append(peers, pid)
 		}
 	}
@@ -398,11 +395,11 @@ func (p *Status) Inactive() []peer.ID {

 // Bad returns the peers that are bad.
 func (p *Status) Bad() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	peers := make([]peer.ID, 0)
-	for pid, status := range p.status {
-		if status.badResponses >= p.maxBadResponses {
+	for pid, peerData := range p.store.peers {
+		if peerData.badResponsesCount >= p.config.MaxBadResponses {
 			peers = append(peers, pid)
 		}
 	}
@@ -411,10 +408,10 @@ func (p *Status) Bad() []peer.ID {

 // All returns all the peers regardless of state.
 func (p *Status) All() []peer.ID {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	pids := make([]peer.ID, 0, len(p.status))
-	for pid := range p.status {
+	p.store.RLock()
+	defer p.store.RUnlock()
+	pids := make([]peer.ID, 0, len(p.store.peers))
+	for pid := range p.store.peers {
 		pids = append(pids, pid)
 	}
 	return pids
@@ -424,42 +421,40 @@ func (p *Status) All() []peer.ID {
 // This can be run periodically, although note that each time it runs it does give all bad peers another chance as well to clog up
 // the network with bad responses, so should not be run too frequently; once an hour would be reasonable.
 func (p *Status) Decay() {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-	for _, status := range p.status {
-		if status.badResponses > 0 {
-			status.badResponses--
+	p.store.Lock()
+	defer p.store.Unlock()
+	for _, peerData := range p.store.peers {
+		if peerData.badResponsesCount > 0 {
+			peerData.badResponsesCount--
 		}
 	}
 }

 // Prune clears out and removes outdated and disconnected peers.
 func (p *Status) Prune() {
-	currSize := p.totalSize()
-	// Exit early if there is nothing
-	// to prune.
-	if currSize <= p.maxLimit {
+	p.store.Lock()
+	defer p.store.Unlock()
+
+	// Exit early if there is nothing to prune.
+	if len(p.store.peers) <= p.store.config.maxPeers {
 		return
 	}
-	disconnected := p.Disconnected()

 	type peerResp struct {
 		pid     peer.ID
 		badResp int
 	}
-	peersToPrune := make([]*peerResp, 0, len(disconnected))
-	p.lock.RLock()
+	peersToPrune := make([]*peerResp, 0)
 	// Select disconnected peers with a smaller
 	// bad response count.
-	for _, pid := range disconnected {
-		if p.status[pid].badResponses < p.maxBadResponses {
+	for pid, peerData := range p.store.peers {
+		if peerData.connState == PeerDisconnected && peerData.badResponsesCount < p.config.MaxBadResponses {
 			peersToPrune = append(peersToPrune, &peerResp{
 				pid: pid,
-				badResp: p.status[pid].badResponses,
+				badResp: p.store.peers[pid].badResponsesCount,
 			})
 		}
 	}
-	p.lock.RUnlock()

 	// Sort peers in ascending order, so the peers with the
 	// least amount of bad responses are pruned first. This
@@ -469,7 +464,7 @@ func (p *Status) Prune() {
 		return peersToPrune[i].badResp < peersToPrune[j].badResp
 	})

-	limitDiff := currSize - p.maxLimit
+	limitDiff := len(p.store.peers) - p.store.config.maxPeers

 	if limitDiff > len(peersToPrune) {
 		limitDiff = len(peersToPrune)
@@ -477,11 +472,9 @@ func (p *Status) Prune() {

 	peersToPrune = peersToPrune[:limitDiff]

-	p.lock.Lock()
-	defer p.lock.Unlock()
 	// Delete peers from map.
-	for _, peerRes := range peersToPrune {
-		delete(p.status, peerRes.pid)
+	for _, peerData := range peersToPrune {
+		delete(p.store.peers, peerData.pid)
 	}
 }

@@ -536,32 +529,26 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch uint64) (uint64,
 }

 // fetch is a helper function that fetches a peer status, possibly creating it.
-func (p *Status) fetch(pid peer.ID) *peerStatus {
-	if _, ok := p.status[pid]; !ok {
-		p.status[pid] = &peerStatus{}
+func (p *Status) fetch(pid peer.ID) *peerData {
+	if _, ok := p.store.peers[pid]; !ok {
+		p.store.peers[pid] = &peerData{}
 	}
-	return p.status[pid]
+	return p.store.peers[pid]
 }

 // HighestEpoch returns the highest epoch reported epoch amongst peers.
 func (p *Status) HighestEpoch() uint64 {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.store.RLock()
+	defer p.store.RUnlock()
 	var highestSlot uint64
-	for _, ps := range p.status {
-		if ps != nil && ps.chainState != nil && ps.chainState.HeadSlot > highestSlot {
-			highestSlot = ps.chainState.HeadSlot
+	for _, peerData := range p.store.peers {
+		if peerData != nil && peerData.chainState != nil && peerData.chainState.HeadSlot > highestSlot {
+			highestSlot = peerData.chainState.HeadSlot
 		}
 	}
 	return helpers.SlotToEpoch(highestSlot)
 }

-func (p *Status) totalSize() int {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	return len(p.status)
-}
-
 func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 {
 	committeeIdxs := make([]uint64, 0, bitV.Count())
 	for i := uint64(0); i < 64; i++ {
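The comment above Decay says it should run periodically, roughly once an hour. A minimal sketch of such a maintenance loop (hypothetical caller, not part of this commit; p is a *peers.Status and done a shutdown channel):

ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
    select {
    case <-ticker.C:
        p.Decay() // forgive one bad response per peer
        p.Prune() // drop surplus disconnected peers over the store limit
    case <-done:
        return
    }
}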
@@ -1,6 +1,7 @@
 package peers_test

 import (
+	"context"
 	"crypto/rand"
 	"testing"

@@ -18,14 +19,20 @@ import (

 func TestStatus(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})
 	require.NotNil(t, p, "p not created")
 	assert.Equal(t, maxBadResponses, p.MaxBadResponses(), "maxBadResponses incorrect value")
 }

 func TestPeerExplicitAdd(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err, "Failed to create ID")
@@ -59,7 +66,10 @@ func TestPeerExplicitAdd(t *testing.T) {

 func TestPeerNoENR(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err, "Failed to create ID")
@@ -76,7 +86,10 @@ func TestPeerNoENR(t *testing.T) {

 func TestPeerNoOverwriteENR(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err, "Failed to create ID")
@@ -96,7 +109,10 @@ func TestPeerNoOverwriteENR(t *testing.T) {

 func TestErrUnknownPeer(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err)
@@ -122,7 +138,10 @@ func TestErrUnknownPeer(t *testing.T) {

 func TestPeerCommitteeIndices(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err, "Failed to create ID")
@@ -152,7 +171,10 @@ func TestPeerCommitteeIndices(t *testing.T) {

 func TestPeerSubscribedToSubnet(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	// Add some peers with different states
 	numPeers := 2
@@ -189,7 +211,10 @@ func TestPeerSubscribedToSubnet(t *testing.T) {

 func TestPeerImplicitAdd(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err)
@@ -205,7 +230,10 @@ func TestPeerImplicitAdd(t *testing.T) {

 func TestPeerChainState(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err)
@@ -233,7 +261,10 @@ func TestPeerChainState(t *testing.T) {

 func TestPeerBadResponses(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 	require.NoError(t, err)
@@ -275,7 +306,10 @@ func TestPeerBadResponses(t *testing.T) {

 func TestAddMetaData(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	// Add some peers with different states
 	numPeers := 5
@@ -297,7 +331,10 @@ func TestAddMetaData(t *testing.T) {

 func TestPeerConnectionStatuses(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	// Add some peers with different states
 	numPeersDisconnected := 11
@@ -332,7 +369,10 @@ func TestPeerConnectionStatuses(t *testing.T) {

 func TestDecay(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	// Peer 1 has 0 bad responses.
 	pid1 := addPeer(t, p, peers.PeerConnected)
@@ -361,7 +401,10 @@ func TestDecay(t *testing.T) {

 func TestPrune(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	for i := 0; i < p.MaxPeerLimit()+100; i++ {
 		if i%7 == 0 {
@@ -403,7 +446,10 @@
 }

 func TestTrimmedOrderedPeers(t *testing.T) {
-	p := peers.NewStatus(1, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: 1,
+	})

 	expectedTarget := uint64(2)
 	maxPeers := 3
@@ -461,7 +507,10 @@ func TestBestPeer(t *testing.T) {
 	expectedFinEpoch := uint64(4)
 	expectedRoot := [32]byte{'t', 'e', 's', 't'}
 	junkRoot := [32]byte{'j', 'u', 'n', 'k'}
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	// Peer 1
 	pid1 := addPeer(t, p, peers.PeerConnected)
@@ -506,7 +555,10 @@
 func TestBestFinalized_returnsMaxValue(t *testing.T) {
 	maxBadResponses := 2
 	maxPeers := 10
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})

 	for i := 0; i <= maxPeers+100; i++ {
 		p.Add(new(enr.Record), peer.ID(i), nil, network.DirOutbound)
@@ -522,7 +574,10 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {

 func TestStatus_CurrentEpoch(t *testing.T) {
 	maxBadResponses := 2
-	p := peers.NewStatus(maxBadResponses, 30)
+	p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: maxBadResponses,
+	})
 	// Peer 1
 	pid1 := addPeer(t, p, peers.PeerConnected)
 	p.SetChainState(pid1, &pb.Status{
beacon-chain/p2p/peers/store.go (new file, 49 lines)
@@ -0,0 +1,49 @@
+package peers
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/p2p/enr"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+	ma "github.com/multiformats/go-multiaddr"
+	pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
+)
+
+// peerDataStore is a container for various peer related data (both protocol and app level).
+// Container implements RWMutex, so data access can be restricted on the container level. This allows
+// different components rely on the very same peer map container.
+type peerDataStore struct {
+	sync.RWMutex
+	ctx    context.Context
+	config *peerDataStoreConfig
+	peers  map[peer.ID]*peerData
+}
+
+// peerDataStoreConfig holds peer store parameters.
+type peerDataStoreConfig struct {
+	maxPeers int
+}
+
+// peerData aggregates protocol and application level info about a single peer.
+type peerData struct {
+	address               ma.Multiaddr
+	direction             network.Direction
+	connState             PeerConnectionState
+	chainState            *pb.Status
+	enr                   *enr.Record
+	metaData              *pb.MetaData
+	chainStateLastUpdated time.Time
+	badResponsesCount     int
+}
+
+// newPeerDataStore creates peer store.
+func newPeerDataStore(ctx context.Context, config *peerDataStoreConfig) *peerDataStore {
+	return &peerDataStore{
+		ctx:    ctx,
+		config: config,
+		peers:  make(map[peer.ID]*peerData),
+	}
+}
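Because peerDataStore embeds sync.RWMutex, consumers lock the container itself: the Status methods above take p.store.Lock()/RLock() instead of a mutex private to Status, so any component sharing the store serializes on the same lock. A minimal sketch of the pattern (hypothetical snippet inside package peers; pid is an assumed peer.ID, not from this commit):

store := newPeerDataStore(context.Background(), &peerDataStoreConfig{maxPeers: 35})

// Writer: exclusive lock on the shared container.
store.Lock()
store.peers[pid] = &peerData{connState: PeerDisconnected}
store.Unlock()

// Reader: shared lock on the same container.
store.RLock()
numTracked := len(store.peers)
store.RUnlock()
_ = numTracked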
@@ -149,7 +149,10 @@ func NewService(cfg *Config) (*Service, error) {
 	}
 	s.pubsub = gs

-	s.peers = peers.NewStatus(maxBadResponses, int(s.cfg.MaxPeers))
+	s.peers = peers.NewStatus(ctx, &peers.StatusConfig{
+		PeerLimit:       int(s.cfg.MaxPeers),
+		MaxBadResponses: maxBadResponses,
+	})

 	return s, nil
 }
@@ -1,6 +1,7 @@
 package testing

 import (
+	"context"
 	"sync"

 	"github.com/ethereum/go-ethereum/crypto"
@@ -25,7 +26,10 @@ func (m *MockPeersProvider) Peers() *peers.Status {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 	if m.peers == nil {
-		m.peers = peers.NewStatus(5, 30)
+		m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
+			PeerLimit:       30,
+			MaxBadResponses: 5,
+		})
 		// Pretend we are connected to two peers
 		id0, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
 		if err != nil {
@@ -53,12 +53,16 @@ func NewTestP2P(t *testing.T) *TestP2P {
 		t.Fatal(err)
 	}

+	peerStatuses := peers.NewStatus(context.Background(), &peers.StatusConfig{
+		PeerLimit:       30,
+		MaxBadResponses: 5,
+	})
 	return &TestP2P{
 		t:            t,
 		BHost:        h,
 		pubsub:       ps,
 		joinedTopics: map[string]*pubsub.Topic{},
-		peers:        peers.NewStatus(5, 30),
+		peers:        peerStatuses,
 	}
 }