Introduces peer scorer service (#6579)

* introduces peer scorer service
* gazelle
* Merge branch 'master' into peer-scoring-service
* fixes comment
* fix build error
* linter suggestions
* Merge branch 'master' into peer-scoring-service
* updates tests
* Merge branch 'master' into peer-scoring-service
* badresponses scorer tests
* gazelle
* Merge branch 'master' into peer-scoring-service
* Merge branch 'master' into peer-scoring-service
* adds scorer_test
* gazelle
* updates bad response default penalty
* more comments
* Merge branch 'master' into peer-scoring-service
* Merge branch 'master' into peer-scoring-service
* Merge branch 'master' into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* introduces peerdatastore into status
* makes sure that commong peer data store is used
* Merge branch 'peer-scoring-service' of github.com:prysmaticlabs/prysm into peer-scoring-service
* linter
* gazelle
* updates tests
* Merge branch 'master' into peer-scoring-service
* fixes tests
* Nishant's suggestions
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge refs/heads/master into peer-scoring-service
* Merge branch 'master' into peer-scoring-service
* gofmt
* Nishant's suggestion to use isBadPeer
* Merge refs/heads/master into peer-scoring-service
This commit is contained in:
Victor Farazdagi 2020-07-20 18:12:59 +03:00 committed by GitHub
parent 22141db319
commit f0ffd5af03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 608 additions and 168 deletions

View File

@ -22,8 +22,10 @@ func TestPeer_AtMaxLimit(t *testing.T) {
require.NoError(t, err, "Failed to p2p listen")
s := &Service{}
s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 0,
MaxBadResponses: 3,
PeerLimit: 0,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 3,
},
})
s.cfg = &Config{MaxPeers: 0}
s.addrFilter, err = configureFilter(&Config{})
@ -61,8 +63,10 @@ func TestPeer_BelowMaxLimit(t *testing.T) {
require.NoError(t, err, "Failed to p2p listen")
s := &Service{}
s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 1,
MaxBadResponses: 3,
PeerLimit: 1,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 3,
},
})
s.cfg = &Config{MaxPeers: 1}
s.addrFilter, err = configureFilter(&Config{})

View File

