diff --git a/beacon-chain/p2p/connection_gater_test.go b/beacon-chain/p2p/connection_gater_test.go index 30c56fa68..817914657 100644 --- a/beacon-chain/p2p/connection_gater_test.go +++ b/beacon-chain/p2p/connection_gater_test.go @@ -22,8 +22,10 @@ func TestPeer_AtMaxLimit(t *testing.T) { require.NoError(t, err, "Failed to p2p listen") s := &Service{} s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 0, - MaxBadResponses: 3, + PeerLimit: 0, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 3, + }, }) s.cfg = &Config{MaxPeers: 0} s.addrFilter, err = configureFilter(&Config{}) @@ -61,8 +63,10 @@ func TestPeer_BelowMaxLimit(t *testing.T) { require.NoError(t, err, "Failed to p2p listen") s := &Service{} s.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 1, - MaxBadResponses: 3, + PeerLimit: 1, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 3, + }, }) s.cfg = &Config{MaxPeers: 1} s.addrFilter, err = configureFilter(&Config{}) diff --git a/beacon-chain/p2p/peers/BUILD.bazel b/beacon-chain/p2p/peers/BUILD.bazel index 49709923f..5ae09a952 100644 --- a/beacon-chain/p2p/peers/BUILD.bazel +++ b/beacon-chain/p2p/peers/BUILD.bazel @@ -4,6 +4,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test") go_library( name = "go_default_library", srcs = [ + "score_bad_responses.go", + "scorer.go", "status.go", "store.go", ], @@ -27,6 +29,8 @@ go_test( name = "go_default_test", srcs = [ "benchmark_test.go", + "score_bad_responses_test.go", + "scorer_test.go", "status_test.go", ], embed = [":go_default_library"], diff --git a/beacon-chain/p2p/peers/score_bad_responses.go b/beacon-chain/p2p/peers/score_bad_responses.go new file mode 100644 index 000000000..05b58f755 --- /dev/null +++ b/beacon-chain/p2p/peers/score_bad_responses.go @@ -0,0 +1,84 @@ +package peers + +import ( + "github.com/libp2p/go-libp2p-core/peer" +) + +// BadResponsesThreshold returns the maximum number of bad responses a peer can provide before it is considered bad. +func (s *PeerScorer) BadResponsesThreshold() int { + return s.config.BadResponsesThreshold +} + +// BadResponses obtains the number of bad responses we have received from the given remote peer. +func (s *PeerScorer) BadResponses(pid peer.ID) (int, error) { + s.store.RLock() + defer s.store.RUnlock() + return s.badResponses(pid) +} + +// badResponses is a lock-free version of BadResponses. +func (s *PeerScorer) badResponses(pid peer.ID) (int, error) { + if peerData, ok := s.store.peers[pid]; ok { + return peerData.badResponsesCount, nil + } + return -1, ErrPeerUnknown +} + +// IncrementBadResponses increments the number of bad responses we have received from the given remote peer. +// If peer doesn't exist this method is no-op. +func (s *PeerScorer) IncrementBadResponses(pid peer.ID) { + s.store.Lock() + defer s.store.Unlock() + + if _, ok := s.store.peers[pid]; !ok { + s.store.peers[pid] = &peerData{ + badResponsesCount: 1, + } + return + } + s.store.peers[pid].badResponsesCount++ +} + +// IsBadPeer states if the peer is to be considered bad. +// If the peer is unknown this will return `false`, which makes using this function easier than returning an error. +func (s *PeerScorer) IsBadPeer(pid peer.ID) bool { + s.store.RLock() + defer s.store.RUnlock() + return s.isBadPeer(pid) +} + +// isBadPeer is lock-free version of IsBadPeer. +func (s *PeerScorer) isBadPeer(pid peer.ID) bool { + if peerData, ok := s.store.peers[pid]; ok { + return peerData.badResponsesCount >= s.config.BadResponsesThreshold + } + return false +} + +// BadPeers returns the peers that are bad. +func (s *PeerScorer) BadPeers() []peer.ID { + s.store.RLock() + defer s.store.RUnlock() + + badPeers := make([]peer.ID, 0) + for pid := range s.store.peers { + if s.isBadPeer(pid) { + badPeers = append(badPeers, pid) + } + } + return badPeers +} + +// DecayBadResponsesStats reduces the bad responses of all peers, giving reformed peers a chance to join the network. +// This can be run periodically, although note that each time it runs it does give all bad peers another chance as well +// to clog up the network with bad responses, so should not be run too frequently; once an hour would be reasonable. +func (s *PeerScorer) DecayBadResponsesStats() { + s.store.Lock() + defer s.store.Unlock() + + for _, peerData := range s.store.peers { + if peerData.badResponsesCount > 0 { + peerData.badResponsesCount-- + } + } +} diff --git a/beacon-chain/p2p/peers/score_bad_responses_test.go b/beacon-chain/p2p/peers/score_bad_responses_test.go new file mode 100644 index 000000000..8cec41ff0 --- /dev/null +++ b/beacon-chain/p2p/peers/score_bad_responses_test.go @@ -0,0 +1,161 @@ +package peers_test + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" + "github.com/prysmaticlabs/prysm/shared/testutil/assert" + "github.com/prysmaticlabs/prysm/shared/testutil/require" +) + +func TestPeerScorer_BadResponsesThreshold(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + maxBadResponses := 2 + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, + }) + scorer := peerStatuses.Scorer() + assert.Equal(t, maxBadResponses, scorer.BadResponsesThreshold()) +} + +func TestPeerScorer_BadResponses(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{}, + }) + scorer := peerStatuses.Scorer() + + pid := peer.ID("peer1") + _, err := scorer.BadResponses(pid) + assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err) + + peerStatuses.Add(nil, pid, nil, network.DirUnknown) + count, err := scorer.BadResponses(pid) + assert.NoError(t, err) + assert.Equal(t, 0, count) +} + +func TestPeerScorer_decayBadResponsesStats(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + maxBadResponses := 2 + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + BadResponsesWeight: 1, + BadResponsesDecayInterval: 50 * time.Nanosecond, + }, + }) + scorer := peerStatuses.Scorer() + + // Peer 1 has 0 bad responses. + pid1 := peer.ID("peer1") + peerStatuses.Add(nil, pid1, nil, network.DirUnknown) + badResponsesCount, err := scorer.BadResponses(pid1) + require.NoError(t, err) + assert.Equal(t, 0, badResponsesCount) + + // Peer 2 has 1 bad response. + pid2 := peer.ID("peer2") + peerStatuses.Add(nil, pid2, nil, network.DirUnknown) + scorer.IncrementBadResponses(pid2) + badResponsesCount, err = scorer.BadResponses(pid2) + require.NoError(t, err) + assert.Equal(t, 1, badResponsesCount) + + // Peer 3 has 2 bad response. + pid3 := peer.ID("peer3") + peerStatuses.Add(nil, pid3, nil, network.DirUnknown) + scorer.IncrementBadResponses(pid3) + scorer.IncrementBadResponses(pid3) + badResponsesCount, err = scorer.BadResponses(pid3) + require.NoError(t, err) + assert.Equal(t, 2, badResponsesCount) + + // Decay the values + scorer.DecayBadResponsesStats() + + // Ensure the new values are as expected + badResponsesCount, err = scorer.BadResponses(pid1) + require.NoError(t, err) + assert.Equal(t, 0, badResponsesCount, "unexpected bad responses for pid1") + + badResponsesCount, err = scorer.BadResponses(pid2) + require.NoError(t, err) + assert.Equal(t, 0, badResponsesCount, "unexpected bad responses for pid2") + + badResponsesCount, err = scorer.BadResponses(pid3) + require.NoError(t, err) + assert.Equal(t, 1, badResponsesCount, "unexpected bad responses for pid3") +} + +func TestPeerScorer_IsBadPeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{}, + }) + scorer := peerStatuses.Scorer() + pid := peer.ID("peer1") + assert.Equal(t, false, scorer.IsBadPeer(pid)) + + peerStatuses.Add(nil, pid, nil, network.DirUnknown) + assert.Equal(t, false, scorer.IsBadPeer(pid)) + + for i := 0; i < peers.DefaultBadResponsesThreshold; i++ { + scorer.IncrementBadResponses(pid) + if i == peers.DefaultBadResponsesThreshold-1 { + assert.Equal(t, true, scorer.IsBadPeer(pid), "Unexpected peer status") + } else { + assert.Equal(t, false, scorer.IsBadPeer(pid), "Unexpected peer status") + } + } +} + +func TestPeerScorer_BadPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{}, + }) + scorer := peerStatuses.Scorer() + pids := []peer.ID{peer.ID("peer1"), peer.ID("peer2"), peer.ID("peer3"), peer.ID("peer4"), peer.ID("peer5")} + for i := 0; i < len(pids); i++ { + peerStatuses.Add(nil, pids[i], nil, network.DirUnknown) + } + for i := 0; i < peers.DefaultBadResponsesThreshold; i++ { + scorer.IncrementBadResponses(pids[1]) + scorer.IncrementBadResponses(pids[2]) + scorer.IncrementBadResponses(pids[4]) + } + assert.Equal(t, false, scorer.IsBadPeer(pids[0]), "Invalid peer status") + assert.Equal(t, true, scorer.IsBadPeer(pids[1]), "Invalid peer status") + assert.Equal(t, true, scorer.IsBadPeer(pids[2]), "Invalid peer status") + assert.Equal(t, false, scorer.IsBadPeer(pids[3]), "Invalid peer status") + assert.Equal(t, true, scorer.IsBadPeer(pids[4]), "Invalid peer status") + want := []peer.ID{pids[1], pids[2], pids[4]} + badPeers := scorer.BadPeers() + sort.Slice(badPeers, func(i, j int) bool { + return badPeers[i] < badPeers[j] + }) + assert.DeepEqual(t, want, badPeers, "Unexpected list of bad peers") +} diff --git a/beacon-chain/p2p/peers/scorer.go b/beacon-chain/p2p/peers/scorer.go new file mode 100644 index 000000000..0850032de --- /dev/null +++ b/beacon-chain/p2p/peers/scorer.go @@ -0,0 +1,92 @@ +package peers + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +const ( + // DefaultBadResponsesThreshold defines how many bad responses to tolerate before peer is deemed bad. + DefaultBadResponsesThreshold = 6 + // DefaultBadResponsesWeight is a default weight. Since score represents penalty, it has negative weight. + DefaultBadResponsesWeight = -1.0 + // DefaultBadResponsesDecayInterval defines how often to decay previous statistics. + // Every interval bad responses counter will be decremented by 1. + DefaultBadResponsesDecayInterval = time.Hour +) + +// PeerScorer keeps track of peer counters that are used to calculate peer score. +type PeerScorer struct { + ctx context.Context + config *PeerScorerConfig + store *peerDataStore +} + +// PeerScorerConfig holds configuration parameters for scoring service. +type PeerScorerConfig struct { + // BadResponsesThreshold specifies number of bad responses tolerated, before peer is banned. + BadResponsesThreshold int + BadResponsesWeight float64 + BadResponsesDecayInterval time.Duration +} + +// newPeerScorer provides fully initialized peer scoring service. +func newPeerScorer(ctx context.Context, store *peerDataStore, config *PeerScorerConfig) *PeerScorer { + scorer := &PeerScorer{ + ctx: ctx, + config: config, + store: store, + } + if scorer.config.BadResponsesThreshold == 0 { + scorer.config.BadResponsesThreshold = DefaultBadResponsesThreshold + } + if scorer.config.BadResponsesWeight == 0.0 { + scorer.config.BadResponsesWeight = DefaultBadResponsesWeight + } + if scorer.config.BadResponsesDecayInterval == 0 { + scorer.config.BadResponsesDecayInterval = DefaultBadResponsesDecayInterval + } + + go scorer.loop(scorer.ctx) + + return scorer +} + +// Score returns calculated peer score across all tracked metrics. +func (s *PeerScorer) Score(pid peer.ID) float64 { + s.store.RLock() + defer s.store.RUnlock() + + var score float64 + if _, ok := s.store.peers[pid]; !ok { + return 0 + } + + badResponsesScore := float64(s.store.peers[pid].badResponsesCount) / float64(s.config.BadResponsesThreshold) + badResponsesScore = badResponsesScore * s.config.BadResponsesWeight + score += badResponsesScore + + return score +} + +// Params exposes peer scorer parameters. +func (s *PeerScorer) Params() *PeerScorerConfig { + return s.config +} + +// loop handles background tasks. +func (s *PeerScorer) loop(ctx context.Context) { + decayBadResponsesScores := time.NewTicker(s.config.BadResponsesDecayInterval) + defer decayBadResponsesScores.Stop() + + for { + select { + case <-decayBadResponsesScores.C: + s.DecayBadResponsesStats() + case <-ctx.Done(): + return + } + } +} diff --git a/beacon-chain/p2p/peers/scorer_test.go b/beacon-chain/p2p/peers/scorer_test.go new file mode 100644 index 000000000..61551f327 --- /dev/null +++ b/beacon-chain/p2p/peers/scorer_test.go @@ -0,0 +1,137 @@ +package peers_test + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" + "github.com/prysmaticlabs/prysm/shared/testutil/assert" +) + +func TestPeerScorer_NewPeerScorer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("default config", func(t *testing.T) { + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{}, + }) + scorer := peerStatuses.Scorer() + assert.Equal(t, peers.DefaultBadResponsesThreshold, scorer.Params().BadResponsesThreshold, "Unexpected threshold value") + assert.Equal(t, peers.DefaultBadResponsesWeight, scorer.Params().BadResponsesWeight, "Unexpected weight value") + assert.Equal(t, peers.DefaultBadResponsesDecayInterval, scorer.Params().BadResponsesDecayInterval, "Unexpected decay interval value") + }) + + t.Run("explicit config", func(t *testing.T) { + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 2, + BadResponsesWeight: -1, + BadResponsesDecayInterval: 1 * time.Minute, + }, + }) + scorer := peerStatuses.Scorer() + assert.Equal(t, 2, scorer.Params().BadResponsesThreshold, "Unexpected threshold value") + assert.Equal(t, -1.0, scorer.Params().BadResponsesWeight, "Unexpected weight value") + assert.Equal(t, 1*time.Minute, scorer.Params().BadResponsesDecayInterval, "Unexpected decay interval value") + }) +} + +func TestPeerScorer_Score(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 5, + BadResponsesWeight: -0.5, + BadResponsesDecayInterval: 50 * time.Millisecond, + }, + }) + scorer := peerStatuses.Scorer() + + sortByScore := func(pids []peer.ID) []peer.ID { + sort.Slice(pids, func(i, j int) bool { + scr1, scr2 := scorer.Score(pids[i]), scorer.Score(pids[j]) + if scr1 == scr2 { + // Sort by peer ID, whenever peers have equal score. + return pids[i] < pids[j] + } + return scr1 > scr2 + }) + return pids + } + + pids := []peer.ID{"peer1", "peer2", "peer3"} + for _, pid := range pids { + peerStatuses.Add(nil, pid, nil, network.DirUnknown) + if score := scorer.Score(pid); score < 0 { + t.Errorf("Unexpected peer score, want: >=0, got: %v", score) + } + } + assert.DeepEqual(t, []peer.ID{"peer1", "peer2", "peer3"}, sortByScore(pids), "Unexpected scores") + + // Update peers' stats and test the effect on peer order. + scorer.IncrementBadResponses("peer2") + assert.DeepEqual(t, []peer.ID{"peer1", "peer3", "peer2"}, sortByScore(pids), "Unexpected scores") + scorer.IncrementBadResponses("peer1") + scorer.IncrementBadResponses("peer1") + assert.DeepEqual(t, []peer.ID{"peer3", "peer2", "peer1"}, sortByScore(pids), "Unexpected scores") + + // See how decaying affects order of peers. + scorer.DecayBadResponsesStats() + assert.DeepEqual(t, []peer.ID{"peer2", "peer3", "peer1"}, sortByScore(pids), "Unexpected scores") + scorer.DecayBadResponsesStats() + assert.DeepEqual(t, []peer.ID{"peer1", "peer2", "peer3"}, sortByScore(pids), "Unexpected scores") +} + +func TestPeerScorer_loop(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{ + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 5, + BadResponsesWeight: -0.5, + BadResponsesDecayInterval: 50 * time.Millisecond, + }, + }) + scorer := peerStatuses.Scorer() + + pid1 := peer.ID("peer1") + peerStatuses.Add(nil, pid1, nil, network.DirUnknown) + for i := 0; i < scorer.Params().BadResponsesThreshold+5; i++ { + scorer.IncrementBadResponses(pid1) + } + assert.Equal(t, true, scorer.IsBadPeer(pid1), "Peer should be marked as bad") + + done := make(chan struct{}, 1) + go func() { + defer func() { + done <- struct{}{} + }() + ticker := time.NewTicker(50 * time.Millisecond) + for { + select { + case <-ticker.C: + if scorer.IsBadPeer(pid1) == false { + return + } + case <-ctx.Done(): + t.Error("Timed out") + return + } + } + }() + + <-done + assert.Equal(t, false, scorer.IsBadPeer(pid1), "Peer should not be marked as bad") +} diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index fca42565f..bf5c69689 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -51,8 +51,7 @@ const ( PeerConnecting ) -// Additional buffer beyond current peer limit, from which we can store -// the relevant peer statuses. +// Additional buffer beyond current peer limit, from which we can store the relevant peer statuses. const maxLimitBuffer = 150 var ( @@ -63,16 +62,16 @@ var ( // Status is the structure holding the peer status information. type Status struct { ctx context.Context + scorer *PeerScorer store *peerDataStore - config *StatusConfig } // StatusConfig represents peer status service params. type StatusConfig struct { // PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node. PeerLimit int - // MaxBadResponses specifies number of bad responses tolerated, before peer is banned. - MaxBadResponses int + // ScorerParams holds peer scorer configuration params. + ScorerParams *PeerScorerConfig } // NewStatus creates a new status entity. @@ -83,13 +82,13 @@ func NewStatus(ctx context.Context, config *StatusConfig) *Status { return &Status{ ctx: ctx, store: store, - config: config, + scorer: newPeerScorer(ctx, store, config.ScorerParams), } } -// MaxBadResponses returns the maximum number of bad responses a peer can provide before it is considered bad. -func (p *Status) MaxBadResponses() int { - return p.config.MaxBadResponses +// Scorer exposes peer scoring service. +func (p *Status) Scorer() *PeerScorer { + return p.scorer } // MaxPeerLimit returns the max peer limit stored in the current peer store. @@ -282,37 +281,10 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) { return roughtime.Now(), ErrPeerUnknown } -// IncrementBadResponses increments the number of bad responses we have received from the given remote peer. -func (p *Status) IncrementBadResponses(pid peer.ID) { - p.store.Lock() - defer p.store.Unlock() - - peerData := p.fetch(pid) - peerData.badResponsesCount++ -} - -// BadResponses obtains the number of bad responses we have received from the given remote peer. -// This will error if the peer does not exist. -func (p *Status) BadResponses(pid peer.ID) (int, error) { - p.store.RLock() - defer p.store.RUnlock() - - if peerData, ok := p.store.peers[pid]; ok { - return peerData.badResponsesCount, nil - } - return -1, ErrPeerUnknown -} - // IsBad states if the peer is to be considered bad. // If the peer is unknown this will return `false`, which makes using this function easier than returning an error. func (p *Status) IsBad(pid peer.ID) bool { - p.store.RLock() - defer p.store.RUnlock() - - if peerData, ok := p.store.peers[pid]; ok { - return peerData.badResponsesCount >= p.config.MaxBadResponses - } - return false + return p.scorer.IsBadPeer(pid) } // Connecting returns the peers that are connecting. @@ -395,15 +367,7 @@ func (p *Status) Inactive() []peer.ID { // Bad returns the peers that are bad. func (p *Status) Bad() []peer.ID { - p.store.RLock() - defer p.store.RUnlock() - peers := make([]peer.ID, 0) - for pid, peerData := range p.store.peers { - if peerData.badResponsesCount >= p.config.MaxBadResponses { - peers = append(peers, pid) - } - } - return peers + return p.scorer.BadPeers() } // All returns all the peers regardless of state. @@ -417,19 +381,6 @@ func (p *Status) All() []peer.ID { return pids } -// Decay reduces the bad responses of all peers, giving reformed peers a chance to join the network. -// This can be run periodically, although note that each time it runs it does give all bad peers another chance as well to clog up -// the network with bad responses, so should not be run too frequently; once an hour would be reasonable. -func (p *Status) Decay() { - p.store.Lock() - defer p.store.Unlock() - for _, peerData := range p.store.peers { - if peerData.badResponsesCount > 0 { - peerData.badResponsesCount-- - } - } -} - // Prune clears out and removes outdated and disconnected peers. func (p *Status) Prune() { p.store.Lock() @@ -445,10 +396,9 @@ func (p *Status) Prune() { badResp int } peersToPrune := make([]*peerResp, 0) - // Select disconnected peers with a smaller - // bad response count. + // Select disconnected peers with a smaller bad response count. for pid, peerData := range p.store.peers { - if peerData.connState == PeerDisconnected && peerData.badResponsesCount < p.config.MaxBadResponses { + if peerData.connState == PeerDisconnected && !p.scorer.isBadPeer(pid) { peersToPrune = append(peersToPrune, &peerResp{ pid: pid, badResp: p.store.peers[pid].badResponsesCount, @@ -465,7 +415,6 @@ func (p *Status) Prune() { }) limitDiff := len(p.store.peers) - p.store.config.maxPeers - if limitDiff > len(peersToPrune) { limitDiff = len(peersToPrune) } diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index 5603dd854..d5775117f 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -20,18 +20,22 @@ import ( func TestStatus(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) require.NotNil(t, p, "p not created") - assert.Equal(t, maxBadResponses, p.MaxBadResponses(), "maxBadResponses incorrect value") + assert.Equal(t, maxBadResponses, p.Scorer().BadResponsesThreshold(), "maxBadResponses incorrect value") } func TestPeerExplicitAdd(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -67,8 +71,10 @@ func TestPeerExplicitAdd(t *testing.T) { func TestPeerNoENR(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -87,8 +93,10 @@ func TestPeerNoENR(t *testing.T) { func TestPeerNoOverwriteENR(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -110,8 +118,10 @@ func TestPeerNoOverwriteENR(t *testing.T) { func TestErrUnknownPeer(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -132,15 +142,17 @@ func TestErrUnknownPeer(t *testing.T) { _, err = p.ChainStateLastUpdated(id) assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err) - _, err = p.BadResponses(id) + _, err = p.Scorer().BadResponses(id) assert.ErrorContains(t, peers.ErrPeerUnknown.Error(), err) } func TestPeerCommitteeIndices(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -172,8 +184,10 @@ func TestPeerCommitteeIndices(t *testing.T) { func TestPeerSubscribedToSubnet(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) // Add some peers with different states @@ -212,8 +226,10 @@ func TestPeerSubscribedToSubnet(t *testing.T) { func TestPeerImplicitAdd(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -231,8 +247,10 @@ func TestPeerImplicitAdd(t *testing.T) { func TestPeerChainState(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -262,8 +280,10 @@ func TestPeerChainState(t *testing.T) { func TestPeerBadResponses(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") @@ -280,25 +300,25 @@ func TestPeerBadResponses(t *testing.T) { direction := network.DirInbound p.Add(new(enr.Record), id, address, direction) - resBadResponses, err := p.BadResponses(id) + resBadResponses, err := p.Scorer().BadResponses(id) require.NoError(t, err) assert.Equal(t, 0, resBadResponses, "Unexpected bad responses") assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") - p.IncrementBadResponses(id) - resBadResponses, err = p.BadResponses(id) + p.Scorer().IncrementBadResponses(id) + resBadResponses, err = p.Scorer().BadResponses(id) require.NoError(t, err) assert.Equal(t, 1, resBadResponses, "Unexpected bad responses") assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") - p.IncrementBadResponses(id) - resBadResponses, err = p.BadResponses(id) + p.Scorer().IncrementBadResponses(id) + resBadResponses, err = p.Scorer().BadResponses(id) require.NoError(t, err) assert.Equal(t, 2, resBadResponses, "Unexpected bad responses") assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be") - p.IncrementBadResponses(id) - resBadResponses, err = p.BadResponses(id) + p.Scorer().IncrementBadResponses(id) + resBadResponses, err = p.Scorer().BadResponses(id) require.NoError(t, err) assert.Equal(t, 3, resBadResponses, "Unexpected bad responses") assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be") @@ -307,8 +327,10 @@ func TestPeerBadResponses(t *testing.T) { func TestAddMetaData(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) // Add some peers with different states @@ -332,8 +354,10 @@ func TestAddMetaData(t *testing.T) { func TestPeerConnectionStatuses(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) // Add some peers with different states @@ -367,43 +391,13 @@ func TestPeerConnectionStatuses(t *testing.T) { assert.Equal(t, numPeersAll, len(p.All()), "Unexpected number of peers") } -func TestDecay(t *testing.T) { - maxBadResponses := 2 - p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, - }) - - // Peer 1 has 0 bad responses. - pid1 := addPeer(t, p, peers.PeerConnected) - // Peer 2 has 1 bad response. - pid2 := addPeer(t, p, peers.PeerConnected) - p.IncrementBadResponses(pid2) - // Peer 3 has 2 bad response. - pid3 := addPeer(t, p, peers.PeerConnected) - p.IncrementBadResponses(pid3) - p.IncrementBadResponses(pid3) - - // Decay the values - p.Decay() - - // Ensure the new values are as expected - badResponses1, err := p.BadResponses(pid1) - require.NoError(t, err) - assert.Equal(t, 0, badResponses1, "Unexpected bad responses for peer 1") - badResponses2, err := p.BadResponses(pid2) - require.NoError(t, err) - assert.Equal(t, 0, badResponses2, "Unexpected bad responses for peer 2") - badResponses3, err := p.BadResponses(pid3) - require.NoError(t, err) - assert.Equal(t, 1, badResponses3, "Unexpected bad responses for peer 3") -} - func TestPrune(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) for i := 0; i < p.MaxPeerLimit()+100; i++ { @@ -421,34 +415,36 @@ func TestPrune(t *testing.T) { thirdPID := disPeers[2] // Make first peer a bad peer - p.IncrementBadResponses(firstPID) - p.IncrementBadResponses(firstPID) + p.Scorer().IncrementBadResponses(firstPID) + p.Scorer().IncrementBadResponses(firstPID) // Add bad response for p2. - p.IncrementBadResponses(secondPID) + p.Scorer().IncrementBadResponses(secondPID) // Prune peers p.Prune() // Bad peer is expected to still be kept in handler. - badRes, err := p.BadResponses(firstPID) + badRes, err := p.Scorer().BadResponses(firstPID) assert.NoError(t, err, "error is supposed to be nil") assert.Equal(t, 2, badRes, "Did not get expected amount") // Not so good peer is pruned away so that we can reduce the // total size of the handler. - badRes, err = p.BadResponses(secondPID) + badRes, err = p.Scorer().BadResponses(secondPID) assert.NotNil(t, err, "error is supposed to be not nil") // Last peer has been removed. - badRes, err = p.BadResponses(thirdPID) + badRes, err = p.Scorer().BadResponses(thirdPID) assert.NotNil(t, err, "error is supposed to be not nil") } func TestTrimmedOrderedPeers(t *testing.T) { p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: 1, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 1, + }, }) expectedTarget := uint64(2) @@ -508,8 +504,10 @@ func TestBestPeer(t *testing.T) { expectedRoot := [32]byte{'t', 'e', 's', 't'} junkRoot := [32]byte{'j', 'u', 'n', 'k'} p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) // Peer 1 @@ -556,8 +554,10 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) { maxBadResponses := 2 maxPeers := 10 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) for i := 0; i <= maxPeers+100; i++ { @@ -575,8 +575,10 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) { func TestStatus_CurrentEpoch(t *testing.T) { maxBadResponses := 2 p := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: maxBadResponses, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + }, }) // Peer 1 pid1 := addPeer(t, p, peers.PeerConnected) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index e309a9d26..f4a925025 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -150,8 +150,12 @@ func NewService(cfg *Config) (*Service, error) { s.pubsub = gs s.peers = peers.NewStatus(ctx, &peers.StatusConfig{ - PeerLimit: int(s.cfg.MaxPeers), - MaxBadResponses: maxBadResponses, + PeerLimit: int(s.cfg.MaxPeers), + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: maxBadResponses, + BadResponsesWeight: -100, + BadResponsesDecayInterval: time.Hour, + }, }) return s, nil @@ -212,7 +216,6 @@ func (s *Service) Start() { runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() { ensurePeerConnections(s.ctx, s.host, peersToWatch...) }) - runutil.RunEvery(s.ctx, time.Hour, s.Peers().Decay) runutil.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune) runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics) runutil.RunEvery(s.ctx, refreshRate, func() { @@ -394,7 +397,7 @@ func (s *Service) connectWithPeer(info peer.AddrInfo) error { ctx, cancel := context.WithTimeout(s.ctx, maxDialTimeout) defer cancel() if err := s.host.Connect(ctx, info); err != nil { - s.Peers().IncrementBadResponses(info.ID) + s.Peers().Scorer().IncrementBadResponses(info.ID) return err } return nil diff --git a/beacon-chain/p2p/testing/mock_peersprovider.go b/beacon-chain/p2p/testing/mock_peersprovider.go index 2b31a701d..0cc82864d 100644 --- a/beacon-chain/p2p/testing/mock_peersprovider.go +++ b/beacon-chain/p2p/testing/mock_peersprovider.go @@ -27,8 +27,10 @@ func (m *MockPeersProvider) Peers() *peers.Status { defer m.lock.Unlock() if m.peers == nil { m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: 5, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 5, + }, }) // Pretend we are connected to two peers id0, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR") diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 5feeef6bf..0cd2d50d0 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -54,8 +54,10 @@ func NewTestP2P(t *testing.T) *TestP2P { } peerStatuses := peers.NewStatus(context.Background(), &peers.StatusConfig{ - PeerLimit: 30, - MaxBadResponses: 5, + PeerLimit: 30, + ScorerParams: &peers.PeerScorerConfig{ + BadResponsesThreshold: 5, + }, }) return &TestP2P{ t: t, diff --git a/beacon-chain/rpc/debug/p2p.go b/beacon-chain/rpc/debug/p2p.go index 412b6b233..884f143d8 100644 --- a/beacon-chain/rpc/debug/p2p.go +++ b/beacon-chain/rpc/debug/p2p.go @@ -77,7 +77,7 @@ func (ds *Server) getPeer(pid peer.ID) (*pbrpc.DebugPeerResponse, error) { if err != nil { return nil, status.Errorf(codes.NotFound, "Requested peer does not exist: %v", err) } - resp, err := peers.BadResponses(pid) + resp, err := peers.Scorer().BadResponses(pid) if err != nil { return nil, status.Errorf(codes.NotFound, "Requested peer does not exist: %v", err) } diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index cb4459b2e..16e2a5ddf 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -76,7 +76,7 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error { key := stream.Conn().RemotePeer().String() remaining := collector.Remaining(key) if amt > uint64(remaining) { - l.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + l.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) if l.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { log.Debug("Disconnecting bad peer") defer func() { diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index fff40d94d..e9972a38e 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -269,7 +269,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { saveBlocks(req) hook.Reset() - for i := 0; i < p2.Peers().MaxBadResponses(); i++ { + for i := 0; i < p2.Peers().Scorer().BadResponsesThreshold(); i++ { err := sendRequest(p1, p2, r, req, false) assert.ErrorContains(t, rateLimitedError, err) } @@ -308,7 +308,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { // One more request should result in overflow. hook.Reset() - for i := 0; i < p2.Peers().MaxBadResponses(); i++ { + for i := 0; i < p2.Peers().Scorer().BadResponsesThreshold(); i++ { err := sendRequest(p1, p2, r, req, false) assert.ErrorContains(t, rateLimitedError, err) } diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index ad77f0fc0..32848e5f1 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -56,7 +56,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta return nil, err } if code != 0 { - s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) return nil, errors.New(errMsg) } msg := new(pb.MetaData) diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index 7e8152e1a..c60e689f2 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -102,7 +102,7 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error { s.p2p.Host().Peerstore().RecordLatency(id, roughtime.Now().Sub(currentTime)) if code != 0 { - s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) return errors.New(errMsg) } msg := new(uint64) @@ -111,7 +111,7 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error { } valid, err := s.validateSequenceNum(*msg, stream.Conn().RemotePeer()) if err != nil { - s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) return err } if valid { diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index f78fd9541..280c3c38a 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -55,7 +55,7 @@ func (s *Service) maintainPeerStatuses() { if roughtime.Now().After(lastUpdated.Add(interval)) { if err := s.reValidatePeer(s.ctx, id); err != nil { log.WithField("peer", id).WithError(err).Error("Failed to revalidate peer") - s.p2p.Peers().IncrementBadResponses(id) + s.p2p.Peers().Scorer().IncrementBadResponses(id) } } }(pid) @@ -137,7 +137,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { } if code != 0 { - s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) return errors.New(errMsg) } @@ -149,7 +149,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { err = s.validateStatusMessage(ctx, msg) if err != nil { - s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) // Disconnect if on a wrong fork. if err == errWrongForkDigestVersion { if err := s.sendGoodByeAndDisconnect(ctx, codeWrongNetwork, stream.Conn().RemotePeer()); err != nil { @@ -216,7 +216,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream return nil default: respCode = responseCodeInvalidRequest - s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().Scorer().IncrementBadResponses(stream.Conn().RemotePeer()) } originalErr := err diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index c68897fda..3b9f26a0d 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -584,7 +584,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { require.NoError(t, err, "Failed to obtain peer connection state") assert.Equal(t, peers.PeerDisconnected, connectionState, "Expected peer to be disconnected") - badResponses, err := p1.Peers().BadResponses(p2.PeerID()) + badResponses, err := p1.Peers().Scorer().BadResponses(p2.PeerID()) require.NoError(t, err, "Failed to obtain peer connection state") assert.Equal(t, 1, badResponses, "Bad response was not bumped to one") }