net: Stop processing websocket events on H2 GOAWAY
This commit is contained in:
parent
29adeae754
commit
bbc16886ca
@ -58,6 +58,7 @@ pub struct Http2Client<B> {
|
||||
authority: http::uri::Authority,
|
||||
path_prefix: Option<http::uri::PathAndQuery>,
|
||||
default_per_request_headers: Arc<http::HeaderMap>,
|
||||
h2_connection_cancellation_token: tokio_util::sync::CancellationToken,
|
||||
}
|
||||
|
||||
impl<B: hyper::body::Body + 'static> Http2Client<B> {
|
||||
@ -116,6 +117,16 @@ impl<B: hyper::body::Body + 'static> Http2Client<B> {
|
||||
pub fn disconnect_all(self) {
|
||||
self.cancellation_token.cancel();
|
||||
}
|
||||
|
||||
/// Awaits a graceful shutdown of the H2 connection.
|
||||
///
|
||||
/// This is not a full disconnection, but no new requests will be accepted after this future
|
||||
/// completes.
|
||||
pub fn wait_for_h2_shutdown(&self) -> impl Future<Output = ()> + Send + 'static {
|
||||
self.h2_connection_cancellation_token
|
||||
.clone()
|
||||
.cancelled_owned()
|
||||
}
|
||||
}
|
||||
|
||||
/// Copy all headers from `default_headers` into `headers` *except* for those already present.
|
||||
@ -390,7 +401,11 @@ where
|
||||
// or if all clients are dropped.
|
||||
let log_tag = log_tag.to_owned();
|
||||
let ip_version = info.ip_version();
|
||||
tokio::spawn(async move {
|
||||
let h2_connection_cancellation_token = tokio_util::sync::CancellationToken::new();
|
||||
let h2_connection_cancellation_token_guard =
|
||||
h2_connection_cancellation_token.clone().drop_guard();
|
||||
_ = tokio::spawn(async move {
|
||||
let _guard = h2_connection_cancellation_token_guard;
|
||||
match connection.await {
|
||||
Ok(_) => log::info!("[{log_tag}] HTTP2 connection [{ip_version}] closed"),
|
||||
Err(err) => {
|
||||
@ -416,6 +431,7 @@ where
|
||||
authority,
|
||||
path_prefix,
|
||||
default_per_request_headers: Default::default(),
|
||||
h2_connection_cancellation_token,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -604,7 +604,7 @@ mod test {
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use futures_util::future::BoxFuture;
|
||||
use futures_util::{SinkExt as _, StreamExt as _};
|
||||
use futures_util::{FutureExt as _, SinkExt as _, StreamExt as _};
|
||||
use http::uri::PathAndQuery;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_boring_signal::SslStream;
|
||||
@ -1120,6 +1120,10 @@ mod test {
|
||||
.expect("can send frame");
|
||||
|
||||
conn.ready().await.expect("ready");
|
||||
assert!(
|
||||
conn.wait_for_h2_shutdown().now_or_never().is_none(),
|
||||
"not shut down yet"
|
||||
);
|
||||
|
||||
// Signal shutdown, and yield to the server task to make sure it gets acknowledged.
|
||||
shutdown_token.cancel();
|
||||
@ -1141,6 +1145,10 @@ mod test {
|
||||
// By now we've definitely learned of the H2 shutdown...but the websocket should still be
|
||||
// open.
|
||||
conn.ready().await.expect_err("no longer ready");
|
||||
assert!(
|
||||
conn.wait_for_h2_shutdown().now_or_never().is_some(),
|
||||
"shut down by now"
|
||||
);
|
||||
ws.stream
|
||||
.send(tungstenite::Message::Binary(bytes::Bytes::new()))
|
||||
.await
|
||||
|
||||
@ -398,12 +398,18 @@ impl Chat {
|
||||
tokio_stream::wrappers::WatchStream::from_changes(network_change_event)
|
||||
.chain(futures_util::stream::pending());
|
||||
|
||||
let h2_shutdown_rx = shared_h2_connection.as_ref().map_or_else(
|
||||
|| Box::pin(std::future::pending()) as futures_util::future::BoxFuture<'static, ()>,
|
||||
|c| Box::pin(c.wait_for_h2_shutdown()),
|
||||
);
|
||||
|
||||
let connection = ConnectionImpl {
|
||||
inner: inner_connection,
|
||||
requests_in_flight,
|
||||
network_change_event,
|
||||
config: connection_config,
|
||||
outgoing_request_tx: request_tx.downgrade(),
|
||||
h2_shutdown_rx,
|
||||
};
|
||||
|
||||
let task = tokio_runtime.spawn(spawned_task_body(
|
||||
@ -615,6 +621,7 @@ struct ConnectionImpl<I, GCI> {
|
||||
futures_util::stream::Pending<()>,
|
||||
>,
|
||||
outgoing_request_tx: WeakSender<OutgoingRequest>,
|
||||
h2_shutdown_rx: futures_util::future::BoxFuture<'static, ()>,
|
||||
config: ConnectionConfig<GCI>,
|
||||
}
|
||||
|
||||
@ -1005,6 +1012,7 @@ impl<I: InnerConnection, GCI: GetCurrentInterface<Representation = IpAddr>> Conn
|
||||
network_change_event,
|
||||
config,
|
||||
outgoing_request_tx,
|
||||
mut h2_shutdown_rx,
|
||||
} = self.project();
|
||||
|
||||
let mut event_fut = std::pin::pin!(inner.as_mut().handle_next_event());
|
||||
@ -1045,6 +1053,11 @@ impl<I: InnerConnection, GCI: GetCurrentInterface<Representation = IpAddr>> Conn
|
||||
|
||||
tokio::select! {
|
||||
inner_event = &mut event_fut => break inner_event,
|
||||
() = &mut h2_shutdown_rx => {
|
||||
// Treat H2 graceful shutdown as a connection failure immediately even if the
|
||||
// websocket is still open.
|
||||
break Outcome::Finished(Err(NextEventError::UnexpectedConnectionClose))
|
||||
}
|
||||
interruption = interruption_fut => match Self::handle_interruption(
|
||||
config,
|
||||
requests_in_flight,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user