add retry logic to peer discovery and an expiration time for peers ()

* add retry logic to peer discovery and an expiration time for peers

* Restructure discovery

* Add mac build to CI

* Always return an error for Health when not linux

* Change macos workflow

* Rename macos tests

* Update DiscoverPeers messages to pass Instants. Implement PartialEq for AttServiceMessage

* update discover peer queueing to always check existing messages and extend min_ttl as necessary

* update method name and comment

* Correct merge issues

* Add subnet id check to partialeq, fix discover peer message dups

* fix discover peer message dups

* fix discover peer message dups for real this time

Co-authored-by: Age Manning <Age@AgeManning.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
realbigsean 2020-06-05 00:55:03 -04:00 committed by GitHub
parent 0e37a16927
commit 036096ef61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 442 additions and 141 deletions
Cargo.lock
beacon_node
eth2-libp2p/src
network
Cargo.toml
src
attestation_service
service.rs

7
Cargo.lock generated
View File

@ -178,6 +178,12 @@ dependencies = [
"syn",
]
[[package]]
name = "assert_approx_eq"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd"
[[package]]
name = "assert_matches"
version = "1.3.0"
@ -2936,6 +2942,7 @@ dependencies = [
name = "network"
version = "0.1.2"
dependencies = [
"assert_approx_eq",
"beacon_chain",
"environment",
"error-chain",

View File

@ -25,6 +25,7 @@ use std::{
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId};
@ -459,9 +460,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.update_metadata();
}
/// A request to search for peers connected to a long-lived subnet.
pub fn peers_request(&mut self, subnet_id: SubnetId) {
self.discovery.peers_request(subnet_id);
/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
/// would like to retain the peers for.
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
self.discovery.discover_subnet_peers(subnet_id, min_ttl)
}
/// Updates the local ENR's "eth2" field with the latest EnrForkId.

View File

@ -8,7 +8,7 @@ pub use enr_ext::{CombinedKeyExt, EnrExt};
use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use discv5::{enr::NodeId, Discv5, Discv5Event};
use discv5::{enr::NodeId, Discv5, Discv5Event, QueryId};
use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*;
use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId};
@ -18,20 +18,24 @@ use libp2p::swarm::{
NetworkBehaviourAction, PollParameters, ProtocolsHandler,
};
use lru::LruCache;
use slog::{crit, debug, info, warn};
use slog::{crit, debug, info, trace, warn};
use ssz::{Decode, Encode};
use ssz_types::BitVector;
use std::{
collections::{HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
net::SocketAddr,
path::Path,
sync::Arc,
task::{Context, Poll},
time::Duration,
time::{Duration, Instant},
};
use tokio::time::{delay_until, Delay, Instant};
use tokio::time::{delay_until, Delay};
use types::{EnrForkId, EthSpec, SubnetId};
mod subnet_predicate;
use subnet_predicate::subnet_predicate;
/// Maximum seconds before searching for extra peers.
const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120;
/// Initial delay between peer searches.
@ -41,7 +45,18 @@ const MINIMUM_PEERS_BEFORE_DELAY_INCREASE: usize = 5;
/// Local ENR storage filename.
pub const ENR_FILENAME: &str = "enr.dat";
/// Number of peers we'd like to have connected to a given long-lived subnet.
const TARGET_SUBNET_PEERS: u64 = 3;
const TARGET_SUBNET_PEERS: usize = 3;
/// Number of times to attempt a discovery request
const MAX_DISCOVERY_RETRY: u64 = 3;
/// A struct representing the information associated with a single discovery request,
/// which can be retried with multiple queries
#[derive(Clone, Debug)]
pub struct Request {
pub query_id: Option<QueryId>,
pub min_ttl: Option<Instant>,
pub retries: u64,
}
/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5
/// libp2p protocol.
@ -79,6 +94,9 @@ pub struct Discovery<TSpec: EthSpec> {
/// A collection of network constants that can be read from other threads.
network_globals: Arc<NetworkGlobals<TSpec>>,
/// A mapping of SubnetId that we are currently searching for to all information associated with each request.
subnet_queries: HashMap<SubnetId, Request>,
/// Logger for the discovery behaviour.
log: slog::Logger,
}
@ -139,11 +157,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
cached_enrs: LruCache::new(50),
banned_peers: HashSet::new(),
max_peers: config.max_peers,
peer_discovery_delay: delay_until(Instant::now()),
peer_discovery_delay: delay_until(tokio::time::Instant::now()),
past_discovery_delay: INITIAL_SEARCH_DELAY,
tcp_port: config.libp2p_port,
discovery,
network_globals,
subnet_queries: HashMap::new(),
log,
enr_dir,
})
@ -280,57 +299,93 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
/// A request to find peers on a given subnet.
// TODO: This logic should be improved with added sophistication in peer management
// This currently checks for currently connected peers and if we don't have
// PEERS_WANTED_BEFORE_DISCOVERY connected to a given subnet we search for more.
pub fn peers_request(&mut self, subnet_id: SubnetId) {
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
// TODO: Extend this to an event once discovery becomes a thread managed by the peer
// manager
if let Some(min_ttl) = min_ttl {
self.network_globals
.peers
.write()
.extend_peers_on_subnet(subnet_id, min_ttl);
}
// If there is already a discovery request in process for this subnet, ignore this request,
// but update the min_ttl.
if let Some(request) = self.subnet_queries.get_mut(&subnet_id) {
// update the min_ttl if required
if let Some(min_ttl) = min_ttl {
if request.min_ttl < Some(min_ttl) {
request.min_ttl = Some(min_ttl);
}
}
return;
}
// Insert a request and start a query for the subnet
self.subnet_queries.insert(
subnet_id.clone(),
Request {
query_id: None,
min_ttl,
retries: 0,
},
);
self.run_subnet_query(subnet_id);
}
/// Runs a discovery request for a given subnet_id if one already exists.
fn run_subnet_query(&mut self, subnet_id: SubnetId) {
let mut request = match self.subnet_queries.remove(&subnet_id) {
Some(v) => v,
None => return, // request doesn't exist
};
// increment the retry count
request.retries += 1;
let peers_on_subnet = self
.network_globals
.peers
.read()
.peers_on_subnet(subnet_id)
.count() as u64;
.count();
if peers_on_subnet < TARGET_SUBNET_PEERS {
let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet;
debug!(self.log, "Searching for peers for subnet";
"subnet_id" => *subnet_id,
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
"peers_to_find" => target_peers
);
let log_clone = self.log.clone();
let subnet_predicate = move |enr: &Enr| {
if let Some(bitfield_bytes) = enr.get(BITFIELD_ENR_KEY) {
let bitfield = match BitVector::<TSpec::SubnetBitfieldLength>::from_ssz_bytes(
bitfield_bytes,
) {
Ok(v) => v,
Err(e) => {
warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e));
return false;
}
};
return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| {
debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id()));
false
});
}
false
};
// start the query
self.start_query(subnet_predicate, target_peers as usize);
} else {
debug!(self.log, "Discovery ignored";
if peers_on_subnet > TARGET_SUBNET_PEERS {
trace!(self.log, "Discovery ignored";
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
);
return;
}
// remove the entry and complete the query if greater than the maximum search count
if request.retries >= MAX_DISCOVERY_RETRY {
debug!(
self.log,
"Subnet peer discovery did not find sufficient peers. Reached max retry limit"
);
return;
}
let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet;
debug!(self.log, "Searching for peers for subnet";
"subnet_id" => *subnet_id,
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
"peers_to_find" => target_peers,
"attempt" => request.retries,
);
// start the query, and update the queries map if necessary
let subnet_predicate = subnet_predicate::<TSpec>(subnet_id, &self.log);
if let Some(query_id) = self.start_query(subnet_predicate, target_peers) {
request.query_id = Some(query_id);
} else {
// ENR is not present remove the query
return;
}
self.subnet_queries.insert(subnet_id, request);
}
/* Internal Functions */
@ -348,7 +403,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// This can optionally search for peers for a given predicate. Regardless of the predicate
/// given, this will only search for peers on the same enr_fork_id as specified in the local
/// ENR.
fn start_query<F>(&mut self, enr_predicate: F, num_nodes: usize)
fn start_query<F>(&mut self, enr_predicate: F, num_nodes: usize) -> Option<QueryId>
where
F: Fn(&Enr) -> bool + Send + 'static + Clone,
{
@ -359,18 +414,54 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(v) => v,
Err(e) => {
crit!(self.log, "Local ENR has no fork id"; "error" => e);
return;
return None;
}
};
// predicate for finding nodes with a matching fork
let eth2_fork_predicate = move |enr: &Enr| {
enr.eth2().map(|enr| enr.fork_digest) == Ok(enr_fork_id.fork_digest.clone())
};
let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone());
let predicate = move |enr: &Enr| eth2_fork_predicate(enr) && enr_predicate(enr);
// general predicate
self.discovery
.find_enr_predicate(random_node, predicate, num_nodes);
Some(
self.discovery
.find_enr_predicate(random_node, predicate, num_nodes),
)
}
/// Peers that are found during discovery are optionally dialed.
// TODO: Shift to peer manager. As its own service, discovery should spit out discovered nodes
// and the peer manager should decide about who to connect to.
fn dial_discovered_peers(&mut self, peers: Vec<Enr>, min_ttl: Option<Instant>) {
for enr in peers {
// cache known peers
let peer_id = enr.peer_id();
self.cached_enrs.put(enr.peer_id(), enr);
// if we need more peers, attempt a connection
if self.network_globals.connected_or_dialing_peers() < self.max_peers
&& !self
.network_globals
.peers
.read()
.is_connected_or_dialing(&peer_id)
&& !self.banned_peers.contains(&peer_id)
{
debug!(self.log, "Connecting to discovered peer"; "peer_id"=> peer_id.to_string());
// TODO: Update output
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
if let Some(min_ttl) = min_ttl {
self.network_globals
.peers
.write()
.update_min_ttl(&peer_id, min_ttl);
}
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
});
}
}
}
}
@ -440,7 +531,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
// Set to maximum, and update to earlier, once we get our results back.
self.peer_discovery_delay.reset(
Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES),
tokio::time::Instant::now()
+ Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES),
);
}
Poll::Pending => break,
@ -477,7 +569,11 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
address,
});
}
Discv5Event::FindNodeResult { closer_peers, .. } => {
Discv5Event::FindNodeResult {
closer_peers,
query_id,
..
} => {
debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len());
// update the time to the next query
if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES
@ -486,40 +582,30 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
{
self.past_discovery_delay *= 2;
}
let delay = std::cmp::min(
let delay = std::cmp::max(
self.past_discovery_delay,
MAX_TIME_BETWEEN_PEER_SEARCHES,
);
self.peer_discovery_delay
.reset(Instant::now() + Duration::from_secs(delay));
.reset(tokio::time::Instant::now() + Duration::from_secs(delay));
for enr in closer_peers {
// cache known peers
let peer_id = enr.peer_id();
self.cached_enrs.put(enr.peer_id(), enr);
// if we need more peers, attempt a connection
if self.network_globals.connected_or_dialing_peers()
< self.max_peers
&& !self
.network_globals
.peers
.read()
.is_connected_or_dialing(&peer_id)
&& !self.banned_peers.contains(&peer_id)
{
// TODO: Debugging only
// NOTE: The peer manager will get updated by the global swarm.
let connection_status = self
.network_globals
.peers
.read()
.connection_status(&peer_id);
debug!(self.log, "Connecting to discovered peer"; "peer_id"=> peer_id.to_string(), "status" => format!("{:?}", connection_status));
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
});
// if this is a subnet query, run it to completion
if let Some((subnet_id, min_ttl)) = self
.subnet_queries
.iter()
.find(|(_, request)| request.query_id == Some(query_id))
.map(|(subnet_id, request)| {
(subnet_id.clone(), request.min_ttl.clone())
})
{
debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => closer_peers.len(), "subnet_id" => *subnet_id);
self.dial_discovered_peers(closer_peers, min_ttl);
self.run_subnet_query(subnet_id);
} else {
if closer_peers.is_empty() {
debug!(self.log, "Peer Discovery request yielded no results.");
} else {
self.dial_discovered_peers(closer_peers, None);
}
}
}

