Deactivate RPC Connection Handler after goodbye message is sent (#5250)

* Deactivate RPC Connection Handler

after goodbye message is sent

* nit: use to_string instead of format

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into rpc-shutdown-improvement

* clippy

* Fix cargo.lock

* Merge latest unstable
This commit is contained in:
João Oliveira 2024-02-19 07:16:01 +00:00 committed by GitHub
parent 4d625951b8
commit a229b52723
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 216 additions and 211 deletions

378
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -24,7 +24,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::time::{sleep_until, Instant as TInstant, Sleep}; use tokio::time::{sleep, Sleep};
use tokio_util::time::{delay_queue, DelayQueue}; use tokio_util::time::{delay_queue, DelayQueue};
use types::{EthSpec, ForkContext}; use types::{EthSpec, ForkContext};
@ -32,7 +32,7 @@ use types::{EthSpec, ForkContext};
const IO_ERROR_RETRIES: u8 = 3; const IO_ERROR_RETRIES: u8 = 3;
/// Maximum time given to the handler to perform shutdown operations. /// Maximum time given to the handler to perform shutdown operations.
const SHUTDOWN_TIMEOUT_SECS: u8 = 15; const SHUTDOWN_TIMEOUT_SECS: u64 = 15;
/// Maximum number of simultaneous inbound substreams we keep for this peer. /// Maximum number of simultaneous inbound substreams we keep for this peer.
const MAX_INBOUND_SUBSTREAMS: usize = 32; const MAX_INBOUND_SUBSTREAMS: usize = 32;
@ -266,9 +266,9 @@ where
self.dial_queue.push((id, OutboundRequest::Goodbye(reason))); self.dial_queue.push((id, OutboundRequest::Goodbye(reason)));
} }
self.state = HandlerState::ShuttingDown(Box::pin(sleep_until( self.state = HandlerState::ShuttingDown(Box::pin(sleep(Duration::from_secs(
TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64), SHUTDOWN_TIMEOUT_SECS,
))); ))));
} }
} }
@ -349,23 +349,7 @@ where
} }
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> bool {
// Check that we don't have outbound items pending for dialing, nor dialing, nor !matches!(self.state, HandlerState::Deactivated)
// established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too.
match self.state {
HandlerState::ShuttingDown(_) => {
!self.dial_queue.is_empty()
|| !self.outbound_substreams.is_empty()
|| !self.inbound_substreams.is_empty()
|| !self.events_out.is_empty()
|| !self.dial_negotiated != 0
}
HandlerState::Deactivated => {
// Regardless of events, the timeout has expired. Force the disconnect.
false
}
_ => true,
}
} }
fn poll( fn poll(
@ -395,7 +379,7 @@ where
match delay.as_mut().poll(cx) { match delay.as_mut().poll(cx) {
Poll::Ready(_) => { Poll::Ready(_) => {
self.state = HandlerState::Deactivated; self.state = HandlerState::Deactivated;
debug!(self.log, "Handler deactivated"); debug!(self.log, "Shutdown timeout elapsed, Handler deactivated");
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::Disconnected), HandlerEvent::Close(RPCError::Disconnected),
)); ));
@ -844,6 +828,8 @@ where
&& self.events_out.is_empty() && self.events_out.is_empty()
&& self.dial_negotiated == 0 && self.dial_negotiated == 0
{ {
debug!(self.log, "Goodbye sent, Handler deactivated");
self.state = HandlerState::Deactivated;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::Disconnected), HandlerEvent::Close(RPCError::Disconnected),
)); ));

View File

@ -223,7 +223,7 @@ where
fn handle_established_inbound_connection( fn handle_established_inbound_connection(
&mut self, &mut self,
_connection_id: ConnectionId, connection_id: ConnectionId,
peer_id: PeerId, peer_id: PeerId,
_local_addr: &libp2p::Multiaddr, _local_addr: &libp2p::Multiaddr,
_remote_addr: &libp2p::Multiaddr, _remote_addr: &libp2p::Multiaddr,
@ -238,9 +238,9 @@ where
}, },
(), (),
); );
// NOTE: this is needed because PeerIds have interior mutability. let log = self
let peer_repr = peer_id.to_string(); .log
let log = self.log.new(slog::o!("peer_id" => peer_repr)); .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
let handler = RPCHandler::new( let handler = RPCHandler::new(
protocol, protocol,
self.fork_context.clone(), self.fork_context.clone(),
@ -253,7 +253,7 @@ where
fn handle_established_outbound_connection( fn handle_established_outbound_connection(
&mut self, &mut self,
_connection_id: ConnectionId, connection_id: ConnectionId,
peer_id: PeerId, peer_id: PeerId,
_addr: &libp2p::Multiaddr, _addr: &libp2p::Multiaddr,
_role_override: libp2p::core::Endpoint, _role_override: libp2p::core::Endpoint,
@ -269,9 +269,10 @@ where
(), (),
); );
// NOTE: this is needed because PeerIds have interior mutability. let log = self
let peer_repr = peer_id.to_string(); .log
let log = self.log.new(slog::o!("peer_id" => peer_repr)); .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
let handler = RPCHandler::new( let handler = RPCHandler::new(
protocol, protocol,
self.fork_context.clone(), self.fork_context.clone(),