Refactor discovery queries (#2518)

## Issue Addressed

N/A

## Proposed Changes

Refactor discovery queries such that only `QueryType::Subnet` queries are queued for grouping. `QueryType::FindPeers` is always made regardless of the number of active `Subnet` queries (max = 2). This prevents `FindPeers` queries from getting starved if `Subnet` queries start queuing up. 

Also removes `GroupedQueryType` struct and use `QueryType` for all queuing and processing of discovery requests.

## Additional Info

Currently, no distinction is made between subnet discovery queries for attestation and sync committee subnets when grouping queries. Potentially we could prioritise attestation queries over sync committee queries in the future.
This commit is contained in:
Pawan Dhananjay 2021-08-24 00:12:13 +00:00
parent 90d5ab1566
commit 02fd54bea7

View File

@ -53,8 +53,11 @@ pub const TARGET_SUBNET_PEERS: usize = config::MESH_N_LOW;
const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6;
/// Number of times to attempt a discovery request.
const MAX_DISCOVERY_RETRY: usize = 3;
/// The maximum number of concurrent discovery queries.
const MAX_CONCURRENT_QUERIES: usize = 2;
/// The maximum number of concurrent subnet discovery queries.
/// Note: we always allow a single FindPeers query, so we would be
/// running a maximum of `MAX_CONCURRENT_SUBNET_QUERIES + 1`
/// discovery queries at a time.
const MAX_CONCURRENT_SUBNET_QUERIES: usize = 2;
/// The max number of subnets to search for in a single subnet discovery query.
const MAX_SUBNETS_IN_QUERY: usize = 3;
/// The number of closest peers to search for when doing a regular peer search.
@ -81,6 +84,19 @@ struct SubnetQuery {
retries: usize,
}
impl SubnetQuery {
/// Returns true if this query has expired.
pub fn expired(&self) -> bool {
if let Some(ttl) = self.min_ttl {
ttl < Instant::now()
}
// `None` corresponds to long lived subnet discovery requests.
else {
false
}
}
}
impl std::fmt::Debug for SubnetQuery {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let min_ttl_secs = self
@ -97,37 +113,16 @@ impl std::fmt::Debug for SubnetQuery {
#[derive(Debug, Clone, PartialEq)]
enum QueryType {
/// We are searching for subnet peers.
Subnet(SubnetQuery),
/// We are searching for more peers without ENR or time constraints.
FindPeers,
}
#[derive(Debug, Clone, PartialEq)]
enum GroupedQueryType {
/// We are searching for peers on one of a few subnets.
Subnet(Vec<SubnetQuery>),
/// We are searching for more peers without ENR or time constraints.
FindPeers,
}
impl QueryType {
/// Returns true if this query has expired.
pub fn expired(&self) -> bool {
match self {
Self::FindPeers => false,
Self::Subnet(subnet_query) => {
if let Some(ttl) = subnet_query.min_ttl {
ttl < Instant::now()
} else {
true
}
}
}
}
}
/// The result of a query.
struct QueryResult(GroupedQueryType, Result<Vec<Enr>, discv5::QueryError>);
struct QueryResult {
query_type: QueryType,
result: Result<Vec<Enr>, discv5::QueryError>,
}
// Awaiting the event stream future
enum EventStream {
@ -169,8 +164,8 @@ pub struct Discovery<TSpec: EthSpec> {
/// a time, regardless of the query concurrency.
find_peer_active: bool,
/// A queue of discovery queries to be processed.
queued_queries: VecDeque<QueryType>,
/// A queue of subnet queries to be processed.
queued_queries: VecDeque<SubnetQuery>,
/// Active discovery queries.
active_queries: FuturesUnordered<std::pin::Pin<Box<dyn Future<Output = QueryResult> + Send>>>,
@ -328,15 +323,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
if !self.started || self.find_peer_active {
return;
}
// If there is not already a find peer's query queued, add one
let query = QueryType::FindPeers;
if !self.queued_queries.contains(&query) {
debug!(self.log, "Queuing a peer discovery request");
self.queued_queries.push_back(query);
// update the metrics
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
}
// Immediately start a FindNode query
debug!(self.log, "Starting a peer discovery request");
self.find_peer_active = true;
self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS, |_| {
true
});
}
/// Processes a request to search for more peers on a subnet.
@ -347,7 +339,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
trace!(
self.log,
"Making discovery query for subnets";
"Starting discovery query for subnets";
"subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>()
);
for subnet in subnets_to_discover {
@ -612,30 +604,26 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Search through any queued requests and update the timeout if a query for this subnet
// already exists
let mut found = false;
for query in self.queued_queries.iter_mut() {
if let QueryType::Subnet(ref mut subnet_query) = query {
if subnet_query.subnet == subnet {
if subnet_query.min_ttl < min_ttl {
subnet_query.min_ttl = min_ttl;
}
// update the number of retries
subnet_query.retries = retries;
// mimic an `Iter::Find()` and short-circuit the loop
found = true;
break;
for subnet_query in self.queued_queries.iter_mut() {
if subnet_query.subnet == subnet {
if subnet_query.min_ttl < min_ttl {
subnet_query.min_ttl = min_ttl;
}
// update the number of retries
subnet_query.retries = retries;
// mimic an `Iter::Find()` and short-circuit the loop
found = true;
break;
}
}
if !found {
// Set up the query and add it to the queue
let query = QueryType::Subnet(SubnetQuery {
// update the metrics and insert into the queue.
trace!(self.log, "Queuing subnet query"; "subnet" => ?subnet, "retries" => retries);
self.queued_queries.push_back(SubnetQuery {
subnet,
min_ttl,
retries,
});
// update the metrics and insert into the queue.
trace!(self.log, "Queuing subnet query"; "subnet" => ?subnet, "retries" => retries);
self.queued_queries.push_back(query);
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
}
}
@ -643,8 +631,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// Consume the discovery queue and initiate queries when applicable.
///
/// This also sanitizes the queue removing out-dated queries.
/// Returns `true` if any of the queued queries is processed and a discovery
/// query (Subnet or FindPeers) is started.
/// Returns `true` if any of the queued queries is processed and a subnet discovery
/// query is started.
fn process_queue(&mut self) -> bool {
// Sanitize the queue, removing any out-dated subnet queries
self.queued_queries.retain(|query| !query.expired());
@ -655,44 +643,19 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Check that we are within our query concurrency limit
while !self.at_capacity() && !self.queued_queries.is_empty() {
// consume and process the query queue
match self.queued_queries.pop_front() {
Some(QueryType::FindPeers) => {
// Only start a find peers query if it is the last message in the queue.
// We want to prioritize subnet queries, so we don't miss attestations.
if self.queued_queries.is_empty() {
// This is a regular request to find additional peers
debug!(self.log, "Discovery query started");
self.find_peer_active = true;
self.start_query(
GroupedQueryType::FindPeers,
FIND_NODE_QUERY_CLOSEST_PEERS,
|_| true,
);
processed = true;
} else {
self.queued_queries.push_back(QueryType::FindPeers);
}
}
Some(QueryType::Subnet(subnet_query)) => {
subnet_queries.push(subnet_query);
if let Some(subnet_query) = self.queued_queries.pop_front() {
subnet_queries.push(subnet_query);
// We want to start a grouped subnet query if:
// 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together.
// 2. There are no more messages in the queue.
// 3. There is exactly one message in the queue and it is FindPeers.
if subnet_queries.len() == MAX_SUBNETS_IN_QUERY
|| self.queued_queries.is_empty()
|| (self.queued_queries.front() == Some(&QueryType::FindPeers)
&& self.queued_queries.len() == 1)
{
// This query is for searching for peers of a particular subnet
// Drain subnet_queries so we can re-use it as we continue to process the queue
let grouped_queries: Vec<SubnetQuery> = subnet_queries.drain(..).collect();
self.start_subnet_query(grouped_queries);
processed = true;
}
// We want to start a grouped subnet query if:
// 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together.
// 2. There are no more messages in the queue.
if subnet_queries.len() == MAX_SUBNETS_IN_QUERY || self.queued_queries.is_empty() {
// This query is for searching for peers of a particular subnet
// Drain subnet_queries so we can re-use it as we continue to process the queue
let grouped_queries: Vec<SubnetQuery> = subnet_queries.drain(..).collect();
self.start_subnet_query(grouped_queries);
processed = true;
}
None => {} // Queue is empty
}
}
// Update the queue metric
@ -701,9 +664,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
// Returns a boolean indicating if we are currently processing the maximum number of
// concurrent queries or not.
// concurrent subnet queries or not.
fn at_capacity(&self) -> bool {
self.active_queries.len() >= MAX_CONCURRENT_QUERIES
self.active_queries
.len()
.saturating_sub(self.find_peer_active as usize) // We only count active subnet queries
>= MAX_CONCURRENT_SUBNET_QUERIES
}
/// Runs a discovery request for a given group of subnets.
@ -754,7 +720,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
"subnets" => ?filtered_subnet_queries,
);
self.start_query(
GroupedQueryType::Subnet(filtered_subnet_queries),
QueryType::Subnet(filtered_subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
@ -768,14 +734,14 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// ENR.
fn start_query(
&mut self,
grouped_query: GroupedQueryType,
query: QueryType,
target_peers: usize,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
// Make sure there are subnet queries included
let contains_queries = match &grouped_query {
GroupedQueryType::Subnet(queries) => !queries.is_empty(),
GroupedQueryType::FindPeers => true,
let contains_queries = match &query {
QueryType::Subnet(queries) => !queries.is_empty(),
QueryType::FindPeers => true,
};
if !contains_queries {
@ -813,7 +779,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let query_future = self
.discv5
.find_node_predicate(random_node, predicate, target_peers)
.map(|v| QueryResult(grouped_query, v));
.map(|v| QueryResult {
query_type: query,
result: v,
});
// Add the future to active queries, to be executed.
self.active_queries.push(Box::pin(query_future));
@ -822,12 +791,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// Process the completed QueryResult returned from discv5.
fn process_completed_queries(
&mut self,
query_result: QueryResult,
query: QueryResult,
) -> Option<HashMap<PeerId, Option<Instant>>> {
match query_result.0 {
GroupedQueryType::FindPeers => {
match query.query_type {
QueryType::FindPeers => {
self.find_peer_active = false;
match query_result.1 {
match query.result {
Ok(r) if r.is_empty() => {
debug!(self.log, "Discovery query yielded no results.");
}
@ -846,10 +815,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
}
}
GroupedQueryType::Subnet(queries) => {
QueryType::Subnet(queries) => {
let subnets_searched_for: Vec<Subnet> =
queries.iter().map(|query| query.subnet).collect();
match query_result.1 {
match query.result {
Ok(r) if r.is_empty() => {
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for);
queries.iter().for_each(|query| {
@ -1144,10 +1113,7 @@ mod tests {
subnet_query.min_ttl,
subnet_query.retries,
);
assert_eq!(
discovery.queued_queries.back(),
Some(&QueryType::Subnet(subnet_query.clone()))
);
assert_eq!(discovery.queued_queries.back(), Some(&subnet_query));
// New query should replace old query
subnet_query.min_ttl = Some(now + Duration::from_secs(1));
@ -1158,7 +1124,7 @@ mod tests {
assert_eq!(discovery.queued_queries.len(), 1);
assert_eq!(
discovery.queued_queries.pop_back(),
Some(QueryType::Subnet(subnet_query.clone()))
Some(subnet_query.clone())
);
// Retries > MAX_DISCOVERY_RETRY must return immediately without adding
@ -1172,39 +1138,6 @@ mod tests {
assert_eq!(discovery.queued_queries.len(), 0);
}
#[tokio::test]
async fn test_process_queue() {
let mut discovery = build_discovery().await;
// FindPeers query is processed if there is no subnet query
discovery.queued_queries.push_back(QueryType::FindPeers);
assert!(discovery.process_queue());
let now = Instant::now();
let subnet_query = SubnetQuery {
subnet: Subnet::Attestation(SubnetId::new(1)),
min_ttl: Some(now + Duration::from_secs(10)),
retries: 0,
};
// Refresh active queries
discovery.active_queries = Default::default();
// SubnetQuery is processed if it's the only queued query
discovery
.queued_queries
.push_back(QueryType::Subnet(subnet_query.clone()));
assert!(discovery.process_queue());
// SubnetQuery is processed if it's there is also 1 queued discovery query
discovery.queued_queries.push_back(QueryType::FindPeers);
discovery
.queued_queries
.push_back(QueryType::Subnet(subnet_query));
// Process Subnet query and FindPeers afterwards.
assert!(discovery.process_queue());
}
fn make_enr(subnet_ids: Vec<usize>) -> Enr {
let mut builder = EnrBuilder::new("v4");
let keypair = libp2p::identity::Keypair::generate_secp256k1();
@ -1227,7 +1160,7 @@ mod tests {
let instant1 = Some(now + Duration::from_secs(10));
let instant2 = Some(now + Duration::from_secs(5));
let query = GroupedQueryType::Subnet(vec![
let query = QueryType::Subnet(vec![
SubnetQuery {
subnet: Subnet::Attestation(SubnetId::new(1)),
min_ttl: instant1,
@ -1248,7 +1181,10 @@ mod tests {
let enrs: Vec<Enr> = vec![enr1.clone(), enr2, enr3];
let results = discovery
.process_completed_queries(QueryResult(query, Ok(enrs)))
.process_completed_queries(QueryResult {
query_type: query,
result: Ok(enrs),
})
.unwrap();
// enr1 and enr2 are required peers based on the requested subnet ids