View File

@ -0,0 +1,33 @@
///! The subnet predicate used for searching for a particular subnet.
use super::*;
/// Returns the predicate for a given subnet.
pub fn subnet_predicate<TSpec>(
subnet_id: SubnetId,
log: &slog::Logger,
) -> impl Fn(&Enr) -> bool + Send + 'static + Clone
where
TSpec: EthSpec,
{
let log_clone = log.clone();
move |enr: &Enr| {
if let Some(bitfield_bytes) = enr.get(BITFIELD_ENR_KEY) {
let bitfield = match BitVector::<TSpec::SubnetBitfieldLength>::from_ssz_bytes(
bitfield_bytes,
) {
Ok(v) => v,
Err(e) => {
warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e));
return false;
}
};
return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| {
debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id()));
false
});
}
false
}
}

View File

@ -31,6 +31,10 @@ pub struct PeerInfo<T: EthSpec> {
/// The ENR subnet bitfield of the peer. This may be determined after it's initial
/// connection.
pub meta_data: Option<MetaData<T>>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
pub min_ttl: Option<Instant>,
}
impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
@ -43,6 +47,7 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
listening_addresses: vec![],
sync_status: PeerSyncStatus::Unknown,
meta_data: None,
min_ttl: None,
}
}
}

View File

@ -2,7 +2,7 @@ use super::peer_info::{PeerConnectionStatus, PeerInfo};
use super::peer_sync_status::PeerSyncStatus;
use crate::rpc::methods::MetaData;
use crate::PeerId;
use slog::{crit, debug, warn};
use slog::{crit, debug, trace, warn};
use std::collections::{hash_map::Entry, HashMap};
use std::time::Instant;
use types::{EthSpec, SubnetId};
@ -236,6 +236,42 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
debug!(self.log, "Peer dialing in db"; "peer_id" => peer_id.to_string(), "n_dc" => self.n_dc);
}
/// Update min ttl of a peer.
pub fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) {
let info = self.peers.entry(peer_id.clone()).or_default();
// only update if the ttl is longer
if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl {
info.min_ttl = Some(min_ttl);
let min_ttl_secs = min_ttl
.checked_duration_since(Instant::now())
.map(|duration| duration.as_secs())
.unwrap_or_else(|| 0);
debug!(self.log, "Updating the time a peer is required for"; "peer_id" => peer_id.to_string(), "future_min_ttl_secs" => min_ttl_secs);
}
}
/// Extends the ttl of all peers on the given subnet that have a shorter
/// min_ttl than what's given.
pub fn extend_peers_on_subnet(&mut self, subnet_id: SubnetId, min_ttl: Instant) {
let log = &self.log;
self.peers.iter_mut()
.filter(move |(_, info)| {
info.connection_status.is_connected() && info.on_subnet(subnet_id)
})
.for_each(|(peer_id,info)| {
if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl {
info.min_ttl = Some(min_ttl);
}
let min_ttl_secs = min_ttl
.checked_duration_since(Instant::now())
.map(|duration| duration.as_secs())
.unwrap_or_else(|| 0);
trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => peer_id.to_string(), "min_ttl" => min_ttl_secs);
});
}
/// Sets a peer as connected with an ingoing connection.
pub fn connect_ingoing(&mut self, peer_id: &PeerId) {
let info = self.peers.entry(peer_id.clone()).or_default();

View File

@ -11,6 +11,7 @@ lazy_static = "1.4.0"
matches = "0.1.8"
tempfile = "3.1.0"
exit-future = "0.2.0"
assert_approx_eq = "1.1.0"
[dependencies]
beacon_chain = { path = "../beacon_chain" }
@ -35,4 +36,4 @@ fnv = "1.0.6"
rlp = "0.4.5"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
environment = { path = "../../lighthouse/environment" }
environment = { path = "../../lighthouse/environment" }

View File

@ -28,15 +28,19 @@ const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 1;
const TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 6;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
/// gossip topics that we subscribed to due to the validator connection.
const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150; // 30 mins at a 12s slot time
const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
// 30 mins at a 12s slot time
/// The fraction of a slot that we subscribe to a subnet before the required slot.
///
/// Note: The time is calculated as `time = milliseconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`.
const ADVANCE_SUBSCRIBE_TIME: u32 = 3;
/// The default number of slots before items in hash delay sets used by this class should expire.
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; // 36s at 12s slot time
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3;
// 36s at 12s slot time
/// The default number of slots before items in hash delay sets used by this class should expire.
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Eq, Clone)]
pub enum AttServiceMessage {
/// Subscribe to the specified subnet id.
Subscribe(SubnetId),
@ -47,12 +51,45 @@ pub enum AttServiceMessage {
/// Remove the `SubnetId` from the ENR bitfield.
EnrRemove(SubnetId),
/// Discover peers for a particular subnet.
DiscoverPeers(SubnetId),
/// The includes the `Instant` we need the discovered peer until.
DiscoverPeers {
subnet_id: SubnetId,
min_ttl: Option<Instant>,
},
}
impl PartialEq for AttServiceMessage {
fn eq(&self, other: &AttServiceMessage) -> bool {
match (self, other) {
(&AttServiceMessage::Subscribe(a), &AttServiceMessage::Subscribe(b)) => a == b,
(&AttServiceMessage::Unsubscribe(a), &AttServiceMessage::Unsubscribe(b)) => a == b,
(&AttServiceMessage::EnrAdd(a), &AttServiceMessage::EnrAdd(b)) => a == b,
(&AttServiceMessage::EnrRemove(a), &AttServiceMessage::EnrRemove(b)) => a == b,
(
&AttServiceMessage::DiscoverPeers { subnet_id, min_ttl },
&AttServiceMessage::DiscoverPeers {
subnet_id: other_subnet_id,
min_ttl: other_min_ttl,
},
) => match (min_ttl, other_min_ttl) {
(Some(min_ttl_instant), Some(other_min_ttl_instant)) => {
min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
< DURATION_DIFFERENCE
&& other_min_ttl_instant.saturating_duration_since(min_ttl_instant)
< DURATION_DIFFERENCE
&& subnet_id == other_subnet_id
}
(None, None) => subnet_id == other_subnet_id,
_ => false,
},
_ => false,
}
}
}
/// A particular subnet at a given slot.
#[derive(PartialEq, Eq, Hash, Clone)]
struct ExactSubnet {
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct ExactSubnet {
/// The `SubnetId` associated with this subnet.
pub subnet_id: SubnetId,
/// The `Slot` associated with this subnet.
@ -244,24 +281,18 @@ impl<T: BeaconChainTypes> AttestationService<T> {
return Ok(());
}
// check current event log to see if there is a discovery event queued
if self
.events
.iter()
.find(|event| event == &&AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id))
.is_some()
{
// already queued a discovery event
return Ok(());
}
// if the slot is more than epoch away, add an event to start looking for peers
if exact_subnet.slot
< current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
{
// then instantly add a discovery request
self.events
.push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id));
// add one slot to ensure we keep the peer for the subscription slot
let min_ttl = self
.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot + 1)
.map(|duration| std::time::Instant::now() + duration);
self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl);
} else {
// Queue the discovery event to be executed for
// TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD
@ -296,6 +327,52 @@ impl<T: BeaconChainTypes> AttestationService<T> {
Ok(())
}
/// Checks if we have a discover peers event already and sends a new event if necessary
///
/// If a message exists for the same subnet, compare the `min_ttl` of the current and
/// existing messages and extend the existing message as necessary.
fn send_or_update_discovery_event(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
// track whether this message already exists in the event queue
let mut is_duplicate = false;
self.events.iter_mut().for_each(|event| {
match event {
AttServiceMessage::DiscoverPeers {
subnet_id: other_subnet_id,
min_ttl: other_min_ttl,
} => {
if subnet_id == *other_subnet_id {
let other_min_ttl_clone = other_min_ttl.clone();
match (min_ttl, other_min_ttl_clone) {
(Some(min_ttl_instant), Some(other_min_ttl_instant)) =>
// only update the min_ttl if it is greater than the existing min_ttl and a DURATION_DIFFERENCE padding
{
if min_ttl_instant.saturating_duration_since(other_min_ttl_instant)
> DURATION_DIFFERENCE
{
*other_min_ttl = min_ttl;
}
}
(None, Some(_)) => {
// Update the min_ttl to None, because the new message is longer-lived.
*other_min_ttl = None;
}
(Some(_), None) => {} // Don't replace this because the existing message is for a longer-lived peer.
(None, None) => {} // Duplicate message, do nothing.
}
is_duplicate = true;
return;
}
}
_ => {}
};
});
if !is_duplicate {
self.events
.push_back(AttServiceMessage::DiscoverPeers { subnet_id, min_ttl });
}
}
/// Checks the current random subnets and subscriptions to determine if a new subscription for this
/// subnet is required for the given slot.
///
@ -436,18 +513,17 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// if we are not already subscribed, then subscribe
let topic_kind = &GossipKind::CommitteeIndex(subnet_id);
if let None = self
let already_subscribed = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.find(|topic| topic.kind() == topic_kind)
{
// not already subscribed to the topic
.is_some();
if !already_subscribed {
// send a discovery request and a subscription
self.events
.push_back(AttServiceMessage::DiscoverPeers(subnet_id));
self.send_or_update_discovery_event(subnet_id, None);
self.events
.push_back(AttServiceMessage::Subscribe(subnet_id));
}
@ -461,8 +537,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// Request a discovery query to find peers for a particular subnet.
fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) {
debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot);
self.events
.push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id));
// add one slot to ensure we keep the peer for the subscription slot
let min_ttl = self
.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot + 1)
.map(|duration| std::time::Instant::now() + duration);
self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl)
}
/// A queued subscription is ready.
@ -619,7 +702,7 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
match self.discover_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e));
error!(self.log, "Failed to check for peer discovery requests"; "error"=> format ! ("{}", e));
}
Poll::Ready(None) | Poll::Pending => {}
}

View File

@ -16,10 +16,9 @@ mod tests {
use slog::Logger;
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use store::MemoryStore;
use tempfile::tempdir;
use tokio::time::Duration;
use types::{CommitteeIndex, EnrForkId, EthSpec, MinimalEthSpec};
const SLOT_DURATION_MILLIS: u64 = 200;
@ -192,7 +191,10 @@ mod tests {
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_any2),
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any1),
AttServiceMessage::EnrAdd(_any3)
]
@ -240,7 +242,10 @@ mod tests {
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_any2),
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any1),
AttServiceMessage::EnrAdd(_any3)
]
@ -278,16 +283,28 @@ mod tests {
.validator_subscriptions(subscriptions)
.unwrap();
let min_ttl = Instant::now().checked_add(
attestation_service
.beacon_chain
.slot_clock
.duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1))
.unwrap(),
);
// just discover peers, don't subscribe yet
let expected = vec![AttServiceMessage::DiscoverPeers(SubnetId::new(
validator_index,
))];
let expected = vec![AttServiceMessage::DiscoverPeers {
subnet_id: SubnetId::new(validator_index),
min_ttl,
}];
let events = get_events(attestation_service, no_events_expected, 1).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_any1),
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
@ -325,9 +342,20 @@ mod tests {
.validator_subscriptions(subscriptions)
.unwrap();
let min_ttl = Instant::now().checked_add(
attestation_service
.beacon_chain
.slot_clock
.duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1))
.unwrap(),
);
// we should discover peers, wait, then subscribe
let expected = vec![
AttServiceMessage::DiscoverPeers(SubnetId::new(validator_index)),
AttServiceMessage::DiscoverPeers {
subnet_id: SubnetId::new(validator_index),
min_ttl,
},
AttServiceMessage::Subscribe(SubnetId::new(validator_index)),
];
@ -335,7 +363,10 @@ mod tests {
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_any1),
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
@ -381,7 +412,10 @@ mod tests {
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_any1),
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
@ -419,17 +453,29 @@ mod tests {
.validator_subscriptions(subscriptions)
.unwrap();
let min_ttl = Instant::now().checked_add(
attestation_service
.beacon_chain
.slot_clock
.duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1))
.unwrap(),
);
// expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range
let expected: Vec<AttServiceMessage> = vec![AttServiceMessage::DiscoverPeers(
SubnetId::new(validator_index),
)];
let expected: Vec<AttServiceMessage> = vec![AttServiceMessage::DiscoverPeers {
subnet_id: SubnetId::new(validator_index),
min_ttl,
}];
let events = get_events(attestation_service, no_events_expected, 5).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_any1),
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant
},
AttServiceMessage::Subscribe(_any2),
AttServiceMessage::EnrAdd(_any3)
]
@ -470,9 +516,10 @@ mod tests {
for event in events {
match event {
AttServiceMessage::DiscoverPeers(_any_subnet) => {
discover_peer_count = discover_peer_count + 1
}
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant,
} => discover_peer_count = discover_peer_count + 1,
AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1,
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1,
_ => unexpected_msg_count = unexpected_msg_count + 1,
@ -517,9 +564,10 @@ mod tests {
for event in events {
match event {
AttServiceMessage::DiscoverPeers(_any_subnet) => {
discover_peer_count = discover_peer_count + 1
}
AttServiceMessage::DiscoverPeers {
subnet_id: _any_subnet,
min_ttl: _any_instant,
} => discover_peer_count = discover_peer_count + 1,
AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1,
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1,
_ => unexpected_msg_count = unexpected_msg_count + 1,

View File

@ -258,8 +258,8 @@ fn spawn_service<T: BeaconChainTypes>(
AttServiceMessage::EnrRemove(subnet_id) => {
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
}
AttServiceMessage::DiscoverPeers(subnet_id) => {
service.libp2p.swarm.peers_request(subnet_id);
AttServiceMessage::DiscoverPeers{subnet_id, min_ttl} => {
service.libp2p.swarm.discover_subnet_peers(subnet_id, min_ttl);
}
}
}