@ -4,6 +4,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_library(
name = "go_default_library",
srcs = [
"score_bad_responses.go",
"scorer.go",
"status.go",
"store.go",
],
@ -27,6 +29,8 @@ go_test(
name = "go_default_test",
srcs = [
"benchmark_test.go",
"score_bad_responses_test.go",
"scorer_test.go",
"status_test.go",
],
embed = [":go_default_library"],

View File

@ -0,0 +1,84 @@
package peers
import (
"github.com/libp2p/go-libp2p-core/peer"
)
// BadResponsesThreshold returns the maximum number of bad responses a peer can provide before it is considered bad.
func (s *PeerScorer) BadResponsesThreshold() int {
return s.config.BadResponsesThreshold
}
// BadResponses obtains the number of bad responses we have received from the given remote peer.
func (s *PeerScorer) BadResponses(pid peer.ID) (int, error) {
s.store.RLock()
defer s.store.RUnlock()
return s.badResponses(pid)
}
// badResponses is a lock-free version of BadResponses.
func (s *PeerScorer) badResponses(pid peer.ID) (int, error) {
if peerData, ok := s.store.peers[pid]; ok {
return peerData.badResponsesCount, nil
}
return -1, ErrPeerUnknown
}
// IncrementBadResponses increments the number of bad responses we have received from the given remote peer.
// If peer doesn't exist this method is no-op.
func (s *PeerScorer) IncrementBadResponses(pid peer.ID) {
s.store.Lock()
defer s.store.Unlock()
if _, ok := s.store.peers[pid]; !ok {
s.store.peers[pid] = &peerData{
badResponsesCount: 1,
}
return
}
s.store.peers[pid].badResponsesCount++
}
// IsBadPeer states if the peer is to be considered bad.
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (s *PeerScorer) IsBadPeer(pid peer.ID) bool {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeer(pid)
}
// isBadPeer is lock-free version of IsBadPeer.
func (s *PeerScorer) isBadPeer(pid peer.ID) bool {
if peerData, ok := s.store.peers[pid]; ok {
return peerData.badResponsesCount >= s.config.BadResponsesThreshold
}
return false
}
// BadPeers returns the peers that are bad.
func (s *PeerScorer) BadPeers() []peer.ID {
s.store.RLock()
defer s.store.RUnlock()
badPeers := make([]peer.ID, 0)
for pid := range s.store.peers {
if s.isBadPeer(pid) {
badPeers = append(badPeers, pid)
}
}
return badPeers
}
// DecayBadResponsesStats reduces the bad responses of all peers, giving reformed peers a chance to join the network.
// This can be run periodically, although note that each time it runs it does give all bad peers another chance as well
// to clog up the network with bad responses, so should not be run too frequently; once an hour would be reasonable.
func (s *PeerScorer) DecayBadResponsesStats() {
s.store.Lock()
defer s.store.Unlock()
for _, peerData := range s.store.peers {
if peerData.badResponsesCount > 0 {
peerData.badResponsesCount--
}
}
}

View File

@ -0,0 +1,161 @@
package peers_test
import (
"context"
"sort"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestPeerScorer_BadResponsesThreshold(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxBadResponses := 2
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
scorer := peerStatuses.Scorer()
assert.Equal(t, maxBadResponses, scorer.BadResponsesThreshold())
}
func TestPeerScorer_BadResponses(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
})
scorer := peerStatuses.Scorer()
pid := peer.ID("peer1")
_, err := scorer.BadResponses(pid)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
count, err := scorer.BadResponses(pid)
assert.NoError(t, err)
assert.Equal(t, 0, count)
}
func TestPeerScorer_decayBadResponsesStats(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxBadResponses := 2
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
BadResponsesWeight: 1,
BadResponsesDecayInterval: 50 * time.Nanosecond,
},
})
scorer := peerStatuses.Scorer()
// Peer 1 has 0 bad responses.
pid1 := peer.ID("peer1")
peerStatuses.Add(nil, pid1, nil, network.DirUnknown)
badResponsesCount, err := scorer.BadResponses(pid1)
require.NoError(t, err)
assert.Equal(t, 0, badResponsesCount)
// Peer 2 has 1 bad response.
pid2 := peer.ID("peer2")
peerStatuses.Add(nil, pid2, nil, network.DirUnknown)
scorer.IncrementBadResponses(pid2)
badResponsesCount, err = scorer.BadResponses(pid2)
require.NoError(t, err)
assert.Equal(t, 1, badResponsesCount)
// Peer 3 has 2 bad response.
pid3 := peer.ID("peer3")
peerStatuses.Add(nil, pid3, nil, network.DirUnknown)
scorer.IncrementBadResponses(pid3)
scorer.IncrementBadResponses(pid3)
badResponsesCount, err = scorer.BadResponses(pid3)
require.NoError(t, err)
assert.Equal(t, 2, badResponsesCount)
// Decay the values
scorer.DecayBadResponsesStats()
// Ensure the new values are as expected
badResponsesCount, err = scorer.BadResponses(pid1)
require.NoError(t, err)
assert.Equal(t, 0, badResponsesCount, "unexpected bad responses for pid1")
badResponsesCount, err = scorer.BadResponses(pid2)
require.NoError(t, err)
assert.Equal(t, 0, badResponsesCount, "unexpected bad responses for pid2")
badResponsesCount, err = scorer.BadResponses(pid3)
require.NoError(t, err)
assert.Equal(t, 1, badResponsesCount, "unexpected bad responses for pid3")
}
func TestPeerScorer_IsBadPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
})
scorer := peerStatuses.Scorer()
pid := peer.ID("peer1")
assert.Equal(t, false, scorer.IsBadPeer(pid))
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
assert.Equal(t, false, scorer.IsBadPeer(pid))
for i := 0; i < peers.DefaultBadResponsesThreshold; i++ {
scorer.IncrementBadResponses(pid)
if i == peers.DefaultBadResponsesThreshold-1 {
assert.Equal(t, true, scorer.IsBadPeer(pid), "Unexpected peer status")
} else {
assert.Equal(t, false, scorer.IsBadPeer(pid), "Unexpected peer status")
}
}
}
func TestPeerScorer_BadPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
})
scorer := peerStatuses.Scorer()
pids := []peer.ID{peer.ID("peer1"), peer.ID("peer2"), peer.ID("peer3"), peer.ID("peer4"), peer.ID("peer5")}
for i := 0; i < len(pids); i++ {
peerStatuses.Add(nil, pids[i], nil, network.DirUnknown)
}
for i := 0; i < peers.DefaultBadResponsesThreshold; i++ {
scorer.IncrementBadResponses(pids[1])
scorer.IncrementBadResponses(pids[2])
scorer.IncrementBadResponses(pids[4])
}
assert.Equal(t, false, scorer.IsBadPeer(pids[0]), "Invalid peer status")
assert.Equal(t, true, scorer.IsBadPeer(pids[1]), "Invalid peer status")
assert.Equal(t, true, scorer.IsBadPeer(pids[2]), "Invalid peer status")
assert.Equal(t, false, scorer.IsBadPeer(pids[3]), "Invalid peer status")
assert.Equal(t, true, scorer.IsBadPeer(pids[4]), "Invalid peer status")
want := []peer.ID{pids[1], pids[2], pids[4]}
badPeers := scorer.BadPeers()
sort.Slice(badPeers, func(i, j int) bool {
return badPeers[i] < badPeers[j]
})
assert.DeepEqual(t, want, badPeers, "Unexpected list of bad peers")
}

