From ec59be2261f74defbecb7472819b06a102d50e43 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Mon, 23 Oct 2023 17:33:08 +0300 Subject: [PATCH] Dvovk/sentinel and sentry peers data collect (#8533) --- cl/sentinel/service/service.go | 39 ++++++++ cmd/sentry/sentry/sentry_grpc_server.go | 12 +++ diagnostics/peers.go | 127 +++++++++++++++++++----- erigon-lib/diagnostics/entities.go | 26 +++++ erigon-lib/direct/sentinel_client.go | 10 ++ eth/backend.go | 20 ++++ p2p/peer.go | 7 +- 7 files changed, 215 insertions(+), 26 deletions(-) create mode 100644 erigon-lib/diagnostics/entities.go diff --git a/cl/sentinel/service/service.go b/cl/sentinel/service/service.go index d9344ae70..c3c31fa40 100644 --- a/cl/sentinel/service/service.go +++ b/cl/sentinel/service/service.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/ledgerwatch/erigon/cl/sentinel" "github.com/ledgerwatch/erigon/cl/sentinel/httpreqresp" "github.com/ledgerwatch/erigon/cl/sentinel/peers" @@ -34,6 +35,8 @@ type SentinelServer struct { mu sync.RWMutex logger log.Logger + + peerStatistics map[string]*diagnostics.PeerStatistics } func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel, logger log.Logger) *SentinelServer { @@ -42,6 +45,7 @@ func NewSentinelServer(ctx context.Context, sentinel *sentinel.Sentinel, logger ctx: ctx, gossipNotifier: newGossipNotifier(), logger: logger, + peerStatistics: make(map[string]*diagnostics.PeerStatistics), } } @@ -74,6 +78,18 @@ func (s *SentinelServer) PublishGossip(_ context.Context, msg *sentinelrpc.Gossi manager := s.sentinel.GossipManager() // Snappify payload before sending it to gossip compressedData := utils.CompressSnappy(msg.Data) + + _, found := s.peerStatistics[msg.GetPeer().Pid] + + if found { + s.peerStatistics[msg.GetPeer().Pid].BytesOut += uint64(len(compressedData)) + } else { + s.peerStatistics[msg.GetPeer().Pid] = &diagnostics.PeerStatistics{ + BytesIn: 0, + BytesOut: uint64(len(compressedData)), + } + } + var subscription *sentinel.GossipSubscription switch msg.Type { @@ -295,6 +311,7 @@ func (s *SentinelServer) ListenToGossip() { func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error { var err error s.logger.Trace("[Sentinel Gossip] Received Packet", "topic", pkt.Topic) + data := pkt.GetData() // If we use snappy codec then decompress it accordingly. @@ -308,6 +325,18 @@ func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error { if err != nil { return err } + + _, found := s.peerStatistics[string(textPid)] + + if found { + s.peerStatistics[string(textPid)].BytesIn += uint64(len(data)) + } else { + s.peerStatistics[string(textPid)] = &diagnostics.PeerStatistics{ + BytesIn: uint64(len(data)), + BytesOut: 0, + } + } + // Check to which gossip it belongs to. if strings.Contains(*pkt.Topic, string(sentinel.BeaconBlockTopic)) { s.gossipNotifier.notify(sentinelrpc.GossipType_BeaconBlockGossipType, data, string(textPid)) @@ -327,3 +356,13 @@ func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error { } return nil } + +func (s *SentinelServer) GetPeersStatistics() map[string]*diagnostics.PeerStatistics { + stats := make(map[string]*diagnostics.PeerStatistics) + for k, v := range s.peerStatistics { + stats[k] = v + delete(s.peerStatistics, k) + } + + return stats +} diff --git a/cmd/sentry/sentry/sentry_grpc_server.go b/cmd/sentry/sentry/sentry_grpc_server.go index 3d55727c2..6e908638c 100644 --- a/cmd/sentry/sentry/sentry_grpc_server.go +++ b/cmd/sentry/sentry/sentry_grpc_server.go @@ -383,6 +383,9 @@ func runPeer( if err != nil { return p2p.NewPeerError(p2p.PeerErrorMessageReceive, p2p.DiscNetworkError, err, "sentry.runPeer: ReadMsg error") } + + peerInfo.peer.BytesTransfered += int(msg.Size) + if msg.Size > eth.ProtocolMaxMsgSize { msg.Discard() return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscSubprotocolError, nil, fmt.Sprintf("sentry.runPeer: message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize)) @@ -1038,6 +1041,15 @@ func (ss *GrpcServer) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry. return &reply, nil } +func (ss *GrpcServer) DiagnosticsPeersData() []*p2p.PeerInfo { + if ss.P2pServer == nil { + return []*p2p.PeerInfo{} + } + + peers := ss.P2pServer.PeersInfo() + return peers +} + func (ss *GrpcServer) SimplePeerCount() map[uint]int { counts := map[uint]int{} ss.rangePeers(func(peerInfo *PeerInfo) bool { diff --git a/diagnostics/peers.go b/diagnostics/peers.go index 2b25fda29..4d1a4b768 100644 --- a/diagnostics/peers.go +++ b/diagnostics/peers.go @@ -1,15 +1,37 @@ package diagnostics import ( - "context" "encoding/json" "net/http" - "github.com/ledgerwatch/erigon/p2p" + diagnint "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/ledgerwatch/erigon/turbo/node" "github.com/urfave/cli/v2" ) +type PeerNetworkInfo struct { + LocalAddress string `json:"localAddress"` // Local endpoint of the TCP data connection + RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection + Inbound bool `json:"inbound"` + Trusted bool `json:"trusted"` + Static bool `json:"static"` +} + +type PeerResponse struct { + ENR string `json:"enr,omitempty"` // Ethereum Node Record + Enode string `json:"enode"` // Node URL + ID string `json:"id"` // Unique node identifier + Name string `json:"name"` // Name of the node, including client type, version, OS, custom data + ErrorCount int `json:"errorCount"` // Number of errors + LastSeenError string `json:"lastSeenError"` // Last seen error + Type string `json:"type"` // Type of connection + Caps []string `json:"caps"` // Protocols advertised by this peer + Network PeerNetworkInfo `json:"network"` + Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields + BytesIn int `json:"bytesIn"` // Number of bytes received from the peer + BytesOut int `json:"bytesOut"` // Number of bytes sent to the peer +} + func SetupPeersAccess(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) { metricsMux.HandleFunc("/peers", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -19,34 +41,89 @@ func SetupPeersAccess(ctx *cli.Context, metricsMux *http.ServeMux, node *node.Er } func writePeers(w http.ResponseWriter, ctx *cli.Context, node *node.ErigonNode) { - reply, err := node.Backend().Peers(context.Background()) - + sentinelPeers, err := sentinelPeers(node) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - peers := make([]*p2p.PeerInfo, 0, len(reply.Peers)) + sentryPeers, err := sentryPeers(node) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - for _, rpcPeer := range reply.Peers { - peer := p2p.PeerInfo{ - ENR: rpcPeer.Enr, - Enode: rpcPeer.Enode, - ID: rpcPeer.Id, - Name: rpcPeer.Name, - Caps: rpcPeer.Caps, - Network: struct { - LocalAddress string `json:"localAddress"` - RemoteAddress string `json:"remoteAddress"` - Inbound bool `json:"inbound"` - Trusted bool `json:"trusted"` - Static bool `json:"static"` - }{ - LocalAddress: rpcPeer.ConnLocalAddr, - RemoteAddress: rpcPeer.ConnRemoteAddr, - Inbound: rpcPeer.ConnIsInbound, - Trusted: rpcPeer.ConnIsTrusted, - Static: rpcPeer.ConnIsStatic, + allPeers := append(sentryPeers, sentinelPeers...) + + json.NewEncoder(w).Encode(allPeers) +} + +func sentinelPeers(node *node.ErigonNode) ([]*PeerResponse, error) { + if diag, ok := node.Backend().Sentinel().(diagnint.PeerStatisticsGetter); ok { + + statisticsArray := diag.GetPeersStatistics() + peers := make([]*PeerResponse, 0, len(statisticsArray)) + + for key, value := range statisticsArray { + peer := PeerResponse{ + ENR: "", //TODO: find a way how to get missing data + Enode: "", + ID: key, + Name: "", + BytesIn: int(value.BytesIn), + BytesOut: int(value.BytesOut), + Type: "Sentinel", + Caps: []string{}, + Network: PeerNetworkInfo{ + LocalAddress: "", + RemoteAddress: "", + Inbound: false, + Trusted: false, + Static: false, + }, + Protocols: nil, + } + + peers = append(peers, &peer) + } + + return peers, nil + } else { + return []*PeerResponse{}, nil + } +} + +func sentryPeers(node *node.ErigonNode) ([]*PeerResponse, error) { + + reply := node.Backend().DiagnosticsPeersData() + + peers := make([]*PeerResponse, 0, len(reply)) + + for _, rpcPeer := range reply { + var bin = 0 + var bout = 0 + + if rpcPeer.Network.Inbound { + bin = rpcPeer.BytesTransfered + } else { + bout = rpcPeer.BytesTransfered + } + + peer := PeerResponse{ + ENR: rpcPeer.ENR, + Enode: rpcPeer.Enode, + ID: rpcPeer.ID, + Name: rpcPeer.Name, + BytesIn: bin, + BytesOut: bout, + Type: "Sentry", + Caps: rpcPeer.Caps, + Network: PeerNetworkInfo{ + LocalAddress: rpcPeer.Network.LocalAddress, + RemoteAddress: rpcPeer.Network.RemoteAddress, + Inbound: rpcPeer.Network.Inbound, + Trusted: rpcPeer.Network.Trusted, + Static: rpcPeer.Network.Static, }, Protocols: nil, } @@ -54,5 +131,5 @@ func writePeers(w http.ResponseWriter, ctx *cli.Context, node *node.ErigonNode) peers = append(peers, &peer) } - json.NewEncoder(w).Encode(peers) + return peers, nil } diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go new file mode 100644 index 000000000..7e8920e87 --- /dev/null +++ b/erigon-lib/diagnostics/entities.go @@ -0,0 +1,26 @@ +/* + Copyright 2021 Erigon contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diagnostics + +type PeerStatisticsGetter interface { + GetPeersStatistics() map[string]*PeerStatistics +} + +type PeerStatistics struct { + BytesIn uint64 + BytesOut uint64 +} diff --git a/erigon-lib/direct/sentinel_client.go b/erigon-lib/direct/sentinel_client.go index 86481b4fb..f421f4332 100644 --- a/erigon-lib/direct/sentinel_client.go +++ b/erigon-lib/direct/sentinel_client.go @@ -20,6 +20,7 @@ import ( "context" "io" + "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel" "google.golang.org/grpc" ) @@ -110,3 +111,12 @@ func (s *SentinelSubscribeGossipS) Err(err error) { } s.ch <- &gossipReply{err: err} } + +func (s *SentinelClientDirect) GetPeersStatistics() map[string]*diagnostics.PeerStatistics { + + if diag, ok := s.server.(diagnostics.PeerStatisticsGetter); ok { + return diag.GetPeersStatistics() + } + + return map[string]*diagnostics.PeerStatistics{} +} diff --git a/eth/backend.go b/eth/backend.go index 054fe9ad6..c8a07fc2d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -90,6 +90,7 @@ import ( "github.com/ledgerwatch/erigon/cmd/sentry/sentry" "github.com/ledgerwatch/erigon/common/debug" + rpcsentinel "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/bor" "github.com/ledgerwatch/erigon/consensus/bor/finality/flags" @@ -198,6 +199,7 @@ type Ethereum struct { kvRPC *remotedbserver.KvServer logger log.Logger + sentinel rpcsentinel.SentinelClient silkworm *silkworm.Silkworm } @@ -808,6 +810,9 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger if err != nil { return nil, err } + + backend.sentinel = client + go func() { if err := caplin1.RunCaplinPhase1(ctx, client, engine, beaconCfg, genesisCfg, state, nil, dirs, beacon.RouterConfiguration{Active: false}); err != nil { logger.Error("could not start caplin", "err", err) @@ -1206,9 +1211,20 @@ func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) { } reply.Peers = append(reply.Peers, peers.Peers...) } + return &reply, nil } +func (s *Ethereum) DiagnosticsPeersData() []*p2p.PeerInfo { + var reply []*p2p.PeerInfo + for _, sentryServer := range s.sentryServers { + peers := sentryServer.DiagnosticsPeersData() + reply = append(reply, peers...) + } + + return reply +} + func (s *Ethereum) AddPeer(ctx context.Context, req *remote.AddPeerRequest) (*remote.AddPeerReply, error) { for _, sentryClient := range s.sentriesClient.Sentries() { _, err := sentryClient.AddPeer(ctx, &proto_sentry.AddPeerRequest{Url: req.Url}) @@ -1390,3 +1406,7 @@ func readCurrentTotalDifficulty(ctx context.Context, db kv.RwDB, blockReader ser }) return currentTD, err } + +func (s *Ethereum) Sentinel() rpcsentinel.SentinelClient { + return s.sentinel +} diff --git a/p2p/peer.go b/p2p/peer.go index fd8e9890a..e47b95105 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -121,6 +121,9 @@ type Peer struct { events *event.Feed pubkey [64]byte metricsEnabled bool + + //diagnostics info + BytesTransfered int } // NewPeer returns a peer for testing purposes. @@ -490,7 +493,8 @@ type PeerInfo struct { Trusted bool `json:"trusted"` Static bool `json:"static"` } `json:"network"` - Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields + Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields + BytesTransfered int `json:"bytesTransfered,omitempty"` } // Info gathers and returns a collection of metadata known about a peer. @@ -516,6 +520,7 @@ func (p *Peer) Info() *PeerInfo { info.Network.Inbound = p.rw.is(inboundConn) info.Network.Trusted = p.rw.is(trustedConn) info.Network.Static = p.rw.is(staticDialedConn) + info.BytesTransfered = p.BytesTransfered // Gather all the running protocol infos for _, proto := range p.running {