From 02fd54bea7eaef3bb610c3f7ca88a77599747b08 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 24 Aug 2021 00:12:13 +0000 Subject: [PATCH] Refactor discovery queries (#2518) ## Issue Addressed N/A ## Proposed Changes Refactor discovery queries such that only `QueryType::Subnet` queries are queued for grouping. A `QueryType::FindPeers` query is always started regardless of the number of active `Subnet` queries (max = 2). This prevents `FindPeers` queries from getting starved if `Subnet` queries start queuing up. Also removes the `GroupedQueryType` enum and uses `QueryType` for all queuing and processing of discovery requests. ## Additional Info Currently, no distinction is made between subnet discovery queries for attestation and sync committee subnets when grouping queries. Potentially we could prioritise attestation queries over sync committee queries in the future. --- beacon_node/eth2_libp2p/src/discovery/mod.rs | 230 +++++++------------ 1 file changed, 83 insertions(+), 147 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 3866e4d47..a74889426 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -53,8 +53,11 @@ pub const TARGET_SUBNET_PEERS: usize = config::MESH_N_LOW; const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6; /// Number of times to attempt a discovery request. const MAX_DISCOVERY_RETRY: usize = 3; -/// The maximum number of concurrent discovery queries. -const MAX_CONCURRENT_QUERIES: usize = 2; +/// The maximum number of concurrent subnet discovery queries. +/// Note: we always allow a single FindPeers query, so we would be +/// running a maximum of `MAX_CONCURRENT_SUBNET_QUERIES + 1` +/// discovery queries at a time. +const MAX_CONCURRENT_SUBNET_QUERIES: usize = 2; /// The max number of subnets to search for in a single subnet discovery query. 
const MAX_SUBNETS_IN_QUERY: usize = 3; /// The number of closest peers to search for when doing a regular peer search. @@ -81,6 +84,19 @@ struct SubnetQuery { retries: usize, } +impl SubnetQuery { + /// Returns true if this query has expired. + pub fn expired(&self) -> bool { + if let Some(ttl) = self.min_ttl { + ttl < Instant::now() + } + // `None` corresponds to long lived subnet discovery requests. + else { + false + } + } +} + impl std::fmt::Debug for SubnetQuery { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let min_ttl_secs = self @@ -97,37 +113,16 @@ impl std::fmt::Debug for SubnetQuery { #[derive(Debug, Clone, PartialEq)] enum QueryType { /// We are searching for subnet peers. - Subnet(SubnetQuery), - /// We are searching for more peers without ENR or time constraints. - FindPeers, -} - -#[derive(Debug, Clone, PartialEq)] -enum GroupedQueryType { - /// We are searching for peers on one of a few subnets. Subnet(Vec<SubnetQuery>), /// We are searching for more peers without ENR or time constraints. FindPeers, } -impl QueryType { - /// Returns true if this query has expired. - pub fn expired(&self) -> bool { - match self { - Self::FindPeers => false, - Self::Subnet(subnet_query) => { - if let Some(ttl) = subnet_query.min_ttl { - ttl < Instant::now() - } else { - true - } - } - } - } -} - /// The result of a query. -struct QueryResult(GroupedQueryType, Result<Vec<Enr>, discv5::QueryError>); +struct QueryResult { + query_type: QueryType, + result: Result<Vec<Enr>, discv5::QueryError>, +} // Awaiting the event stream future enum EventStream { @@ -169,8 +164,8 @@ pub struct Discovery { /// a time, regardless of the query concurrency. find_peer_active: bool, - /// A queue of discovery queries to be processed. - queued_queries: VecDeque<QueryType>, + /// A queue of subnet queries to be processed. + queued_queries: VecDeque<SubnetQuery>, /// Active discovery queries. 
active_queries: FuturesUnordered + Send>>>, @@ -328,15 +323,12 @@ impl Discovery { if !self.started || self.find_peer_active { return; } - - // If there is not already a find peer's query queued, add one - let query = QueryType::FindPeers; - if !self.queued_queries.contains(&query) { - debug!(self.log, "Queuing a peer discovery request"); - self.queued_queries.push_back(query); - // update the metrics - metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); - } + // Immediately start a FindNode query + debug!(self.log, "Starting a peer discovery request"); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS, |_| { + true + }); } /// Processes a request to search for more peers on a subnet. @@ -347,7 +339,7 @@ impl Discovery { } trace!( self.log, - "Making discovery query for subnets"; + "Starting discovery query for subnets"; "subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::>() ); for subnet in subnets_to_discover { @@ -612,30 +604,26 @@ impl Discovery { // Search through any queued requests and update the timeout if a query for this subnet // already exists let mut found = false; - for query in self.queued_queries.iter_mut() { - if let QueryType::Subnet(ref mut subnet_query) = query { - if subnet_query.subnet == subnet { - if subnet_query.min_ttl < min_ttl { - subnet_query.min_ttl = min_ttl; - } - // update the number of retries - subnet_query.retries = retries; - // mimic an `Iter::Find()` and short-circuit the loop - found = true; - break; + for subnet_query in self.queued_queries.iter_mut() { + if subnet_query.subnet == subnet { + if subnet_query.min_ttl < min_ttl { + subnet_query.min_ttl = min_ttl; } + // update the number of retries + subnet_query.retries = retries; + // mimic an `Iter::Find()` and short-circuit the loop + found = true; + break; } } if !found { - // Set up the query and add it to the queue - let query = QueryType::Subnet(SubnetQuery { + // update 
the metrics and insert into the queue. + trace!(self.log, "Queuing subnet query"; "subnet" => ?subnet, "retries" => retries); + self.queued_queries.push_back(SubnetQuery { subnet, min_ttl, retries, }); - // update the metrics and insert into the queue. - trace!(self.log, "Queuing subnet query"; "subnet" => ?subnet, "retries" => retries); - self.queued_queries.push_back(query); metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); } } @@ -643,8 +631,8 @@ impl Discovery { /// Consume the discovery queue and initiate queries when applicable. /// /// This also sanitizes the queue removing out-dated queries. - /// Returns `true` if any of the queued queries is processed and a discovery - /// query (Subnet or FindPeers) is started. + /// Returns `true` if any of the queued queries is processed and a subnet discovery + /// query is started. fn process_queue(&mut self) -> bool { // Sanitize the queue, removing any out-dated subnet queries self.queued_queries.retain(|query| !query.expired()); @@ -655,44 +643,19 @@ impl Discovery { // Check that we are within our query concurrency limit while !self.at_capacity() && !self.queued_queries.is_empty() { // consume and process the query queue - match self.queued_queries.pop_front() { - Some(QueryType::FindPeers) => { - // Only start a find peers query if it is the last message in the queue. - // We want to prioritize subnet queries, so we don't miss attestations. 
- if self.queued_queries.is_empty() { - // This is a regular request to find additional peers - debug!(self.log, "Discovery query started"); - self.find_peer_active = true; - self.start_query( - GroupedQueryType::FindPeers, - FIND_NODE_QUERY_CLOSEST_PEERS, - |_| true, - ); - processed = true; - } else { - self.queued_queries.push_back(QueryType::FindPeers); - } - } - Some(QueryType::Subnet(subnet_query)) => { - subnet_queries.push(subnet_query); + if let Some(subnet_query) = self.queued_queries.pop_front() { + subnet_queries.push(subnet_query); - // We want to start a grouped subnet query if: - // 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together. - // 2. There are no more messages in the queue. - // 3. There is exactly one message in the queue and it is FindPeers. - if subnet_queries.len() == MAX_SUBNETS_IN_QUERY - || self.queued_queries.is_empty() - || (self.queued_queries.front() == Some(&QueryType::FindPeers) - && self.queued_queries.len() == 1) - { - // This query is for searching for peers of a particular subnet - // Drain subnet_queries so we can re-use it as we continue to process the queue - let grouped_queries: Vec = subnet_queries.drain(..).collect(); - self.start_subnet_query(grouped_queries); - processed = true; - } + // We want to start a grouped subnet query if: + // 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together. + // 2. There are no more messages in the queue. + if subnet_queries.len() == MAX_SUBNETS_IN_QUERY || self.queued_queries.is_empty() { + // This query is for searching for peers of a particular subnet + // Drain subnet_queries so we can re-use it as we continue to process the queue + let grouped_queries: Vec = subnet_queries.drain(..).collect(); + self.start_subnet_query(grouped_queries); + processed = true; } - None => {} // Queue is empty } } // Update the queue metric @@ -701,9 +664,12 @@ impl Discovery { } // Returns a boolean indicating if we are currently processing the maximum number of - // concurrent queries or not. 
+ // concurrent subnet queries or not. fn at_capacity(&self) -> bool { - self.active_queries.len() >= MAX_CONCURRENT_QUERIES + self.active_queries + .len() + .saturating_sub(self.find_peer_active as usize) // We only count active subnet queries + >= MAX_CONCURRENT_SUBNET_QUERIES } /// Runs a discovery request for a given group of subnets. @@ -754,7 +720,7 @@ impl Discovery { "subnets" => ?filtered_subnet_queries, ); self.start_query( - GroupedQueryType::Subnet(filtered_subnet_queries), + QueryType::Subnet(filtered_subnet_queries), TARGET_PEERS_FOR_GROUPED_QUERY, subnet_predicate, ); @@ -768,14 +734,14 @@ impl Discovery { /// ENR. fn start_query( &mut self, - grouped_query: GroupedQueryType, + query: QueryType, target_peers: usize, additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, ) { // Make sure there are subnet queries included - let contains_queries = match &grouped_query { - GroupedQueryType::Subnet(queries) => !queries.is_empty(), - GroupedQueryType::FindPeers => true, + let contains_queries = match &query { + QueryType::Subnet(queries) => !queries.is_empty(), + QueryType::FindPeers => true, }; if !contains_queries { @@ -813,7 +779,10 @@ impl Discovery { let query_future = self .discv5 .find_node_predicate(random_node, predicate, target_peers) - .map(|v| QueryResult(grouped_query, v)); + .map(|v| QueryResult { + query_type: query, + result: v, + }); // Add the future to active queries, to be executed. self.active_queries.push(Box::pin(query_future)); @@ -822,12 +791,12 @@ impl Discovery { /// Process the completed QueryResult returned from discv5. 
fn process_completed_queries( &mut self, - query_result: QueryResult, + query: QueryResult, ) -> Option>> { - match query_result.0 { - GroupedQueryType::FindPeers => { + match query.query_type { + QueryType::FindPeers => { self.find_peer_active = false; - match query_result.1 { + match query.result { Ok(r) if r.is_empty() => { debug!(self.log, "Discovery query yielded no results."); } @@ -846,10 +815,10 @@ impl Discovery { } } } - GroupedQueryType::Subnet(queries) => { + QueryType::Subnet(queries) => { let subnets_searched_for: Vec = queries.iter().map(|query| query.subnet).collect(); - match query_result.1 { + match query.result { Ok(r) if r.is_empty() => { debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for); queries.iter().for_each(|query| { @@ -1144,10 +1113,7 @@ mod tests { subnet_query.min_ttl, subnet_query.retries, ); - assert_eq!( - discovery.queued_queries.back(), - Some(&QueryType::Subnet(subnet_query.clone())) - ); + assert_eq!(discovery.queued_queries.back(), Some(&subnet_query)); // New query should replace old query subnet_query.min_ttl = Some(now + Duration::from_secs(1)); @@ -1158,7 +1124,7 @@ mod tests { assert_eq!(discovery.queued_queries.len(), 1); assert_eq!( discovery.queued_queries.pop_back(), - Some(QueryType::Subnet(subnet_query.clone())) + Some(subnet_query.clone()) ); // Retries > MAX_DISCOVERY_RETRY must return immediately without adding @@ -1172,39 +1138,6 @@ mod tests { assert_eq!(discovery.queued_queries.len(), 0); } - #[tokio::test] - async fn test_process_queue() { - let mut discovery = build_discovery().await; - - // FindPeers query is processed if there is no subnet query - discovery.queued_queries.push_back(QueryType::FindPeers); - assert!(discovery.process_queue()); - - let now = Instant::now(); - let subnet_query = SubnetQuery { - subnet: Subnet::Attestation(SubnetId::new(1)), - min_ttl: Some(now + Duration::from_secs(10)), - retries: 0, - }; - - // Refresh 
active queries - discovery.active_queries = Default::default(); - - // SubnetQuery is processed if it's the only queued query - discovery - .queued_queries - .push_back(QueryType::Subnet(subnet_query.clone())); - assert!(discovery.process_queue()); - - // SubnetQuery is processed if it's there is also 1 queued discovery query - discovery.queued_queries.push_back(QueryType::FindPeers); - discovery - .queued_queries - .push_back(QueryType::Subnet(subnet_query)); - // Process Subnet query and FindPeers afterwards. - assert!(discovery.process_queue()); - } - fn make_enr(subnet_ids: Vec) -> Enr { let mut builder = EnrBuilder::new("v4"); let keypair = libp2p::identity::Keypair::generate_secp256k1(); @@ -1227,7 +1160,7 @@ mod tests { let instant1 = Some(now + Duration::from_secs(10)); let instant2 = Some(now + Duration::from_secs(5)); - let query = GroupedQueryType::Subnet(vec![ + let query = QueryType::Subnet(vec![ SubnetQuery { subnet: Subnet::Attestation(SubnetId::new(1)), min_ttl: instant1, @@ -1248,7 +1181,10 @@ mod tests { let enrs: Vec = vec![enr1.clone(), enr2, enr3]; let results = discovery - .process_completed_queries(QueryResult(query, Ok(enrs))) + .process_completed_queries(QueryResult { + query_type: query, + result: Ok(enrs), + }) .unwrap(); // enr1 and enr2 are required peers based on the requested subnet ids