View File

@ -0,0 +1,92 @@
package peers
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
const (
// DefaultBadResponsesThreshold defines how many bad responses to tolerate before peer is deemed bad.
DefaultBadResponsesThreshold = 6
// DefaultBadResponsesWeight is a default weight. Since score represents penalty, it has negative weight.
DefaultBadResponsesWeight = -1.0
// DefaultBadResponsesDecayInterval defines how often to decay previous statistics.
// Every interval bad responses counter will be decremented by 1.
DefaultBadResponsesDecayInterval = time.Hour
)
// PeerScorer keeps track of peer counters that are used to calculate peer score.
type PeerScorer struct {
ctx context.Context
config *PeerScorerConfig
store *peerDataStore
}
// PeerScorerConfig holds configuration parameters for scoring service.
type PeerScorerConfig struct {
// BadResponsesThreshold specifies number of bad responses tolerated, before peer is banned.
BadResponsesThreshold int
BadResponsesWeight float64
BadResponsesDecayInterval time.Duration
}
// newPeerScorer provides fully initialized peer scoring service.
func newPeerScorer(ctx context.Context, store *peerDataStore, config *PeerScorerConfig) *PeerScorer {
scorer := &PeerScorer{
ctx: ctx,
config: config,
store: store,
}
if scorer.config.BadResponsesThreshold == 0 {
scorer.config.BadResponsesThreshold = DefaultBadResponsesThreshold
}
if scorer.config.BadResponsesWeight == 0.0 {
scorer.config.BadResponsesWeight = DefaultBadResponsesWeight
}
if scorer.config.BadResponsesDecayInterval == 0 {
scorer.config.BadResponsesDecayInterval = DefaultBadResponsesDecayInterval
}
go scorer.loop(scorer.ctx)
return scorer
}
// Score returns calculated peer score across all tracked metrics.
func (s *PeerScorer) Score(pid peer.ID) float64 {
s.store.RLock()
defer s.store.RUnlock()
var score float64
if _, ok := s.store.peers[pid]; !ok {
return 0
}
badResponsesScore := float64(s.store.peers[pid].badResponsesCount) / float64(s.config.BadResponsesThreshold)
badResponsesScore = badResponsesScore * s.config.BadResponsesWeight
score += badResponsesScore
return score
}
// Params exposes peer scorer parameters.
func (s *PeerScorer) Params() *PeerScorerConfig {
return s.config
}
// loop handles background tasks.
func (s *PeerScorer) loop(ctx context.Context) {
decayBadResponsesScores := time.NewTicker(s.config.BadResponsesDecayInterval)
defer decayBadResponsesScores.Stop()
for {
select {
case <-decayBadResponsesScores.C:
s.DecayBadResponsesStats()
case <-ctx.Done():
return
}
}
}

