From 1b497e2e24269c73e8db5f2c3ccd3cd5b78efe93 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 15 Sep 2019 09:32:27 -0400 Subject: [PATCH] Gracefully shutdown the websocket server --- beacon_node/client/src/lib.rs | 19 +++++++--- beacon_node/websocket_server/src/lib.rs | 48 ++++++++++++++++++++----- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index b4c7c9347..73b0e5aed 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -64,6 +64,8 @@ where pub slot_timer_exit_signal: Option, /// Signal to terminate the API pub api_exit_signal: Option, + /// Signal to terminate the websocket server + pub websocket_exit_signal: Option, /// The clients logger. log: slog::Logger, /* @@ -182,11 +184,17 @@ where InteropEth1ChainBackend::new(String::new()).map_err(|e| format!("{:?}", e))?; // Start the websocket server. - let websocket_sender: WebSocketSender = if client_config.websocket_server.enabled { - websocket_server::start_server(&client_config.websocket_server, &log)? - } else { - WebSocketSender::dummy() - }; + let (websocket_sender, websocket_exit_signal): (WebSocketSender, Option<_>) = + if client_config.websocket_server.enabled { + let (sender, exit) = websocket_server::start_server( + &client_config.websocket_server, + executor, + &log, + )?; + (sender, Some(exit)) + } else { + (WebSocketSender::dummy(), None) + }; let beacon_chain: Arc>> = Arc::new( beacon_chain_builder @@ -278,6 +286,7 @@ where rpc_exit_signal, slot_timer_exit_signal: Some(slot_timer_exit_signal), api_exit_signal, + websocket_exit_signal, log, network, }) diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index ad9cabf4a..c161224c7 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -1,7 +1,9 @@ use beacon_chain::events::{EventHandler, EventKind}; -use slog::{error, info, Logger}; +use futures::Future; +use slog::{debug, error, info, warn, Logger}; use std::marker::PhantomData; use std::thread; +use tokio::runtime::TaskExecutor; use types::EthSpec; use ws::{Sender, WebSocket}; @@ -45,8 +47,9 @@ impl EventHandler for WebSocketSender { pub fn start_server( config: &Config, + executor: &TaskExecutor, log: &Logger, -) -> Result, String> { +) -> Result<(WebSocketSender, exit_future::Signal), String> { let server_string = format!("{}:{}", config.listen_address, config.port); info!( @@ -61,12 +64,38 @@ pub fn start_server( let broadcaster = server.broadcaster(); + // Produce a signal/channel that can gracefully shutdown the websocket server. + let exit_signal = { + let (exit_signal, exit) = exit_future::signal(); + + let log_inner = log.clone(); + let broadcaster_inner = server.broadcaster(); + let exit_future = exit.and_then(move |_| { + if let Err(e) = broadcaster_inner.shutdown() { + warn!( + log_inner, + "Websocket server errored on shutdown"; + "error" => format!("{:?}", e) + ); + } else { + info!(log_inner, "Websocket server shutdown"); + } + Ok(()) + }); + + // Place a future on the executor that will shutdown the websocket server when the + // application exits. + executor.spawn(exit_future); + + exit_signal + }; + let log_inner = log.clone(); let _handle = thread::spawn(move || match server.listen(server_string) { Ok(_) => { - info!( + debug!( log_inner, - "Websocket server stopped"; + "Websocket server thread stopped"; ); } Err(e) => { @@ -78,8 +107,11 @@ pub fn start_server( } }); - Ok(WebSocketSender { - sender: Some(broadcaster), - _phantom: PhantomData, - }) + Ok(( + WebSocketSender { + sender: Some(broadcaster), + _phantom: PhantomData, + }, + exit_signal, + )) }