mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 19:40:37 +00:00
Add Pubsub Metrics Tracer (#12178)
* add tracer * gaz * preston's review * preston's review
This commit is contained in:
parent
797cc360c7
commit
76c729f9fa
@ -24,6 +24,7 @@ go_library(
|
||||
"options.go",
|
||||
"pubsub.go",
|
||||
"pubsub_filter.go",
|
||||
"pubsub_tracer.go",
|
||||
"rpc_topic_mappings.go",
|
||||
"sender.go",
|
||||
"service.go",
|
||||
@ -84,6 +85,7 @@ go_library(
|
||||
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/peerstore:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//p2p/muxer/mplex:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
@ -63,6 +64,80 @@ var (
|
||||
Name: "p2p_sync_committee_subnet_attempted_broadcasts",
|
||||
Help: "The number of sync committee that were attempted to be broadcast.",
|
||||
})
|
||||
|
||||
// Gossip Tracer Metrics
|
||||
pubsubTopicsActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "p2p_pubsub_topic_active",
|
||||
Help: "The topics that the peer is participating in gossipsub.",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubTopicsGraft = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_graft_total",
|
||||
Help: "The number of graft messages sent for a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubTopicsPrune = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_prune_total",
|
||||
Help: "The number of prune messages sent for a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubMessageDeliver = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_deliver_total",
|
||||
Help: "The number of messages received for delivery of a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubMessageUndeliverable = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_undeliverable_total",
|
||||
Help: "The number of messages received which weren't able to be delivered of a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubMessageValidate = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_validate_total",
|
||||
Help: "The number of messages received for validation of a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubMessageDuplicate = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_duplicate_total",
|
||||
Help: "The number of duplicate messages sent for a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubMessageReject = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_reject_total",
|
||||
Help: "The number of messages rejected of a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubPeerThrottle = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_throttle_total",
|
||||
Help: "The number of times a peer has been throttled for a particular topic",
|
||||
},
|
||||
[]string{"topic"})
|
||||
pubsubRPCRecv = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_rpc_recv_total",
|
||||
Help: "The number of messages received via rpc for a particular topic",
|
||||
},
|
||||
[]string{"control_message"})
|
||||
pubsubRPCSubRecv = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_rpc_recv_sub_total",
|
||||
Help: "The number of subscription messages received via rpc",
|
||||
})
|
||||
pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_rpc_drop_total",
|
||||
Help: "The number of messages dropped via rpc for a particular topic",
|
||||
},
|
||||
[]string{"control_message"})
|
||||
pubsubRPCSubDrop = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_rpc_drop_sub_total",
|
||||
Help: "The number of subscription messages dropped via rpc",
|
||||
})
|
||||
pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_rpc_sent_total",
|
||||
Help: "The number of messages sent via rpc for a particular topic",
|
||||
},
|
||||
[]string{"control_message"})
|
||||
pubsubRPCSubSent = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "p2p_pubsub_rpc_sent_sub_total",
|
||||
Help: "The number of subscription messages sent via rpc",
|
||||
})
|
||||
)
|
||||
|
||||
func (s *Service) updateMetrics() {
|
||||
@ -84,20 +159,7 @@ func (s *Service) updateMetrics() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Get the agent data.
|
||||
rawAgent, err := store.Get(pid, "AgentVersion")
|
||||
agent, ok := rawAgent.(string)
|
||||
if err != nil || !ok {
|
||||
agent = "unknown"
|
||||
}
|
||||
foundName := "unknown"
|
||||
for _, knownAgent := range knownAgentVersions {
|
||||
// If the agent string matches one of our known agents, we set
|
||||
// the value to our own, sanitized string.
|
||||
if strings.Contains(strings.ToLower(agent), knownAgent) {
|
||||
foundName = knownAgent
|
||||
}
|
||||
}
|
||||
foundName := agentFromPid(pid, store)
|
||||
numConnectedPeersByClient[foundName] += 1
|
||||
|
||||
// Get peer scoring data.
|
||||
@ -123,3 +185,21 @@ func average(xs []float64) float64 {
|
||||
}
|
||||
return total / float64(len(xs))
|
||||
}
|
||||
|
||||
func agentFromPid(pid peer.ID, store peerstore.Peerstore) string {
|
||||
// Get the agent data.
|
||||
rawAgent, err := store.Get(pid, "AgentVersion")
|
||||
agent, ok := rawAgent.(string)
|
||||
if err != nil || !ok {
|
||||
return "unknown"
|
||||
}
|
||||
foundName := "unknown"
|
||||
for _, knownAgent := range knownAgentVersions {
|
||||
// If the agent string matches one of our known agents, we set
|
||||
// the value to our own, sanitized string.
|
||||
if strings.Contains(strings.ToLower(agent), knownAgent) {
|
||||
foundName = knownAgent
|
||||
}
|
||||
}
|
||||
return foundName
|
||||
}
|
||||
|
@ -145,6 +145,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
|
||||
pubsub.WithPeerScore(peerScoringParams()),
|
||||
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
|
||||
pubsub.WithGossipSubParams(pubsubGossipParam()),
|
||||
pubsub.WithRawTracer(gossipTracer{host: s.host}),
|
||||
}
|
||||
return psOpts
|
||||
}
|
||||
|
103
beacon-chain/p2p/pubsub_tracer.go
Normal file
103
beacon-chain/p2p/pubsub_tracer.go
Normal file
@ -0,0 +1,103 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var _ = pubsub.RawTracer(gossipTracer{})
|
||||
|
||||
// This tracer is used to implement metrics collection for messages received
|
||||
// and broadcasted through gossipsub.
|
||||
type gossipTracer struct {
|
||||
host host.Host
|
||||
}
|
||||
|
||||
// AddPeer .
|
||||
func (g gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
// RemovePeer .
|
||||
func (g gossipTracer) RemovePeer(p peer.ID) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
// Join .
|
||||
func (g gossipTracer) Join(topic string) {
|
||||
pubsubTopicsActive.WithLabelValues(topic).Set(1)
|
||||
}
|
||||
|
||||
// Leave .
|
||||
func (g gossipTracer) Leave(topic string) {
|
||||
pubsubTopicsActive.WithLabelValues(topic).Set(0)
|
||||
}
|
||||
|
||||
// Graft .
|
||||
func (g gossipTracer) Graft(p peer.ID, topic string) {
|
||||
pubsubTopicsGraft.WithLabelValues(topic).Inc()
|
||||
}
|
||||
|
||||
// Prune .
|
||||
func (g gossipTracer) Prune(p peer.ID, topic string) {
|
||||
pubsubTopicsPrune.WithLabelValues(topic).Inc()
|
||||
}
|
||||
|
||||
// ValidateMessage .
|
||||
func (g gossipTracer) ValidateMessage(msg *pubsub.Message) {
|
||||
pubsubMessageValidate.WithLabelValues(*msg.Topic).Inc()
|
||||
}
|
||||
|
||||
// DeliverMessage .
|
||||
func (g gossipTracer) DeliverMessage(msg *pubsub.Message) {
|
||||
pubsubMessageDeliver.WithLabelValues(*msg.Topic).Inc()
|
||||
}
|
||||
|
||||
// RejectMessage .
|
||||
func (g gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
|
||||
pubsubMessageReject.WithLabelValues(*msg.Topic).Inc()
|
||||
}
|
||||
|
||||
// DuplicateMessage .
|
||||
func (g gossipTracer) DuplicateMessage(msg *pubsub.Message) {
|
||||
pubsubMessageDuplicate.WithLabelValues(*msg.Topic).Inc()
|
||||
}
|
||||
|
||||
// UndeliverableMessage .
|
||||
func (g gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
|
||||
pubsubMessageUndeliverable.WithLabelValues(*msg.Topic).Inc()
|
||||
}
|
||||
|
||||
// ThrottlePeer .
|
||||
func (g gossipTracer) ThrottlePeer(p peer.ID) {
|
||||
agent := agentFromPid(p, g.host.Peerstore())
|
||||
pubsubPeerThrottle.WithLabelValues(agent).Inc()
|
||||
}
|
||||
|
||||
// RecvRPC .
|
||||
func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
|
||||
setMetricFromRPC(pubsubRPCSubRecv, pubsubRPCRecv, rpc)
|
||||
}
|
||||
|
||||
// SendRPC .
|
||||
func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
|
||||
setMetricFromRPC(pubsubRPCSubSent, pubsubRPCSent, rpc)
|
||||
}
|
||||
|
||||
// DropRPC .
|
||||
func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
|
||||
setMetricFromRPC(pubsubRPCSubDrop, pubsubRPCDrop, rpc)
|
||||
}
|
||||
|
||||
func setMetricFromRPC(ctr prometheus.Counter, gauge *prometheus.CounterVec, rpc *pubsub.RPC) {
|
||||
ctr.Add(float64(len(rpc.Subscriptions)))
|
||||
if rpc.Control != nil {
|
||||
gauge.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
|
||||
gauge.WithLabelValues("prune").Add(float64(len(rpc.Control.Prune)))
|
||||
gauge.WithLabelValues("ihave").Add(float64(len(rpc.Control.Ihave)))
|
||||
gauge.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user