View File

@ -0,0 +1,137 @@
package peers_test
import (
"context"
"sort"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestPeerScorer_NewPeerScorer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("default config", func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{},
})
scorer := peerStatuses.Scorer()
assert.Equal(t, peers.DefaultBadResponsesThreshold, scorer.Params().BadResponsesThreshold, "Unexpected threshold value")
assert.Equal(t, peers.DefaultBadResponsesWeight, scorer.Params().BadResponsesWeight, "Unexpected weight value")
assert.Equal(t, peers.DefaultBadResponsesDecayInterval, scorer.Params().BadResponsesDecayInterval, "Unexpected decay interval value")
})
t.Run("explicit config", func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 2,
BadResponsesWeight: -1,
BadResponsesDecayInterval: 1 * time.Minute,
},
})
scorer := peerStatuses.Scorer()
assert.Equal(t, 2, scorer.Params().BadResponsesThreshold, "Unexpected threshold value")
assert.Equal(t, -1.0, scorer.Params().BadResponsesWeight, "Unexpected weight value")
assert.Equal(t, 1*time.Minute, scorer.Params().BadResponsesDecayInterval, "Unexpected decay interval value")
})
}
func TestPeerScorer_Score(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 5,
BadResponsesWeight: -0.5,
BadResponsesDecayInterval: 50 * time.Millisecond,
},
})
scorer := peerStatuses.Scorer()
sortByScore := func(pids []peer.ID) []peer.ID {
sort.Slice(pids, func(i, j int) bool {
scr1, scr2 := scorer.Score(pids[i]), scorer.Score(pids[j])
if scr1 == scr2 {
// Sort by peer ID, whenever peers have equal score.
return pids[i] < pids[j]
}
return scr1 > scr2
})
return pids
}
pids := []peer.ID{"peer1", "peer2", "peer3"}
for _, pid := range pids {
peerStatuses.Add(nil, pid, nil, network.DirUnknown)
if score := scorer.Score(pid); score < 0 {
t.Errorf("Unexpected peer score, want: >=0, got: %v", score)
}
}
assert.DeepEqual(t, []peer.ID{"peer1", "peer2", "peer3"}, sortByScore(pids), "Unexpected scores")
// Update peers' stats and test the effect on peer order.
scorer.IncrementBadResponses("peer2")
assert.DeepEqual(t, []peer.ID{"peer1", "peer3", "peer2"}, sortByScore(pids), "Unexpected scores")
scorer.IncrementBadResponses("peer1")
scorer.IncrementBadResponses("peer1")
assert.DeepEqual(t, []peer.ID{"peer3", "peer2", "peer1"}, sortByScore(pids), "Unexpected scores")
// See how decaying affects order of peers.
scorer.DecayBadResponsesStats()
assert.DeepEqual(t, []peer.ID{"peer2", "peer3", "peer1"}, sortByScore(pids), "Unexpected scores")
scorer.DecayBadResponsesStats()
assert.DeepEqual(t, []peer.ID{"peer1", "peer2", "peer3"}, sortByScore(pids), "Unexpected scores")
}
func TestPeerScorer_loop(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 5,
BadResponsesWeight: -0.5,
BadResponsesDecayInterval: 50 * time.Millisecond,
},
})
scorer := peerStatuses.Scorer()
pid1 := peer.ID("peer1")
peerStatuses.Add(nil, pid1, nil, network.DirUnknown)
for i := 0; i < scorer.Params().BadResponsesThreshold+5; i++ {
scorer.IncrementBadResponses(pid1)
}
assert.Equal(t, true, scorer.IsBadPeer(pid1), "Peer should be marked as bad")
done := make(chan struct{}, 1)
go func() {
defer func() {
done <- struct{}{}
}()
ticker := time.NewTicker(50 * time.Millisecond)
for {
select {
case <-ticker.C:
if scorer.IsBadPeer(pid1) == false {
return
}
case <-ctx.Done():
t.Error("Timed out")
return
}
}
}()
<-done
assert.Equal(t, false, scorer.IsBadPeer(pid1), "Peer should not be marked as bad")
}

