Dial cached enr's before making subnet discovery query (#1376)

## Issue Addressed

Closes #1365 

## Proposed Changes

Dial peers in the `cached_enrs` who aren't connected, aren't banned and satisfy the subnet predicate before making a subnet discovery query.
This commit is contained in:
Pawan Dhananjay 2020-09-11 00:52:27 +00:00
parent d79366c503
commit 0525876882
2 changed files with 70 additions and 14 deletions

View File

@ -31,12 +31,12 @@ use tokio::sync::mpsc;
use types::{EnrForkId, EthSpec, SubnetId}; use types::{EnrForkId, EthSpec, SubnetId};
mod subnet_predicate; mod subnet_predicate;
use subnet_predicate::subnet_predicate; pub use subnet_predicate::subnet_predicate;
/// Local ENR storage filename. /// Local ENR storage filename.
pub const ENR_FILENAME: &str = "enr.dat"; pub const ENR_FILENAME: &str = "enr.dat";
/// Target number of peers we'd like to have connected to a given long-lived subnet. /// Target number of peers we'd like to have connected to a given long-lived subnet.
const TARGET_SUBNET_PEERS: usize = 3; pub const TARGET_SUBNET_PEERS: usize = 3;
/// Target number of peers to search for given a grouped subnet query. /// Target number of peers to search for given a grouped subnet query.
const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6; const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6;
/// Number of times to attempt a discovery request. /// Number of times to attempt a discovery request.
@ -287,6 +287,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.discv5.local_enr() self.discv5.local_enr()
} }
/// Return the cached enrs.
pub fn cached_enrs(&self) -> impl Iterator<Item = (&PeerId, &Enr)> {
self.cached_enrs.iter()
}
/// This adds a new `FindPeers` query to the queue if one doesn't already exist. /// This adds a new `FindPeers` query to the queue if one doesn't already exist.
pub fn discover_peers(&mut self) { pub fn discover_peers(&mut self) {
// If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one.
@ -558,7 +563,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
.peers_on_subnet(subnet_query.subnet_id) .peers_on_subnet(subnet_query.subnet_id)
.count(); .count();
if peers_on_subnet > TARGET_SUBNET_PEERS { if peers_on_subnet >= TARGET_SUBNET_PEERS {
debug!(self.log, "Discovery ignored"; debug!(self.log, "Discovery ignored";
"reason" => "Already connected to desired peers", "reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet, "connected_peers_on_subnet" => peers_on_subnet,

View File

@ -1,7 +1,7 @@
//! Implementation of a Lighthouse's peer management system. //! Implementation of a Lighthouse's peer management system.
pub use self::peerdb::*; pub use self::peerdb::*;
use crate::discovery::{Discovery, DiscoveryEvent}; use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::{error, metrics}; use crate::{error, metrics};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery}; use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
@ -19,7 +19,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use types::EthSpec; use types::{EthSpec, SubnetId};
pub use libp2p::core::{identity::Keypair, Multiaddr}; pub use libp2p::core::{identity::Keypair, Multiaddr};
@ -214,18 +214,45 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// A request to find peers on a given subnet. /// A request to find peers on a given subnet.
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) { pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
// Extend the time to maintain peers if required. let filtered: Vec<SubnetDiscovery> = subnets_to_discover
for s in subnets_to_discover.iter() { .into_iter()
if let Some(min_ttl) = s.min_ttl { .filter(|s| {
self.network_globals // Extend min_ttl of connected peers on required subnets
if let Some(min_ttl) = s.min_ttl {
self.network_globals
.peers
.write()
.extend_peers_on_subnet(s.subnet_id, min_ttl);
}
// Already have target number of peers, no need for subnet discovery
let peers_on_subnet = self
.network_globals
.peers .peers
.write() .read()
.extend_peers_on_subnet(s.subnet_id, min_ttl); .peers_on_subnet(s.subnet_id)
} .count();
} if peers_on_subnet >= TARGET_SUBNET_PEERS {
debug!(
self.log,
"Discovery query ignored";
"subnet_id" => format!("{:?}",s.subnet_id),
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
);
false
// Queue an outgoing connection request to the cached peers that are on `s.subnet_id`.
// If we connect to the cached peers before the discovery query starts, then we potentially
// save a costly discovery query.
} else {
self.dial_cached_enrs_in_subnet(s.subnet_id);
true
}
})
.collect();
// request the subnet query from discovery // request the subnet query from discovery
self.discovery.discover_subnet_peers(subnets_to_discover); self.discovery.discover_subnet_peers(filtered);
} }
/// A STATUS message has been received from a peer. This resets the status timer. /// A STATUS message has been received from a peer. This resets the status timer.
@ -531,6 +558,30 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::SocketUpdated(multiaddr)); self.events.push(PeerManagerEvent::SocketUpdated(multiaddr));
} }
/// Dial cached enrs in discovery service that are in the given `subnet_id` and aren't
/// in Connected, Dialing or Banned state.
fn dial_cached_enrs_in_subnet(&mut self, subnet_id: SubnetId) {
let predicate = subnet_predicate::<TSpec>(vec![subnet_id], &self.log);
let peers_to_dial: Vec<PeerId> = self
.discovery()
.cached_enrs()
.filter_map(|(peer_id, enr)| {
let peers = self.network_globals.peers.read();
if predicate(enr)
&& !peers.is_connected_or_dialing(peer_id)
&& !peers.is_banned(peer_id)
{
Some(peer_id.clone())
} else {
None
}
})
.collect();
for peer in &peers_to_dial {
self.dial_peer(peer);
}
}
/// Peers that have been returned by discovery requests are dialed here if they are suitable. /// Peers that have been returned by discovery requests are dialed here if they are suitable.
/// ///
/// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated /// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated