From 08a31c5a1ae11826c9a96d63d9849668b6d33da6 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 8 Aug 2020 06:08:44 +0000 Subject: [PATCH] Disconnect peers (#1484) ## Issue Addressed Peers that connected after the peer limit may remain connected in some circumstances. This ensures peers not in the peer manager's list get disconnected. Further logging is also added to track this behaviour. --- .../src/behaviour/handler/delegate.rs | 2 +- .../eth2_libp2p/src/behaviour/handler/mod.rs | 11 +++--- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 35 ++++++++++--------- beacon_node/eth2_libp2p/src/rpc/handler.rs | 23 +++++++----- 4 files changed, 37 insertions(+), 34 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs index 03a2a08a7..ae1bcd05c 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs @@ -49,7 +49,7 @@ impl DelegatingHandler { } /// Gives access to identify's handler. - pub fn identify(&self) -> &IdentifyHandler { + pub fn _identify(&self) -> &IdentifyHandler { &self.identify_handler } } diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index f74040d91..761695041 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -77,7 +77,7 @@ impl ProtocolsHandler for BehaviourHandler { fn inject_event(&mut self, event: Self::InEvent) { match event { BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev), - /* Events comming from the behaviour */ + /* Events coming from the behaviour */ BehaviourHandlerIn::Shutdown(last_message) => { self.shutting_down = true; self.delegate.rpc_mut().shutdown(last_message); @@ -113,12 +113,9 @@ impl ProtocolsHandler for BehaviourHandler { >, > { // Disconnect if the sub-handlers are ready. - if self.shutting_down { - let rpc_keep_alive = self.delegate.rpc().connection_keep_alive(); - let identify_keep_alive = self.delegate.identify().connection_keep_alive(); - if KeepAlive::No == rpc_keep_alive.max(identify_keep_alive) { - return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected)); - } + // Currently we only respect the RPC handler. + if self.shutting_down && KeepAlive::No == self.delegate.rpc().connection_keep_alive() { + return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected)); } match self.delegate.poll(cx) { diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index adf391e0c..8d8df58f9 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -694,15 +694,28 @@ impl NetworkBehaviour for Behaviour { conn_id: &ConnectionId, endpoint: &ConnectedPoint, ) { + // If the peer manager (and therefore the behaviour's) believe this peer connected, inform + // about the disconnection. + if self.network_globals.peers.read().is_connected(&peer_id) { + return; + } delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); } // This gets called once there are no more active connections. fn inject_disconnected(&mut self, peer_id: &PeerId) { + // If the application/behaviour layers thinks this peer has connected inform it of the disconnect. + if self.network_globals.peers.read().is_connected(&peer_id) { + // Inform the application. + self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); + // Inform the behaviour. + delegate_to_behaviours!(self, inject_disconnected, peer_id); + } // Inform the peer manager. + // NOTE: It may be the case that a rejected node, due to too many peers is disconnected + // here and the peer manager has no knowledge of its connection. We insert it here for + // reference so that peer manager can track this peer. self.peer_manager.notify_disconnect(&peer_id); - // Inform the application. - self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); // Update the prometheus metrics metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); @@ -710,9 +723,6 @@ impl NetworkBehaviour for Behaviour { &metrics::PEERS_CONNECTED, self.network_globals.connected_peers() as i64, ); - - // Inform the behaviour. - delegate_to_behaviours!(self, inject_disconnected, peer_id); } // This gets called every time a connection is established. @@ -741,6 +751,7 @@ impl NetworkBehaviour for Behaviour { }; if goodbye_reason.is_some() { + debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.as_ref().expect("Is some").to_string()); self.peers_to_dc .push_back((peer_id.clone(), goodbye_reason)); return; @@ -771,18 +782,8 @@ impl NetworkBehaviour for Behaviour { // This gets called on the initial connection establishment. fn inject_connected(&mut self, peer_id: &PeerId) { - // Drop any connection from a banned peer. The goodbye and disconnects are handled in - // `inject_connection_established()`, which gets called first. - // The same holds if we reached the peer limit and the connected peer has no future duty. - if self.peer_manager.is_banned(peer_id) - || (self.peer_manager.peer_limit_reached() - && self - .network_globals - .peers - .read() - .peer_info(peer_id) - .map_or(true, |i| !i.has_future_duty())) - { + // If the PeerManager has connected this peer, inform the behaviours + if !self.network_globals.peers.read().is_connected(&peer_id) { return; } diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 4ea412a50..9492942d7 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -348,15 +348,20 @@ where // Check that we don't have outbound items pending for dialing, nor dialing, nor // established. Also check that there are no established inbound substreams. // Errors and events need to be reported back, so check those too. - let should_shutdown = if let HandlerState::ShuttingDown(_) = self.state { - self.dial_queue.is_empty() - && self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - && self.pending_errors.is_empty() - && self.events_out.is_empty() - && self.dial_negotiated == 0 - } else { - false + let should_shutdown = match self.state { + HandlerState::ShuttingDown(_) => { + self.dial_queue.is_empty() + && self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + && self.pending_errors.is_empty() + && self.events_out.is_empty() + && self.dial_negotiated == 0 + } + HandlerState::Deactivated => { + // Regardless of events, the timeout has expired. Force the disconnect. + true + } + _ => false, }; match self.keep_alive {