View File

@ -51,8 +51,7 @@ const (
PeerConnecting
)
// Additional buffer beyond current peer limit, from which we can store
// the relevant peer statuses.
// Additional buffer beyond current peer limit, from which we can store the relevant peer statuses.
const maxLimitBuffer = 150
var (
@ -63,16 +62,16 @@ var (
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
scorer *PeerScorer
store *peerDataStore
config *StatusConfig
}
// StatusConfig represents peer status service params.
type StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// MaxBadResponses specifies number of bad responses tolerated, before peer is banned.
MaxBadResponses int
// ScorerParams holds peer scorer configuration params.
ScorerParams *PeerScorerConfig
}
// NewStatus creates a new status entity.
@ -83,13 +82,13 @@ func NewStatus(ctx context.Context, config *StatusConfig) *Status {
return &Status{
ctx: ctx,
store: store,
config: config,
scorer: newPeerScorer(ctx, store, config.ScorerParams),
}
}
// MaxBadResponses returns the maximum number of bad responses a peer can provide before it is considered bad.
func (p *Status) MaxBadResponses() int {
return p.config.MaxBadResponses
// Scorer exposes peer scoring service.
func (p *Status) Scorer() *PeerScorer {
return p.scorer
}
// MaxPeerLimit returns the max peer limit stored in the current peer store.
@ -282,37 +281,10 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) {
return roughtime.Now(), ErrPeerUnknown
}
// IncrementBadResponses increments the number of bad responses we have received from the given remote peer.
func (p *Status) IncrementBadResponses(pid peer.ID) {
p.store.Lock()
defer p.store.Unlock()
peerData := p.fetch(pid)
peerData.badResponsesCount++
}
// BadResponses obtains the number of bad responses we have received from the given remote peer.
// This will error if the peer does not exist.
func (p *Status) BadResponses(pid peer.ID) (int, error) {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.badResponsesCount, nil
}
return -1, ErrPeerUnknown
}
// IsBad states if the peer is to be considered bad.
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (p *Status) IsBad(pid peer.ID) bool {
p.store.RLock()
defer p.store.RUnlock()
if peerData, ok := p.store.peers[pid]; ok {
return peerData.badResponsesCount >= p.config.MaxBadResponses
}
return false
return p.scorer.IsBadPeer(pid)
}
// Connecting returns the peers that are connecting.
@ -395,15 +367,7 @@ func (p *Status) Inactive() []peer.ID {
// Bad returns the peers that are bad.
func (p *Status) Bad() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.peers {
if peerData.badResponsesCount >= p.config.MaxBadResponses {
peers = append(peers, pid)
}
}
return peers
return p.scorer.BadPeers()
}
// All returns all the peers regardless of state.
@ -417,19 +381,6 @@ func (p *Status) All() []peer.ID {
return pids
}
// Decay reduces the bad responses of all peers, giving reformed peers a chance to join the network.
// This can be run periodically, although note that each time it runs it does give all bad peers another chance as well to clog up
// the network with bad responses, so should not be run too frequently; once an hour would be reasonable.
func (p *Status) Decay() {
p.store.Lock()
defer p.store.Unlock()
for _, peerData := range p.store.peers {
if peerData.badResponsesCount > 0 {
peerData.badResponsesCount--
}
}
}
// Prune clears out and removes outdated and disconnected peers.
func (p *Status) Prune() {
p.store.Lock()
@ -445,10 +396,9 @@ func (p *Status) Prune() {
badResp int
}
peersToPrune := make([]*peerResp, 0)
// Select disconnected peers with a smaller
// bad response count.
// Select disconnected peers with a smaller bad response count.
for pid, peerData := range p.store.peers {
if peerData.connState == PeerDisconnected && peerData.badResponsesCount < p.config.MaxBadResponses {
if peerData.connState == PeerDisconnected && !p.scorer.isBadPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
badResp: p.store.peers[pid].badResponsesCount,
@ -465,7 +415,6 @@ func (p *Status) Prune() {
})
limitDiff := len(p.store.peers) - p.store.config.maxPeers
if limitDiff > len(peersToPrune) {
limitDiff = len(peersToPrune)
}

View File

@ -20,18 +20,22 @@ import (
func TestStatus(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
require.NotNil(t, p, "p not created")
assert.Equal(t, maxBadResponses, p.MaxBadResponses(), "maxBadResponses incorrect value")
assert.Equal(t, maxBadResponses, p.Scorer().BadResponsesThreshold(), "maxBadResponses incorrect value")
}
func TestPeerExplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -67,8 +71,10 @@ func TestPeerExplicitAdd(t *testing.T) {
func TestPeerNoENR(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -87,8 +93,10 @@ func TestPeerNoENR(t *testing.T) {
func TestPeerNoOverwriteENR(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -110,8 +118,10 @@ func TestPeerNoOverwriteENR(t *testing.T) {
func TestErrUnknownPeer(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -132,15 +142,17 @@ func TestErrUnknownPeer(t *testing.T) {
_, err = p.ChainStateLastUpdated(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
_, err = p.BadResponses(id)
_, err = p.Scorer().BadResponses(id)
assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err)
}
func TestPeerCommitteeIndices(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -172,8 +184,10 @@ func TestPeerCommitteeIndices(t *testing.T) {
func TestPeerSubscribedToSubnet(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
// Add some peers with different states
@ -212,8 +226,10 @@ func TestPeerSubscribedToSubnet(t *testing.T) {
func TestPeerImplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -231,8 +247,10 @@ func TestPeerImplicitAdd(t *testing.T) {
func TestPeerChainState(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -262,8 +280,10 @@ func TestPeerChainState(t *testing.T) {
func TestPeerBadResponses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
@ -280,25 +300,25 @@ func TestPeerBadResponses(t *testing.T) {
direction := network.DirInbound
p.Add(new(enr.Record), id, address, direction)
resBadResponses, err := p.BadResponses(id)
resBadResponses, err := p.Scorer().BadResponses(id)
require.NoError(t, err)
assert.Equal(t, 0, resBadResponses, "Unexpected bad responses")
assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good")
p.IncrementBadResponses(id)
resBadResponses, err = p.BadResponses(id)
p.Scorer().IncrementBadResponses(id)
resBadResponses, err = p.Scorer().BadResponses(id)
require.NoError(t, err)
assert.Equal(t, 1, resBadResponses, "Unexpected bad responses")
assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good")
p.IncrementBadResponses(id)
resBadResponses, err = p.BadResponses(id)
p.Scorer().IncrementBadResponses(id)
resBadResponses, err = p.Scorer().BadResponses(id)
require.NoError(t, err)
assert.Equal(t, 2, resBadResponses, "Unexpected bad responses")
assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be")
p.IncrementBadResponses(id)
resBadResponses, err = p.BadResponses(id)
p.Scorer().IncrementBadResponses(id)
resBadResponses, err = p.Scorer().BadResponses(id)
require.NoError(t, err)
assert.Equal(t, 3, resBadResponses, "Unexpected bad responses")
assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be")
@ -307,8 +327,10 @@ func TestPeerBadResponses(t *testing.T) {
func TestAddMetaData(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
// Add some peers with different states
@ -332,8 +354,10 @@ func TestAddMetaData(t *testing.T) {
func TestPeerConnectionStatuses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
// Add some peers with different states
@ -367,43 +391,13 @@ func TestPeerConnectionStatuses(t *testing.T) {
assert.Equal(t, numPeersAll, len(p.All()), "Unexpected number of peers")
}
func TestDecay(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
})
// Peer 1 has 0 bad responses.
pid1 := addPeer(t, p, peers.PeerConnected)
// Peer 2 has 1 bad response.
pid2 := addPeer(t, p, peers.PeerConnected)
p.IncrementBadResponses(pid2)
// Peer 3 has 2 bad response.
pid3 := addPeer(t, p, peers.PeerConnected)
p.IncrementBadResponses(pid3)
p.IncrementBadResponses(pid3)
// Decay the values
p.Decay()
// Ensure the new values are as expected
badResponses1, err := p.BadResponses(pid1)
require.NoError(t, err)
assert.Equal(t, 0, badResponses1, "Unexpected bad responses for peer 1")
badResponses2, err := p.BadResponses(pid2)
require.NoError(t, err)
assert.Equal(t, 0, badResponses2, "Unexpected bad responses for peer 2")
badResponses3, err := p.BadResponses(pid3)
require.NoError(t, err)
assert.Equal(t, 1, badResponses3, "Unexpected bad responses for peer 3")
}
func TestPrune(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
for i := 0; i < p.MaxPeerLimit()+100; i++ {
@ -421,34 +415,36 @@ func TestPrune(t *testing.T) {
thirdPID := disPeers[2]
// Make first peer a bad peer
p.IncrementBadResponses(firstPID)
p.IncrementBadResponses(firstPID)
p.Scorer().IncrementBadResponses(firstPID)
p.Scorer().IncrementBadResponses(firstPID)
// Add bad response for p2.
p.IncrementBadResponses(secondPID)
p.Scorer().IncrementBadResponses(secondPID)
// Prune peers
p.Prune()
// Bad peer is expected to still be kept in handler.
badRes, err := p.BadResponses(firstPID)
badRes, err := p.Scorer().BadResponses(firstPID)
assert.NoError(t, err, "error is supposed to be nil")
assert.Equal(t, 2, badRes, "Did not get expected amount")
// Not so good peer is pruned away so that we can reduce the
// total size of the handler.
badRes, err = p.BadResponses(secondPID)
badRes, err = p.Scorer().BadResponses(secondPID)
assert.NotNil(t, err, "error is supposed to be not nil")
// Last peer has been removed.
badRes, err = p.BadResponses(thirdPID)
badRes, err = p.Scorer().BadResponses(thirdPID)
assert.NotNil(t, err, "error is supposed to be not nil")
}
func TestTrimmedOrderedPeers(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: 1,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 1,
},
})
expectedTarget := uint64(2)
@ -508,8 +504,10 @@ func TestBestPeer(t *testing.T) {
expectedRoot := [32]byte{'t', 'e', 's', 't'}
junkRoot := [32]byte{'j', 'u', 'n', 'k'}
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
// Peer 1
@ -556,8 +554,10 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
maxBadResponses := 2
maxPeers := 10
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
for i := 0; i <= maxPeers+100; i++ {
@ -575,8 +575,10 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
func TestStatus_CurrentEpoch(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: maxBadResponses,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
},
})
// Peer 1
pid1 := addPeer(t, p, peers.PeerConnected)

View File

@ -150,8 +150,12 @@ func NewService(cfg *Config) (*Service, error) {
s.pubsub = gs
s.peers = peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: int(s.cfg.MaxPeers),
MaxBadResponses: maxBadResponses,
PeerLimit: int(s.cfg.MaxPeers),
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: maxBadResponses,
BadResponsesWeight: -100,
BadResponsesDecayInterval: time.Hour,
},
})
return s, nil
@ -212,7 +216,6 @@ func (s *Service) Start() {
runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
ensurePeerConnections(s.ctx, s.host, peersToWatch...)
})
runutil.RunEvery(s.ctx, time.Hour, s.Peers().Decay)
runutil.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
runutil.RunEvery(s.ctx, refreshRate, func() {
@ -394,7 +397,7 @@ func (s *Service) connectWithPeer(info peer.AddrInfo) error {
ctx, cancel := context.WithTimeout(s.ctx, maxDialTimeout)
defer cancel()
if err := s.host.Connect(ctx, info); err != nil {
s.Peers().IncrementBadResponses(info.ID)
s.Peers().Scorer().IncrementBadResponses(info.ID)
return err
}
return nil

View File

@ -27,8 +27,10 @@ func (m *MockPeersProvider) Peers() *peers.Status {
defer m.lock.Unlock()
if m.peers == nil {
m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: 5,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 5,
},
})
// Pretend we are connected to two peers
id0, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")

View File

@ -54,8 +54,10 @@ func NewTestP2P(t *testing.T) *TestP2P {
}
peerStatuses := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
MaxBadResponses: 5,
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BadResponsesThreshold: 5,
},
})
return &TestP2P{
t: t,

View File

@ -77,7 +77,7 @@ func (ds *Server) getPeer(pid peer.ID) (*pbrpc.DebugPeerResponse, error) {
if err != nil {
return nil, status.Errorf(codes.NotFound, "Requested peer does not exist: %v", err)
}
resp, err := peers.BadResponses(pid)
resp, err := peers.Scorer().BadResponses(pid)
if err != nil {
return nil, status.Errorf(codes.NotFound, "Requested peer does not exist: %v", err)
}

View File

@ -76,7 +76,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
key := stream.Conn().RemotePeer().String()
remaining := collector.Remaining(key)
if amt > uint64(remaining) {
l.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
l.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
if l.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer func() {

View File

@ -269,7 +269,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
saveBlocks(req)
hook.Reset()
for i := 0; i < p2.Peers().MaxBadResponses(); i++ {
for i := 0; i < p2.Peers().Scorer().BadResponsesThreshold(); i++ {
err := sendRequest(p1, p2, r, req, false)
assert.ErrorContains(t, rateLimitedError, err)
}
@ -308,7 +308,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
// One more request should result in overflow.
hook.Reset()
for i := 0; i < p2.Peers().MaxBadResponses(); i++ {
for i := 0; i < p2.Peers().Scorer().BadResponsesThreshold(); i++ {
err := sendRequest(p1, p2, r, req, false)
assert.ErrorContains(t, rateLimitedError, err)
}

View File

@ -56,7 +56,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
return nil, err
}
if code != 0 {
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
return nil, errors.New(errMsg)
}
msg := new(pb.MetaData)

View File

@ -102,7 +102,7 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
s.p2p.Host().Peerstore().RecordLatency(id, roughtime.Now().Sub(currentTime))
if code != 0 {
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
return errors.New(errMsg)
}
msg := new(uint64)
@ -111,7 +111,7 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
}
valid, err := s.validateSequenceNum(*msg, stream.Conn().RemotePeer())
if err != nil {
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
return err
}
if valid {

View File

@ -55,7 +55,7 @@ func (s *Service) maintainPeerStatuses() {
if roughtime.Now().After(lastUpdated.Add(interval)) {
if err := s.reValidatePeer(s.ctx, id); err != nil {
log.WithField("peer", id).WithError(err).Error("Failed to revalidate peer")
s.p2p.Peers().IncrementBadResponses(id)
s.p2p.Peers().Scorer().IncrementBadResponses(id)
}
}
}(pid)
@ -137,7 +137,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
}
if code != 0 {
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
return errors.New(errMsg)
}
@ -149,7 +149,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
err = s.validateStatusMessage(ctx, msg)
if err != nil {
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
// Disconnect if on a wrong fork.
if err == errWrongForkDigestVersion {
if err := s.sendGoodByeAndDisconnect(ctx, codeWrongNetwork, stream.Conn().RemotePeer()); err != nil {
@ -216,7 +216,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
return nil
default:
respCode = responseCodeInvalidRequest
s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer())
}
originalErr := err

View File

@ -584,7 +584,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
require.NoError(t, err, "Failed to obtain peer connection state")
assert.Equal(t, peers.PeerDisconnected, connectionState, "Expected peer to be disconnected")
badResponses, err := p1.Peers().BadResponses(p2.PeerID())
badResponses, err := p1.Peers().Scorer().BadResponses(p2.PeerID())
require.NoError(t, err, "Failed to obtain peer connection state")
assert.Equal(t, 1, badResponses, "Bad response was not bumped to one")
}