Add Gossip Scoring For Peers (#7184)

* add gossip scoring

* fix

* clean up

* remove

* add new topics

* clean up gossip scoring

* clean up

* fix

* gaz

* remove true

* comment better

* remove from dev
This commit is contained in:
Nishant Das 2020-11-12 16:08:07 +08:00 committed by GitHub
parent 79d19ea438
commit 56fd535dd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 212 additions and 1 deletions

View File

@ -12,6 +12,7 @@ go_library(
"discovery.go",
"doc.go",
"fork.go",
"gossip_scoring_params.go",
"gossip_topic_mappings.go",
"handshake.go",
"info.go",
@ -48,6 +49,7 @@ go_library(
"//beacon-chain/p2p/types:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/iputils:go_default_library",

View File

@ -0,0 +1,185 @@
package p2p
import (
"math"
"strings"
"time"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
)
const (
// beaconBlockWeight specifies the scoring weight that we apply to
// our beacon block topic.
beaconBlockWeight = 0.8
// aggregateWeight specifies the scoring weight that we apply to
// our aggregate topic.
aggregateWeight = 0.5
// attestationTotalWeight specifies the scoring weight that we apply to
// our attestation subnet topic.
attestationTotalWeight = 1
// decayToZero specifies the terminal value that we will use when decaying
// a value.
decayToZero = 0.01
)
func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) {
thresholds := &pubsub.PeerScoreThresholds{
GossipThreshold: -4000,
PublishThreshold: -8000,
GraylistThreshold: -16000,
AcceptPXThreshold: 100,
OpportunisticGraftThreshold: 5,
}
scoreParams := &pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
TopicScoreCap: 32.72,
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 1,
IPColocationFactorWeight: -35.11,
IPColocationFactorThreshold: 10,
IPColocationFactorWhitelist: nil,
BehaviourPenaltyWeight: -15.92,
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyDecay: scoreDecay(10 * oneEpochDuration()),
DecayInterval: 1 * oneSlotDuration(),
DecayToZero: decayToZero,
RetainScore: 100 * oneEpochDuration(),
}
return scoreParams, thresholds
}
func topicScoreParams(topic string) *pubsub.TopicScoreParams {
switch {
case strings.Contains(topic, "beacon_block"):
return defaultBlockTopicParams()
case strings.Contains(topic, "beacon_aggregate_and_proof"):
return defaultAggregateTopicParams()
case strings.Contains(topic, "beacon_attestation"):
return defaultAggregateSubnetTopicParams()
default:
return nil
}
}
// Based on Ben's tested parameters for lighthouse.
// https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
func defaultBlockTopicParams() *pubsub.TopicScoreParams {
decayEpoch := time.Duration(5)
blocksPerEpoch := params.BeaconConfig().SlotsPerEpoch
return &pubsub.TopicScoreParams{
TopicWeight: beaconBlockWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: scoreDecay(20 * oneEpochDuration()),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: -0.717,
MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * oneEpochDuration()),
MeshMessageDeliveriesCap: float64(blocksPerEpoch * uint64(decayEpoch)),
MeshMessageDeliveriesThreshold: float64(blocksPerEpoch*uint64(decayEpoch)) / 10,
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * oneEpochDuration(),
MeshFailurePenaltyWeight: -0.717,
MeshFailurePenaltyDecay: scoreDecay(decayEpoch * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
}
}
func defaultAggregateTopicParams() *pubsub.TopicScoreParams {
aggPerEpoch := aggregatorsPerSlot() * params.BeaconConfig().SlotsPerEpoch
return &pubsub.TopicScoreParams{
TopicWeight: aggregateWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
FirstMessageDeliveriesWeight: 0.128,
FirstMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
FirstMessageDeliveriesCap: 179,
MeshMessageDeliveriesWeight: -0.064,
MeshMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
MeshMessageDeliveriesCap: float64(aggPerEpoch),
MeshMessageDeliveriesThreshold: float64(aggPerEpoch / 50),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 32 * oneSlotDuration(),
MeshFailurePenaltyWeight: -0.064,
MeshFailurePenaltyDecay: scoreDecay(1 * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
}
}
func defaultAggregateSubnetTopicParams() *pubsub.TopicScoreParams {
topicWeight := attestationTotalWeight / float64(params.BeaconNetworkConfig().AttestationSubnetCount)
subnetWeight := activeValidators() / params.BeaconNetworkConfig().AttestationSubnetCount
minimumWeight := subnetWeight / 50
numPerSlot := time.Duration(subnetWeight / params.BeaconConfig().SlotsPerEpoch)
comsPerSlot := committeeCountPerSlot()
exceedsThreshold := comsPerSlot >= 2*params.BeaconNetworkConfig().AttestationSubnetCount/params.BeaconConfig().SlotsPerEpoch
firstDecay := time.Duration(1)
meshDecay := time.Duration(4)
if exceedsThreshold {
firstDecay = 4
meshDecay = 16
}
return &pubsub.TopicScoreParams{
TopicWeight: topicWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: numPerSlot,
TimeInMeshCap: 300,
FirstMessageDeliveriesWeight: 0.955,
FirstMessageDeliveriesDecay: scoreDecay(firstDecay * oneEpochDuration()),
FirstMessageDeliveriesCap: 24,
MeshMessageDeliveriesWeight: -37.55,
MeshMessageDeliveriesDecay: scoreDecay(meshDecay * oneEpochDuration()),
MeshMessageDeliveriesCap: float64(subnetWeight),
MeshMessageDeliveriesThreshold: float64(minimumWeight),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 17 * oneSlotDuration(),
MeshFailurePenaltyWeight: -37.55,
MeshFailurePenaltyDecay: scoreDecay(meshDecay * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -4544,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
}
}
func oneSlotDuration() time.Duration {
return time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
func oneEpochDuration() time.Duration {
return time.Duration(params.BeaconConfig().SlotsPerEpoch) * oneSlotDuration()
}
func scoreDecay(totalDurationDecay time.Duration) float64 {
numOfTimes := totalDurationDecay / oneSlotDuration()
return math.Pow(decayToZero, 1/float64(numOfTimes))
}
// Default to the min-genesis for the current moment, as p2p service
// has no access to the chain service.
func activeValidators() uint64 {
return params.BeaconConfig().MinGenesisActiveValidatorCount
}
func committeeCountPerSlot() uint64 {
// Use a static parameter for now rather than a dynamic one, we can use
// the actual parameter later when we have figured out how to fix a circular
// dependency in service startup order.
return helpers.SlotCommitteeCount(activeValidators())
}
// Uses a very rough gauge for total aggregator size per slot.
func aggregatorsPerSlot() uint64 {
comms := committeeCountPerSlot()
totalAggs := comms * params.BeaconConfig().TargetAggregatorsPerCommittee
return totalAggs
}

View File

@ -5,8 +5,10 @@ import (
"time"
"github.com/golang/snappy"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
)
@ -71,9 +73,24 @@ func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub
if err != nil {
return nil, err
}
if featureconfig.Get().EnablePeerScorer {
scoringParams := topicScoreParams(topic)
if scoringParams != nil {
if err = topicHandle.SetScoreParams(scoringParams); err != nil {
return nil, err
}
}
}
return topicHandle.Subscribe(opts...)
}
// peerInspector will scrape all the relevant scoring data and add it to our
// peer handler.
// TODO(#6043): Add hooks to add in peer inspector to our global peer handler.
func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
// no-op
}
// Content addressable ID function.
//
// ETH2 spec defines the message ID as:

View File

@ -24,6 +24,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"go.opencensus.io/trace"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
@ -156,6 +157,13 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
pubsub.WithMessageIdFn(msgIDFunction),
pubsub.WithSubscriptionFilter(s),
}
// Add gossip scoring options.
if featureconfig.Get().EnablePeerScorer {
psOpts = append(
psOpts,
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute))
}
// Set the pubsub global parameters that we require.
setPubSubParameters()

View File

@ -103,7 +103,6 @@ var (
// devModeFlags holds list of flags that are set when development mode is on.
var devModeFlags = []cli.Flag{
enablePeerScorer,
enableSyncBacktracking,
}