From cb26c15eb65382bbcd5dc3065264fb4673014604 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 13 Nov 2020 02:02:41 +0000 Subject: [PATCH] Peer endpoint updates (#1893) ## Issue Addressed N/A ## Proposed Changes - rename `address` -> `last_seen_p2p_address` - state and direction filters for `peers` endpoint - metadata count addition to `peers` endpoint - add `peer_count` endpoint Co-authored-by: realbigsean --- beacon_node/http_api/src/lib.rs | 120 +++++++++++++++++++++------- beacon_node/http_api/tests/tests.rs | 70 +++++++++++++--- common/eth2/src/lib.rs | 36 ++++++++- common/eth2/src/types.rs | 51 +++++++++++- 4 files changed, 234 insertions(+), 43 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index ba3715011..61c434753 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1353,7 +1353,7 @@ pub fn serve( return Ok(api_types::GenericResponse::from(api_types::PeerData { peer_id: peer_id.to_string(), enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), - address, + last_seen_p2p_address: address, direction: api_types::PeerDirection::from_connection_direction( &dir, ), @@ -1375,47 +1375,104 @@ pub fn serve( .and(warp::path("node")) .and(warp::path("peers")) .and(warp::path::end()) + .and(warp::query::()) + .and(network_globals.clone()) + .and_then( + |query: api_types::PeersQuery, network_globals: Arc>| { + blocking_json_task(move || { + let mut peers: Vec = Vec::new(); + network_globals + .peers + .read() + .peers() + .for_each(|(peer_id, peer_info)| { + let address = + if let Some(socket_addr) = peer_info.seen_addresses.iter().next() { + let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); + addr.push(eth2_libp2p::multiaddr::Protocol::Tcp( + socket_addr.port(), + )); + addr.to_string() + } else if let Some(addr) = peer_info.listening_addresses.first() { + addr.to_string() + } else { + String::new() + }; + + // the eth2 API spec implies only peers we have been connected to at some point should be included. + if let Some(dir) = peer_info.connection_direction.as_ref() { + let direction = + api_types::PeerDirection::from_connection_direction(&dir); + let state = api_types::PeerState::from_peer_connection_status( + &peer_info.connection_status(), + ); + + let state_matches = query.state.as_ref().map_or(true, |states| { + states.0.iter().any(|state_param| *state_param == state) + }); + let direction_matches = + query.direction.as_ref().map_or(true, |directions| { + directions.0.iter().any(|dir_param| *dir_param == direction) + }); + + if state_matches && direction_matches { + peers.push(api_types::PeerData { + peer_id: peer_id.to_string(), + enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + last_seen_p2p_address: address, + direction, + state, + }); + } + } + }); + Ok(api_types::PeersData { + meta: api_types::PeersMetaData { + count: peers.len() as u64, + }, + data: peers, + }) + }) + }, + ); + + // GET node/peer_count + let get_node_peer_count = eth1_v1 + .and(warp::path("node")) + .and(warp::path("peer_count")) + .and(warp::path::end()) .and(network_globals.clone()) .and_then(|network_globals: Arc>| { blocking_json_task(move || { - let mut peers: Vec = Vec::new(); + let mut connected: u64 = 0; + let mut connecting: u64 = 0; + let mut disconnected: u64 = 0; + let mut disconnecting: u64 = 0; + network_globals .peers .read() .peers() - // the eth2 API spec implies only peers we have been connected to at some point should be included. - .filter(|(_, peer_info)| peer_info.connection_direction.is_some()) - .for_each(|(peer_id, peer_info)| { - let address = if let Some(socket_addr) = - peer_info.seen_addresses.iter().next() - { - let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); - addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port())); - addr.to_string() - } else if let Some(addr) = peer_info.listening_addresses.first() { - addr.to_string() - } else { - String::new() - }; - - if let Some(dir) = peer_info.connection_direction.as_ref() { - peers.push(api_types::PeerData { - peer_id: peer_id.to_string(), - enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), - address, - direction: api_types::PeerDirection::from_connection_direction( - &dir, - ), - state: api_types::PeerState::from_peer_connection_status( - &peer_info.connection_status(), - ), - }); + .for_each(|(_, peer_info)| { + let state = api_types::PeerState::from_peer_connection_status( + &peer_info.connection_status(), + ); + match state { + api_types::PeerState::Connected => connected += 1, + api_types::PeerState::Connecting => connecting += 1, + api_types::PeerState::Disconnected => disconnected += 1, + api_types::PeerState::Disconnecting => disconnecting += 1, } }); - Ok(api_types::GenericResponse::from(peers)) + + Ok(api_types::GenericResponse::from(api_types::PeerCount { + disconnecting, + connecting, + connected, + disconnected, + })) }) }); - /* * validator */ @@ -2076,6 +2133,7 @@ pub fn serve( .or(get_node_health.boxed()) .or(get_node_peers_by_id.boxed()) .or(get_node_peers.boxed()) + .or(get_node_peer_count.boxed()) .or(get_validator_duties_proposer.boxed()) .or(get_validator_blocks.boxed()) .or(get_validator_attestation_data.boxed()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index f80ece9fe..875543bb7 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1178,7 +1178,7 @@ impl ApiTester { let expected = PeerData { peer_id: self.external_peer_id.to_string(), enr: None, - address: EXTERNAL_ADDR.to_string(), + last_seen_p2p_address: EXTERNAL_ADDR.to_string(), state: PeerState::Connected, direction: PeerDirection::Inbound, }; @@ -1189,18 +1189,66 @@ impl ApiTester { } pub async fn test_get_node_peers(self) -> Self { - let result = self.client.get_node_peers().await.unwrap().data; + let peer_states: Vec> = vec![ + Some(&[PeerState::Connected]), + Some(&[PeerState::Connecting]), + Some(&[PeerState::Disconnected]), + Some(&[PeerState::Disconnecting]), + None, + Some(&[PeerState::Connected, PeerState::Connecting]), + ]; + let peer_dirs: Vec> = vec![ + Some(&[PeerDirection::Outbound]), + Some(&[PeerDirection::Inbound]), + Some(&[PeerDirection::Inbound, PeerDirection::Outbound]), + None, + ]; - let expected = PeerData { - peer_id: self.external_peer_id.to_string(), - enr: None, - address: EXTERNAL_ADDR.to_string(), - state: PeerState::Connected, - direction: PeerDirection::Inbound, - }; + for states in peer_states { + for dirs in peer_dirs.clone() { + let result = self.client.get_node_peers(states, dirs).await.unwrap(); + let expected_peer = PeerData { + peer_id: self.external_peer_id.to_string(), + enr: None, + last_seen_p2p_address: EXTERNAL_ADDR.to_string(), + state: PeerState::Connected, + direction: PeerDirection::Inbound, + }; - assert_eq!(result, vec![expected]); + let state_match = + states.map_or(true, |states| states.contains(&PeerState::Connected)); + let dir_match = dirs.map_or(true, |dirs| dirs.contains(&PeerDirection::Inbound)); + let mut expected_peers = Vec::new(); + if state_match && dir_match { + expected_peers.push(expected_peer); + } + + assert_eq!( + result, + PeersData { + meta: PeersMetaData { + count: expected_peers.len() as u64 + }, + data: expected_peers, + } + ); + } + } + self + } + + pub async fn test_get_node_peer_count(self) -> Self { + let result = self.client.get_node_peer_count().await.unwrap().data; + assert_eq!( + result, + PeerCount { + connected: 1, + connecting: 0, + disconnected: 0, + disconnecting: 0, + } + ); self } @@ -1899,6 +1947,8 @@ async fn node_get() { .test_get_node_peers_by_id() .await .test_get_node_peers() + .await + .test_get_node_peer_count() .await; } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 38d86ddd7..be8fc0b59 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -722,7 +722,11 @@ impl BeaconNodeHttpClient { } /// `GET node/peers` - pub async fn get_node_peers(&self) -> Result>, Error> { + pub async fn get_node_peers( + &self, + states: Option<&[PeerState]>, + directions: Option<&[PeerDirection]>, + ) -> Result { let mut path = self.eth_path()?; path.path_segments_mut() @@ -730,6 +734,36 @@ impl BeaconNodeHttpClient { .push("node") .push("peers"); + if let Some(states) = states { + let state_string = states + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + path.query_pairs_mut().append_pair("state", &state_string); + } + + if let Some(directions) = directions { + let dir_string = directions + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + path.query_pairs_mut().append_pair("direction", &dir_string); + } + + self.get(path).await + } + + /// `GET node/peer_count` + pub async fn get_node_peer_count(&self) -> Result, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("node") + .push("peer_count"); + self.get(path).await } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 56a83cfd5..51e3c6c33 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -509,15 +509,32 @@ pub struct BeaconCommitteeSubscription { pub is_aggregator: bool, } +#[derive(Deserialize)] +pub struct PeersQuery { + pub state: Option>, + pub direction: Option>, +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct PeerData { pub peer_id: String, pub enr: Option, - pub address: String, + pub last_seen_p2p_address: String, pub state: PeerState, pub direction: PeerDirection, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PeersData { + pub data: Vec, + pub meta: PeersMetaData, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PeersMetaData { + pub count: u64, +} + #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum PeerState { @@ -554,6 +571,17 @@ impl FromStr for PeerState { } } +impl fmt::Display for PeerState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PeerState::Connected => write!(f, "connected"), + PeerState::Connecting => write!(f, "connecting"), + PeerState::Disconnected => write!(f, "disconnected"), + PeerState::Disconnecting => write!(f, "disconnecting"), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum PeerDirection { @@ -582,6 +610,27 @@ impl FromStr for PeerDirection { } } +impl fmt::Display for PeerDirection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PeerDirection::Inbound => write!(f, "inbound"), + PeerDirection::Outbound => write!(f, "outbound"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PeerCount { + #[serde(with = "serde_utils::quoted_u64")] + pub connected: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub connecting: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub disconnected: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub disconnecting: u64, +} + #[cfg(test)] mod tests { use super::*;