Add support for TCP+TLS to backend

This commit is contained in:
Richard Russo 2024-09-05 12:53:00 -07:00 committed by GitHub
parent db1335e150
commit 5221b5e1e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 683 additions and 273 deletions

196
Cargo.lock generated
View File

@ -245,6 +245,33 @@ dependencies = [
"zeroize",
]
[[package]]
name = "aws-lc-rs"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae74d9bd0a7530e8afd1770739ad34b36838829d6ad61818f9230f683f5ad77"
dependencies = [
"aws-lc-sys",
"mirai-annotations",
"paste",
"zeroize",
]
[[package]]
name = "aws-lc-sys"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e89b6941c2d1a7045538884d6e760ccfffdf8e1ffc2613d8efa74305e1f3752"
dependencies = [
"bindgen",
"cc",
"cmake",
"dunce",
"fs_extra",
"libc",
"paste",
]
[[package]]
name = "aws-runtime"
version = "1.3.1"
@ -653,6 +680,29 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.69.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0"
dependencies = [
"bitflags 2.6.0",
"cexpr",
"clang-sys",
"itertools",
"lazy_static",
"lazycell",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn",
"which",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -770,6 +820,8 @@ dependencies = [
"rand_core",
"rand_distr",
"reqwest",
"rustls 0.23.12",
"rustls-pemfile 2.1.3",
"scopeguard",
"serde",
"serde_json",
@ -865,6 +917,19 @@ name = "cc"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052"
dependencies = [
"jobserver",
"libc",
]
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
@ -896,6 +961,17 @@ dependencies = [
"zeroize",
]
[[package]]
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "clap"
version = "4.5.9"
@ -936,6 +1012,15 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70"
[[package]]
name = "cmake"
version = "0.1.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130"
dependencies = [
"cc",
]
[[package]]
name = "colorchoice"
version = "1.0.1"
@ -1166,6 +1251,12 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "dunce"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b"
[[package]]
name = "either"
version = "1.13.0"
@ -1250,6 +1341,12 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "futures"
version = "0.3.30"
@ -1377,6 +1474,12 @@ version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "h2"
version = "0.3.26"
@ -1496,6 +1599,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "http"
version = "0.2.12"
@ -1767,6 +1879,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.69"
@ -1782,12 +1903,28 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "libloading"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
]
[[package]]
name = "libm"
version = "0.2.8"
@ -1861,6 +1998,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.7.4"
@ -1881,6 +2024,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "mirai-annotations"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1"
[[package]]
name = "mock_instant"
version = "0.3.2"
@ -1954,6 +2103,16 @@ dependencies = [
"pin-utils",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "num-conv"
version = "0.1.0"
@ -2086,6 +2245,12 @@ dependencies = [
"syn",
]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "percent-encoding"
version = "2.3.1"
@ -2301,7 +2466,7 @@ dependencies = [
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustc-hash 2.0.0",
"rustls 0.23.12",
"socket2",
"thiserror",
@ -2318,7 +2483,7 @@ dependencies = [
"bytes",
"rand",
"ring",
"rustc-hash",
"rustc-hash 2.0.0",
"rustls 0.23.12",
"slab",
"thiserror",
@ -2515,6 +2680,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.0.0"
@ -2561,6 +2732,8 @@ version = "0.23.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
dependencies = [
"aws-lc-rs",
"log",
"once_cell",
"ring",
"rustls-pki-types",
@ -2622,6 +2795,7 @@ version = "0.102.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
"untrusted",
@ -2820,6 +2994,12 @@ dependencies = [
"digest",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-crypto"
version = "0.1.0"
@ -3369,6 +3549,18 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix",
]
[[package]]
name = "windows-core"
version = "0.52.0"

View File

@ -84,6 +84,10 @@ unique_id = "0.1.5"
# For current process memory stats
psutil = { version = "3.3.0", default-features = false, features = ["cpu", "process"] }
# For TLS sockets in epoll backend
rustls = "0.23.12"
rustls-pemfile = "2.1.3"
[target.'cfg(target_os="linux")'.dependencies]
# Only currently needed for metrics (won't build on mac, see https://github.com/rust-lang/rust/issues/115149).
accounting-allocator = "0.2.0"

View File

@ -49,10 +49,17 @@ COPY --from=build-stage /usr/src/calling-service/target/release/calling_backend
# Update system packages.
RUN apt-get update \
&& apt-get upgrade -y \
# Install ca certificates
&& apt-get install -y --no-install-recommends --no-install-suggests \
ca-certificates \
&& update-ca-certificates \
# Install curl for ip detection.
&& apt-get install -y --no-install-recommends --no-install-suggests curl \
# Install jq for parsing gcp metadata.
&& apt-get install -y --no-install-recommends --no-install-suggests jq \
# make a directory for certificate files
&& mkdir /etc/calling_server \
&& chown -R nobody:nogroup /etc/calling_server \
# Allow non-root using privileged ports.
&& apt-get install -y --no-install-recommends --no-install-suggests libcap2-bin \
&& setcap CAP_NET_BIND_SERVICE=+ep /usr/local/bin/calling_backend \

View File

@ -7,6 +7,15 @@
#
#
CERT_PATH="/etc/calling_server/cert.pem"
PKEY_PATH="/etc/calling_server/pkey.pem"
TOKEN="$(curl -Ss "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" -H "Metadata-Flavor: Google" | jq '.access_token')"
curl -Ss "https://secretmanager.googleapis.com/v1/projects/$SECRET_PROJECT/secrets/$CERT_NAME/versions/latest:access" -H "Metadata-Flavor: Google" -H "authorization: Bearer $TOKEN" | jq -r '.payload.data' | base64 --decode > $CERT_PATH
curl -Ss "https://secretmanager.googleapis.com/v1/projects/$SECRET_PROJECT/secrets/$PRIVATE_KEY_NAME/versions/latest:access" -H "Metadata-Flavor: Google" -H "authorization: Bearer $TOKEN" | jq -r '.payload.data' | base64 --decode > $PKEY_PATH
if [[ -z "${EXTERNAL_IP}" ]]; then
EXTERNAL_IP="$(curl -Ss "http://metadata.google.internal/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip" -H "Metadata-Flavor: Google")"
if [[ -z "${EXTERNAL_IP}" ]]; then
@ -36,4 +45,6 @@ calling_backend \
--ice-candidate-ip "$EXTERNAL_IP" \
"${IPV6_ICE[@]}" \
--signaling-ip "$INTERNAL_IP" \
--certificate-file-path "$CERT_PATH" \
--key-file-path "$PKEY_PATH" \
"$@"

View File

@ -34,6 +34,11 @@ pub struct Config {
#[arg(long, default_value = "10000")]
pub ice_candidate_port_tcp: u16,
/// The port to use for ICE candidates when connected over TCP+TLS. Clients
/// will connect to the calling backend using this port.
#[arg(long)]
pub ice_candidate_port_tls: Option<u16>,
/// The IP address to share for direct access to the signaling_server. If
/// defined, then the signaling_server will be used, otherwise the
/// http_server will be used for testing.
@ -128,6 +133,18 @@ pub struct Config {
#[arg(long)]
pub persist_approval_for_all_users_who_join: bool,
/// The path to the certificate file for TLS
#[arg(long)]
pub certificate_file_path: Option<String>,
/// The path to the private key file for TLS
#[arg(long)]
pub key_file_path: Option<String>,
// The hostname to give the client to validate the certificate used for TLS
#[arg(long)]
pub hostname: Option<String>,
#[clap(flatten)]
pub metrics: MetricsOptions,
}
@ -151,14 +168,16 @@ pub struct MetricsOptions {
pub struct MediaPorts {
pub udp: u16,
pub tcp: u16,
pub tls: Option<u16>,
}
pub struct ServerMediaAddress {
pub addresses: Vec<IpAddr>,
pub ports: MediaPorts,
pub hostname: Option<String>,
}
/// Public address of the server for media/UDP/TCP derived from the configuration.
/// Public address of the server for media/UDP/TCP/TLS derived from the configuration.
impl ServerMediaAddress {
pub fn from(config: &'static Config) -> Self {
let addresses = if config.ice_candidate_ip.is_empty() {
@ -178,7 +197,9 @@ impl ServerMediaAddress {
ports: MediaPorts {
udp: config.ice_candidate_port,
tcp: config.ice_candidate_port_tcp,
tls: config.ice_candidate_port_tls,
},
hostname: config.hostname.clone(),
}
}
@ -215,5 +236,9 @@ pub(crate) fn default_test_config() -> Config {
persist_approval_for_all_users_who_join: false,
metrics: Default::default(),
frontend_operation_timeout_ms: 1000,
certificate_file_path: None,
key_file_path: None,
hostname: None,
ice_candidate_port_tls: None,
}
}

View File

@ -54,6 +54,8 @@ pub enum AddressType {
UdpV6,
TcpV4,
TcpV6,
TlsV4,
TlsV6,
}
/// The state of a connection to a client.
@ -250,13 +252,14 @@ impl Connection {
AddressType::UdpV4
}
}
SocketLocator::Tcp { is_ipv6, .. } => {
if is_ipv6 {
AddressType::TcpV6
} else {
AddressType::TcpV4
}
}
SocketLocator::Tcp {
is_ipv6, is_tls, ..
} => match (is_ipv6, is_tls) {
(false, false) => AddressType::TcpV4,
(false, true) => AddressType::TlsV4,
(true, false) => AddressType::TcpV6,
(true, true) => AddressType::TlsV6,
},
});
}
self.ice.binding_request_received = Some(now);

View File

@ -68,8 +68,10 @@ pub struct JoinResponse {
pub demux_id: u32,
pub port: u16,
pub port_tcp: u16,
pub port_tls: Option<u16>,
pub ip: String,
pub ips: Vec<String>,
pub hostname: Option<String>,
pub ice_ufrag: String,
pub ice_pwd: String,
pub dhe_public_key: String,
@ -360,12 +362,14 @@ async fn join_conference(
demux_id: demux_id.into(),
port: media_server.ports.udp,
port_tcp: media_server.ports.tcp,
port_tls: media_server.ports.tls,
ip: media_server.ip().to_string(),
ips: media_server
.addresses
.iter()
.map(|ip| ip.to_string())
.collect(),
hostname: media_server.hostname,
ice_ufrag: server_ice_ufrag,
ice_pwd: server_ice_pwd,
dhe_public_key: server_dhe_public_key.encode_hex(),

View File

@ -6,7 +6,11 @@
#[macro_use]
extern crate log;
use std::sync::{atomic::AtomicBool, Arc};
use std::{
fs::File,
io::BufReader,
sync::{atomic::AtomicBool, Arc},
};
use anyhow::Result;
use calling_backend::{
@ -17,6 +21,7 @@ use clap::Parser;
use env_logger::Env;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use rustls::{server::NoServerSessionStorage, version::TLS13, ServerConfig};
use tokio::{
runtime,
signal::unix::{signal, SignalKind},
@ -48,6 +53,12 @@ fn print_config(config: &'static config::Config) {
Some(host) => host,
None => "Off",
});
if config.ice_candidate_port_tls.is_some() {
info!(" {:38}{:?}", "ice_candidate_port_tls:", config.ice_candidate_port_tls);
info!(" {:38}{:?}", "hostname:", config.hostname);
info!(" {:38}{:?}", "certificate_file_path:", config.certificate_file_path);
info!(" {:38}{:?}", "key_file_path:", config.key_file_path);
}
}
/// Waits for a SIGINT or SIGTERM signal and returns. Can be cancelled
@ -110,6 +121,48 @@ fn main() -> Result<()> {
let config = &CONFIG;
print_config(config);
let tls_config = if config.ice_candidate_port_tls.is_some()
&& config.hostname.is_some()
&& config.certificate_file_path.is_some()
&& config.key_file_path.is_some()
{
let certificates = rustls_pemfile::certs(&mut BufReader::new(&mut File::open(
config
.certificate_file_path
.as_ref()
.expect("must have a certificate file path"),
)?))
.collect::<Result<Vec<_>, _>>()?;
let private_key = rustls_pemfile::private_key(&mut BufReader::new(&mut File::open(
config
.key_file_path
.as_ref()
.expect("must have a key file path"),
)?))?;
let mut tls_config = ServerConfig::builder_with_provider(Arc::new(
rustls::crypto::aws_lc_rs::default_provider(),
))
.with_protocol_versions(&[&TLS13])?
.with_no_client_auth()
.with_single_cert(certificates, private_key.expect("must have a private key"))?;
// Explicitly disable TLS sessions and tickets, WebRTC does not use them, so don't waste bandwidth
tls_config.session_storage = Arc::new(NoServerSessionStorage {});
tls_config.max_early_data_size = 0;
tls_config.send_tls13_tickets = 0;
Some(Arc::new(tls_config))
} else {
if config.ice_candidate_port_tls.is_some()
|| config.hostname.is_some()
|| config.certificate_file_path.is_some()
|| config.key_file_path.is_some()
{
panic!("For TLS, all values must be set: ice-candidate-port-tls, hostname, certificate-file-path, key-file-path");
}
None
};
// Create the shared SFU context.
let sfu: Arc<Mutex<Sfu>> = Arc::new(Mutex::new(Sfu::new(Instant::now(), config)?));
@ -118,19 +171,19 @@ fn main() -> Result<()> {
let threaded_rt = runtime::Runtime::new()?;
let (signaling_ender_tx, signaling_ender_rx) = oneshot::channel();
let (udp_ender_tx, udp_ender_rx) = oneshot::channel();
let (packet_ender_tx, packet_ender_rx) = oneshot::channel();
let (metrics_ender_tx, metrics_ender_rx) = oneshot::channel();
let (call_lifecycle_ender_tx, call_lifecycle_ender_rx) = oneshot::channel();
let (signal_canceller_tx, signal_canceller_rx) = mpsc::channel(1);
let is_healthy = Arc::new(AtomicBool::new(true));
let sfu_clone_for_udp = sfu.clone();
let sfu_clone_for_packet = sfu.clone();
let sfu_clone_for_metrics = sfu.clone();
let sfu_clone_for_call_lifecycle = sfu.clone();
let signal_canceller_tx_clone_for_udp = signal_canceller_tx.clone();
let signal_canceller_tx_clone_for_packet = signal_canceller_tx.clone();
let signal_canceller_tx_clone_for_metrics = signal_canceller_tx.clone();
let signal_canceller_tx_clone_for_call_lifecycle = signal_canceller_tx.clone();
let is_healthy_clone_for_udp = is_healthy.clone();
let is_healthy_clone_for_packet = is_healthy.clone();
threaded_rt.block_on(async {
// Start the signaling server, either the signaling_server for production
@ -152,15 +205,16 @@ fn main() -> Result<()> {
let packet_server_handle = tokio::spawn(async move {
if let Err(err) = packet_server::start(
config,
sfu_clone_for_udp,
udp_ender_rx,
is_healthy_clone_for_udp,
tls_config,
sfu_clone_for_packet,
packet_ender_rx,
is_healthy_clone_for_packet,
)
.await
{
error!("udp server shutdown {:?}", err);
error!("packet server shutdown {:?}", err);
}
let _ = signal_canceller_tx_clone_for_udp.send(()).await;
let _ = signal_canceller_tx_clone_for_packet.send(()).await;
});
// Start the metrics_server.
@ -186,7 +240,7 @@ fn main() -> Result<()> {
// Gracefully exit the servers if needed.
let _ = signaling_ender_tx.send(());
let _ = udp_ender_tx.send(());
let _ = packet_ender_tx.send(());
let _ = metrics_ender_tx.send(());
let _ = call_lifecycle_ender_tx.send(());

View File

@ -18,6 +18,7 @@ use std::{
use anyhow::Result;
use log::*;
use parking_lot::Mutex;
use rustls::ServerConfig;
use tokio::sync::oneshot::Receiver;
#[cfg(all(feature = "epoll", target_os = "linux"))]
@ -41,20 +42,29 @@ use crate::{
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum SocketLocator {
Udp(SocketAddr),
Tcp { id: i64, is_ipv6: bool },
Tcp {
id: i64,
is_ipv6: bool,
is_tls: bool,
},
}
impl fmt::Display for SocketLocator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SocketLocator::Udp(a) => write!(f, "U{}", a),
SocketLocator::Tcp { id, is_ipv6 } => write!(f, "T{}-{}", id, is_ipv6),
SocketLocator::Tcp {
id,
is_ipv6,
is_tls,
} => write!(f, "T{}-{}-{}", id, is_ipv6, is_tls),
}
}
}
pub async fn start(
config: &'static config::Config,
tls_config: Option<Arc<ServerConfig>>,
sfu: Arc<Mutex<Sfu>>,
packet_ender_rx: Receiver<()>,
is_healthy: Arc<AtomicBool>,
@ -65,9 +75,18 @@ pub async fn start(
let local_addr_udp = SocketAddr::new(config.binding_ip, config.ice_candidate_port);
let local_addr_tcp = SocketAddr::new(config.binding_ip, config.ice_candidate_port_tcp);
let local_addr_tls = config
.ice_candidate_port_tls
.map(|tls_port| SocketAddr::new(config.binding_ip, tls_port));
let packet_handler_state =
PacketServerState::new(local_addr_udp, local_addr_tcp, num_threads, tick_interval)?;
let packet_handler_state = PacketServerState::new(
local_addr_udp,
local_addr_tcp,
local_addr_tls,
tls_config,
num_threads,
tick_interval,
)?;
let packet_handler_state_for_tick = packet_handler_state.clone();
let packet_handler_state_for_stats = packet_handler_state.clone();

View File

@ -29,6 +29,7 @@ use nix::{
sys::{epoll::*, timerfd::*},
};
use parking_lot::{Mutex, RwLock};
use rustls::{ServerConfig, ServerConnection};
use scopeguard::ScopeGuard;
use unique_id::sequence::SequenceGenerator;
use unique_id::Generator;
@ -87,6 +88,8 @@ pub struct PacketServerState {
local_addr_udp: SocketAddr,
new_client_socket: Socket,
new_tcp_socket: TcpListener,
new_tls_socket: Option<TcpListener>,
tls_config: Option<Arc<ServerConfig>>,
all_epoll_fds: Vec<RawFd>,
all_connections: RwLock<ConnectionMap>,
tick_interval: Duration,
@ -102,11 +105,20 @@ impl PacketServerState {
pub fn new(
local_addr_udp: SocketAddr,
local_addr_tcp: SocketAddr,
local_addr_tls: Option<SocketAddr>,
tls_config: Option<Arc<ServerConfig>>,
num_threads: usize,
tick_interval: Duration,
) -> Result<Arc<Self>> {
let new_client_socket = Socket::Udp(Self::open_socket_with_reusable_port(&local_addr_udp)?);
let new_tcp_socket = Self::open_listen_socket(&local_addr_tcp)?;
let new_tls_socket = if let Some(local_addr_tls) = local_addr_tls {
Some(Self::open_listen_socket(&local_addr_tls)?)
} else {
None
};
let all_epoll_fds = (0..num_threads)
.map(|_| epoll_create1(EpollCreateFlags::empty()))
.collect::<nix::Result<_>>()?;
@ -116,6 +128,8 @@ impl PacketServerState {
local_addr_udp,
new_client_socket,
new_tcp_socket,
new_tls_socket,
tls_config,
all_epoll_fds,
all_connections: RwLock::new(ConnectionMap::new()),
tick_interval,
@ -125,6 +139,9 @@ impl PacketServerState {
};
result.add_socket_to_poll_for_reads(&result.new_client_socket)?;
result.add_socket_to_poll_for_reads(&result.new_tcp_socket)?;
if let Some(listen_socket) = &result.new_tls_socket {
result.add_socket_to_poll_for_reads(listen_socket)?;
}
Ok(Arc::new(result))
}
@ -259,7 +276,8 @@ impl PacketServerState {
fn run(self: Arc<Self>, epoll_fd: RawFd, sfu: &Arc<Mutex<Sfu>>) {
let new_client_socket_fd = self.new_client_socket.as_raw_fd();
let new_tcp_socket_fd = self.new_tcp_socket.as_raw_fd();
let mut buf = [0u8; MAX_RTP_LENGTH];
let new_tls_socket_fd = self.new_tls_socket.as_ref().map_or(-1, |s| s.as_raw_fd());
let mut bufs = vec![PacketBuffer::new()];
let mut poll_timeout_ms = EPOLL_WAIT_TIMEOUT_MS;
let (timer, timer_fd) = match TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty()) {
@ -304,10 +322,27 @@ impl PacketServerState {
let connections_lock = self.all_connections.read();
let socket = if socket_fd == new_client_socket_fd {
&self.new_client_socket
} else if socket_fd == new_tcp_socket_fd {
} else if socket_fd == new_tcp_socket_fd || socket_fd == new_tls_socket_fd {
drop(connections_lock);
match self.new_tcp_socket.accept() {
let (accepted, tls_config) = if socket_fd == new_tcp_socket_fd {
(self.new_tcp_socket.accept(), None)
} else {
(
self.new_tls_socket
.as_ref()
.expect("tls socket must exist if we got an event on it")
.accept(),
Some(
self.tls_config
.as_ref()
.expect("tls config must exist if we got a tls accept")
.clone(),
),
)
};
match accepted {
Ok((client_socket, addr)) => {
// TODO: explore TCP_CORK instead of/in addition to TCP_NODELAY
let _ = client_socket.set_nodelay(true); // fail quietly
@ -320,16 +355,22 @@ impl PacketServerState {
V6(addr) => addr.to_ipv4_mapped().is_none(),
};
let id = self.tcp_id_generator.next_id();
let client_socket = Socket::new_tcp(client_socket, id, is_ipv6);
let mut write_lock = self.all_connections.write();
if self.add_socket_to_poll_for_reads(&client_socket).is_ok() {
write_lock.get_or_insert_connected(
client_socket,
SocketLocator::Tcp { id, is_ipv6 },
Some(self.tick_number.load(AtomicOrdering::Relaxed)),
);
let is_tls = tls_config.is_some();
if let Ok(client_socket) =
Socket::new_tcp(client_socket, id, is_ipv6, tls_config)
{
let mut write_lock = self.all_connections.write();
if self.add_socket_to_poll_for_reads(&client_socket).is_ok() {
write_lock.get_or_insert_connected(
client_socket,
SocketLocator::Tcp {
id,
is_ipv6,
is_tls,
},
Some(self.tick_number.load(AtomicOrdering::Relaxed)),
);
}
}
}
Err(err) => match err.kind() {
@ -427,54 +468,79 @@ impl PacketServerState {
//
// Note that this relies on using epoll in level-triggered mode rather than
// edge-triggered.
let (size, sender_addr) = match socket.recv_from(&mut buf) {
Err(err) => {
match err.kind() {
io::ErrorKind::TimedOut
| io::ErrorKind::WouldBlock
| io::ErrorKind::Interrupted => {}
io::ErrorKind::ConnectionRefused => {
// This can happen when someone leaves a call
// because e.g. their router stops forwarding packets.
// This is normal with UDP; technically this error happened
// with the previous *sent* packet and we're just finding out now.
trace!("recv_from() failed: {}", err);
}
io::ErrorKind::InvalidData => {
// got invalid data, so drop the connection
if let Ok(peer_addr) = socket.peer_addr() {
// Drop the read lock...
drop(connections_lock);
// ...and connect with a write lock...
let mut write_lock = self.all_connections.write();
write_lock.mark_closed(&peer_addr, Instant::now());
//
// We loop here, to allow reading multiple RTP packets from TLS connections;
// it's possible that the TLS layer will have read all data from the socket and
// there are multiple RTP packets within that data. If we only read one RTP
// packet, the next packet will remain buffered in the TLS layer, but epoll will
// not find the socket ready for read, until a future packet arrives.
let mut index = 0;
let mut sender_addr = None;
loop {
match bufs[index].recv_from(socket) {
Err(err) => {
match err.kind() {
io::ErrorKind::TimedOut
| io::ErrorKind::WouldBlock
| io::ErrorKind::Interrupted => {}
io::ErrorKind::ConnectionRefused => {
// This can happen when someone leaves a call
// because e.g. their router stops forwarding packets.
// This is normal with UDP; technically this error happened
// with the previous *sent* packet and we're just finding out now.
trace!("recv_from() failed: {}", err);
}
continue;
}
_ => {
Self::socket_error(&err);
}
io::ErrorKind::UnexpectedEof | io::ErrorKind::InvalidData => {
// got invalid data, so drop the connection
if let Ok(peer_addr) = socket.peer_addr() {
// Drop the read lock...
drop(connections_lock);
// ...and connect with a write lock...
let mut write_lock = self.all_connections.write();
write_lock.mark_closed(&peer_addr, Instant::now());
break;
}
}
_ => {
Self::socket_error(&err);
}
};
drop(connections_lock);
break;
}
continue;
Ok(s_a) => {
sender_addr = Some(s_a);
}
};
index += 1;
if socket.has_pending_data() {
if bufs.len() <= index {
bufs.push(PacketBuffer::new());
}
} else {
drop(connections_lock);
break;
}
Ok((size, sender_addr)) => (size, sender_addr),
};
drop(connections_lock);
let HandleOutput {
packets_to_send,
dequeues_to_schedule,
} = packet_server::handle_packet(sfu, sender_addr, &mut buf[..size]);
for (buf, addr) in packets_to_send {
self.send_packet(&buf, addr)
}
if !dequeues_to_schedule.is_empty() {
let mut timer_heap = self.timer_heap.lock();
for (time, addr) in dequeues_to_schedule {
timer_heap.schedule(time, addr);
if let Some(sender_addr) = sender_addr {
for inbuf in bufs.iter_mut().take(index) {
let HandleOutput {
packets_to_send,
dequeues_to_schedule,
} = packet_server::handle_packet(sfu, sender_addr, inbuf.as_mut());
for (buf, addr) in packets_to_send {
self.send_packet(&buf, addr)
}
if !dequeues_to_schedule.is_empty() {
let mut timer_heap = self.timer_heap.lock();
for (time, addr) in dequeues_to_schedule {
timer_heap.schedule(time, addr);
}
}
}
}
}
@ -813,14 +879,41 @@ impl PacketServerState {
}
}
struct PacketBuffer {
buf: [u8; MAX_RTP_LENGTH],
size: usize,
}
impl PacketBuffer {
fn new() -> Self {
Self {
buf: [0; MAX_RTP_LENGTH],
size: 0,
}
}
fn recv_from(&mut self, socket: &Socket) -> io::Result<SocketLocator> {
self.size = 0;
socket.recv_from(&mut self.buf).map(|(size, sender_addr)| {
self.size = size;
sender_addr
})
}
fn as_mut(&mut self) -> &mut [u8] {
&mut self.buf[0..self.size]
}
}
struct TcpState {
socket: TcpStream,
stream: SocketStream,
size: usize,
pos: usize,
buf: [u8; MAX_RTP_LENGTH],
outq: VecDeque<u8>,
id: i64,
is_ipv6: bool,
is_tls: bool,
}
impl TcpState {
@ -830,7 +923,7 @@ impl TcpState {
return Err(io::Error::from(io::ErrorKind::InvalidData));
}
if self.size == 0 {
match self.socket.read(&mut self.buf[self.pos..2]) {
match self.stream.read(&mut self.buf[self.pos..2]) {
Ok(read) => {
if read == 0 {
return Err(io::Error::from(io::ErrorKind::InvalidData));
@ -862,7 +955,7 @@ impl TcpState {
event!("calling.udp.epoll.tcp_too_large");
return Err(io::Error::from(io::ErrorKind::InvalidData));
} else if self.size != 0 {
match self.socket.read(&mut self.buf[self.pos..self.size]) {
match self.stream.read(&mut self.buf[self.pos..self.size]) {
Ok(read) => {
if read == 0 {
return Err(io::Error::from(io::ErrorKind::InvalidData));
@ -878,6 +971,7 @@ impl TcpState {
SocketLocator::Tcp {
id: self.id,
is_ipv6: self.is_ipv6,
is_tls: self.is_tls,
},
));
}
@ -901,7 +995,7 @@ impl TcpState {
let mut dropped = false;
if self.outq.is_empty() {
let sent = match self
.socket
.stream
.write_vectored(&[IoSlice::new(&size), IoSlice::new(buf)])
{
Ok(sent) => sent,
@ -927,7 +1021,7 @@ impl TcpState {
}
} else {
let (a, b) = self.outq.as_slices();
let mut sent = match self.socket.write_vectored(&[
let mut sent = match self.stream.write_vectored(&[
IoSlice::new(a),
IoSlice::new(b),
IoSlice::new(&size),
@ -983,22 +1077,101 @@ impl AsRawFd for Socket {
fn as_raw_fd(&self) -> RawFd {
match self {
Socket::Udp(s) => s.as_raw_fd(),
Socket::Tcp(m) => m.lock().socket.as_raw_fd(),
Socket::Tcp(m) => m.lock().stream.as_raw_fd(),
}
}
}
enum SocketStream {
Tcp(TcpStream),
Tls(Box<(ServerConnection, TcpStream)>),
}
impl SocketStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
SocketStream::Tcp(s) => s.read(buf),
SocketStream::Tls(b) => {
let (c, s) = b.as_mut();
let was_handshaking = c.is_handshaking();
if c.wants_read() {
c.read_tls(s)?;
}
if let Err(e) = c.process_new_packets() {
// try to write any alerts generated
if c.wants_write() {
_ = c.write_tls(s);
}
return Err(io::Error::new(io::ErrorKind::InvalidData, e));
}
if was_handshaking && c.wants_write() {
// ignore any write errors during handshaking
_ = c.write_tls(s);
}
c.reader().read(buf)
}
}
}
fn write_vectored(&mut self, bufs: &[IoSlice]) -> io::Result<usize> {
match self {
SocketStream::Tcp(s) => s.write_vectored(bufs),
SocketStream::Tls(b) => {
let (c, s) = b.as_mut();
let result = c.writer().write_vectored(bufs);
// ignore result from socket write
_ = c.write_tls(s);
result
}
}
}
fn as_raw_fd(&self) -> RawFd {
match self {
SocketStream::Tcp(s) => s.as_raw_fd(),
SocketStream::Tls(b) => b.1.as_raw_fd(),
}
}
fn take_error(&self) -> io::Result<Option<io::Error>> {
match self {
SocketStream::Tcp(s) => s.take_error(),
SocketStream::Tls(b) => b.1.take_error(),
}
}
fn has_pending_data(&self) -> bool {
match self {
SocketStream::Tcp(_) => false,
// If the ServerConnection wants_read, it doesn't have any data for us to read
SocketStream::Tls(b) => !b.0.wants_read(),
}
}
}
impl Socket {
fn new_tcp(s: TcpStream, id: i64, is_ipv6: bool) -> Self {
Socket::Tcp(Box::new(Mutex::new(TcpState {
socket: s,
fn new_tcp(
s: TcpStream,
id: i64,
is_ipv6: bool,
tls_config: Option<Arc<ServerConfig>>,
) -> Result<Self> {
let (stream, is_tls) = if let Some(tls_config) = tls_config {
let connection = rustls::ServerConnection::new(tls_config)?;
(SocketStream::Tls(Box::new((connection, s))), true)
} else {
(SocketStream::Tcp(s), false)
};
Ok(Socket::Tcp(Box::new(Mutex::new(TcpState {
stream,
size: 0,
pos: 0,
buf: [0u8; MAX_RTP_LENGTH],
outq: VecDeque::new(),
id,
is_ipv6,
})))
is_tls,
}))))
}
fn send(&self, buf: &[u8]) -> io::Result<()> {
@ -1025,6 +1198,7 @@ impl Socket {
Ok(SocketLocator::Tcp {
id: state.id,
is_ipv6: state.is_ipv6,
is_tls: state.is_tls,
})
}
}
@ -1033,7 +1207,14 @@ impl Socket {
fn take_error(&self) -> io::Result<Option<io::Error>> {
match self {
Socket::Udp(s) => s.take_error(),
Socket::Tcp(m) => m.lock().socket.take_error(),
Socket::Tcp(m) => m.lock().stream.take_error(),
}
}
fn has_pending_data(&self) -> bool {
match self {
Socket::Udp(_) => false,
Socket::Tcp(m) => m.lock().stream.has_pending_data(),
}
}
}
@ -1406,6 +1587,7 @@ mod tests {
let addr = SocketLocator::Tcp {
id: 1,
is_ipv6: false,
is_tls: false,
};
// Insert
@ -1439,6 +1621,7 @@ mod tests {
let addr = SocketLocator::Tcp {
id: 2,
is_ipv6: false,
is_tls: false,
};
// Insert

View File

@ -15,6 +15,7 @@ use anyhow::Result;
use calling_common::{Duration, Instant};
use log::*;
use parking_lot::Mutex;
use rustls::ServerConfig;
use crate::{
metrics::TimingOptions,
@ -38,6 +39,8 @@ impl PacketServerState {
pub fn new(
local_addr_udp: SocketAddr,
_local_addr_tcp: SocketAddr,
_local_addr_tls: Option<SocketAddr>,
_tls_config: Option<Arc<ServerConfig>>,
num_threads: usize,
_tick_interval: Duration,
) -> Result<Arc<Self>> {

View File

@ -336,6 +336,8 @@ impl Sfu {
let mut udp_v6_connections = 0;
let mut tcp_v4_connections = 0;
let mut tcp_v6_connections = 0;
let mut tls_v4_connections = 0;
let mut tls_v6_connections = 0;
let mut connections_with_video_available = 0;
let now = Instant::now();
@ -369,6 +371,8 @@ impl Sfu {
AddressType::UdpV6 => udp_v6_connections += 1,
AddressType::TcpV4 => tcp_v4_connections += 1,
AddressType::TcpV6 => tcp_v6_connections += 1,
AddressType::TlsV4 => tls_v4_connections += 1,
AddressType::TlsV6 => tls_v6_connections += 1,
}
}
@ -425,6 +429,14 @@ impl Sfu {
"calling.sfu.connections.tcp_v6_count",
tcp_v6_connections as f32,
);
values.insert(
"calling.sfu.connections.tls_v4_count",
tls_v4_connections as f32,
);
values.insert(
"calling.sfu.connections.tls_v6_count",
tls_v6_connections as f32,
);
values.insert(
"calling.sfu.connections.video_available",
connections_with_video_available as f32,

View File

@ -112,6 +112,8 @@ pub struct JoinResponse {
pub server_ips: Vec<String>,
pub server_port: u16,
pub server_port_tcp: u16,
pub server_port_tls: Option<u16>,
pub server_hostname: Option<String>,
pub server_ice_ufrag: String,
pub server_ice_pwd: String,
pub server_dhe_public_key: String,
@ -356,6 +358,8 @@ async fn join(
.collect(),
server_port: media_server.ports.udp,
server_port_tcp: media_server.ports.tcp,
server_port_tls: media_server.ports.tls,
server_hostname: media_server.hostname,
server_ice_ufrag,
server_ice_pwd,
server_dhe_public_key,

View File

@ -66,17 +66,19 @@ pub struct JoinRequest {
pub struct JoinResponse {
pub demux_id: u32,
pub port: u16,
pub port_tcp: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub port_tcp: Option<u16>,
pub ip: String, // TODO remove once all clients use 'ips' field instead
pub port_tls: Option<u16>,
pub ips: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub hostname: Option<String>,
pub ice_ufrag: String,
pub ice_pwd: String,
pub dhe_public_key: String,
pub call_creator: String,
#[serde(rename = "conferenceId")]
pub era_id: String,
pub client_status: Option<String>,
pub client_status: String,
}
#[derive(Deserialize, Serialize, Debug)]
@ -352,8 +354,9 @@ pub async fn join(
demux_id: response.demux_id,
port: response.port,
port_tcp: response.port_tcp,
ip: response.ip,
port_tls: response.port_tls,
ips: response.ips,
hostname: response.hostname,
ice_ufrag: response.ice_ufrag,
ice_pwd: response.ice_pwd,
dhe_public_key: response.dhe_public_key,
@ -401,8 +404,7 @@ pub mod api_server_v2_tests {
const AUTH_KEY: &str = "f00f0014fe091de31827e8d686969fad65013238aadd25ef8629eb8a9e5ef69b";
const ZKPARAMS: &str = "AMJqvmQRYwEGlm0MSy6QFPIAvgOVsqRASNX1meQyCOYHJFqxO8lITPkow5kmhPrsNbu9JhVfKFwesVSKhdZaqQko3IZlJZMqP7DDw0DgTWpdnYzSt0XBWT50DM1cw1nCUXXBZUiijdaFs+JRlTKdh54M7sf43pFxyMHlS3URH50LOeR8jVQKaUHi1bDP2GR9ZXp3Ot9Fsp0pM4D/vjL5PwoOUuzNNdpIqUSFhKVrtazwuHNn9ecHMsFsN0QPzByiDA8nhKcGpdzyWUvGjEDBvpKkBtqjo8QuXWjyS3jSl2oJ/Z4Fh3o2N1YfD2aWV/K88o+TN2/j2/k+KbaIZgmiWwppLU+SYGwthxdDfZgnbaaGT/vMYX9P5JlUWSuP3xIxDzPzxBEFho67BP0Pvux+0a5nEOEVEpfRSs61MMvwNXEKZtzkO0QFbOrFYrPntyb7ToqNi66OQNyTfl/J7kqFZg2MTm3CKjHTAIvVMFAGCIamsrT9sWXOtuNeMS94xazxDA==";
pub static ACTIVE_CLIENT_STATUS: Lazy<Option<String>> =
Lazy::new(|| Some("active".to_string()));
pub static ACTIVE_CLIENT_STATUS: Lazy<String> = Lazy::new(|| "active".to_string());
pub const USER_ID_1: &str = "1111111111111111";
const USER_ID_2: &str = "2222222222222222";
@ -888,14 +890,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -929,7 +932,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_1);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -995,14 +997,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -1036,114 +1039,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_2);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
assert_eq!(
join_response.dhe_public_key,
BACKEND_DHE_PUBLIC_KEY.to_string()
);
assert_eq!(&join_response.call_creator, USER_ID_1);
assert_eq!(&join_response.era_id, ERA_ID_1);
}
/// Invoke the "PUT /v2/conference/participants" to join in the case where there is a call and backend is older and does not return ips.
#[tokio::test]
async fn test_join_with_call_old_backend() {
let config = &CONFIG;
// Create mocked dependencies with expectations.
let storage = create_mocked_storage_for_join(&config.region, USER_ID_2);
let mut backend = Box::new(MockBackend::new());
let mut id_generator = Box::new(MockIdGenerator::new());
// Create additional expectations.
backend
.expect_select_ip()
.once()
// Result<String, BackendError>
.returning(|| Ok("127.0.0.1".to_string()));
id_generator
.expect_get_random_era_id()
.with(eq(16))
.once()
.returning(|_| ERA_ID_1.to_string());
id_generator
.expect_get_random_demux_id()
// user_id: &str
.with(eq(USER_ID_2))
.once()
// DemuxId
.returning(|_| DEMUX_ID_2.try_into().unwrap());
let expected_demux_id: DemuxId = DEMUX_ID_2.try_into().unwrap();
backend
.expect_join()
// backend_address: &BackendAddress, call_id: &str, demux_id: DemuxId, join_request: &JoinRequest,
.with(
eq(backend::Address::try_from("127.0.0.1").unwrap()),
eq(ERA_ID_1),
eq(expected_demux_id),
eq(backend::JoinRequest {
user_id: USER_ID_2.to_string(),
ice_ufrag: CLIENT_ICE_UFRAG.to_string(),
dhe_public_key: Some(CLIENT_DHE_PUBLIC_KEY.to_string()),
hkdf_extra_info: None,
region: LOCAL_REGION.to_string(),
new_clients_require_approval: false,
is_admin: false,
room_id: RoomId::from(GROUP_ID_1),
approved_users: None,
}),
)
.once()
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: None,
port: 8080,
port_tcp: None,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
client_status: ACTIVE_CLIENT_STATUS.clone(),
})
});
let frontend = create_frontend_with_id_generator(config, storage, backend, id_generator);
// Create an axum application.
let app = app(frontend);
// Create the request.
let join_request = create_join_request();
let request = Request::builder()
.method(http::Method::PUT)
.uri("/v2/conference/participants")
.header(header::USER_AGENT, "test/user/agent")
.header(header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
.header(
header::AUTHORIZATION,
create_authorization_header_for_user(USER_ID_2),
)
.body(Body::from(serde_json::to_vec(&join_request).unwrap()))
.unwrap();
// Submit the request.
let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_2);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -2315,14 +2210,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -2356,7 +2252,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_1);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -2447,14 +2342,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -2488,7 +2384,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_1);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -2580,14 +2475,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -2621,7 +2517,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_1);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -2686,14 +2581,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -2728,7 +2624,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_2);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -2793,14 +2688,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -2835,7 +2731,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_2);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -3004,14 +2899,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -3046,7 +2942,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_2);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());
@ -3112,14 +3007,15 @@ pub mod api_server_v2_tests {
// Result<JoinResponse, BackendError>
.returning(|_, _, _, _| {
Ok(backend::JoinResponse {
ip: "127.0.0.1".to_string(),
ips: Some(vec!["127.0.0.1".to_string()]),
ips: vec!["127.0.0.1".to_string()],
port: 8080,
port_tcp: Some(8080),
port_tcp: 8080,
ice_ufrag: BACKEND_ICE_UFRAG.to_string(),
ice_pwd: BACKEND_ICE_PWD.to_string(),
dhe_public_key: Some(BACKEND_DHE_PUBLIC_KEY.to_string()),
dhe_public_key: BACKEND_DHE_PUBLIC_KEY.to_string(),
client_status: ACTIVE_CLIENT_STATUS.clone(),
hostname: None,
port_tls: None,
})
});
@ -3154,7 +3050,6 @@ pub mod api_server_v2_tests {
let join_response: JoinResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(join_response.demux_id, DEMUX_ID_2);
assert_eq!(join_response.port, 8080);
assert_eq!(join_response.ip, "127.0.0.1".to_string());
assert_eq!(join_response.ips, vec!["127.0.0.1".to_string()]);
assert_eq!(join_response.ice_ufrag, BACKEND_ICE_UFRAG.to_string());
assert_eq!(join_response.ice_pwd, BACKEND_ICE_PWD.to_string());

View File

@ -97,22 +97,24 @@ pub struct JoinRequest {
#[derive(Deserialize, Debug)]
pub struct JoinResponse {
#[serde(rename = "serverIp")]
pub ip: String,
#[serde(rename = "serverIps")]
pub ips: Option<Vec<String>>,
pub ips: Vec<String>,
#[serde(rename = "serverPort")]
pub port: u16,
#[serde(rename = "serverPortTcp", default)]
pub port_tcp: Option<u16>,
#[serde(rename = "serverPortTcp")]
pub port_tcp: u16,
#[serde(rename = "serverPortTls", default)]
pub port_tls: Option<u16>,
#[serde(rename = "serverHostname", default)]
pub hostname: Option<String>,
#[serde(rename = "serverIceUfrag")]
pub ice_ufrag: String,
#[serde(rename = "serverIcePwd")]
pub ice_pwd: String,
#[serde(rename = "serverDhePublicKey")]
pub dhe_public_key: Option<String>,
pub dhe_public_key: String,
#[serde(rename = "clientStatus")]
pub client_status: Option<String>,
pub client_status: String,
}
#[derive(thiserror::Error, Debug)]

View File

@ -59,13 +59,14 @@ pub struct JoinRequestWrapper {
pub struct JoinResponseWrapper {
pub demux_id: u32,
pub port: u16,
pub port_tcp: Option<u16>,
pub ip: String,
pub port_tcp: u16,
pub port_tls: Option<u16>,
pub hostname: Option<String>,
pub ips: Vec<String>,
pub ice_ufrag: String,
pub ice_pwd: String,
pub dhe_public_key: String,
pub client_status: Option<String>,
pub client_status: String,
}
pub struct ClientInfo {
@ -289,25 +290,16 @@ impl Frontend {
FrontendError::InternalError
})?;
let backend_dhe_public_key = backend_join_response.dhe_public_key.ok_or_else(|| {
error!("join_client_to_call: failed to receive dhe_public_key from the backend");
FrontendError::InternalError
})?;
let ips = match backend_join_response.ips {
Some(ips) => ips,
None => vec![backend_join_response.ip.clone()],
};
Ok(JoinResponseWrapper {
demux_id: demux_id.as_u32(),
port: backend_join_response.port,
port_tcp: backend_join_response.port_tcp,
ip: backend_join_response.ip,
ips,
port_tls: backend_join_response.port_tls,
ips: backend_join_response.ips,
hostname: backend_join_response.hostname,
ice_ufrag: backend_join_response.ice_ufrag,
ice_pwd: backend_join_response.ice_pwd,
dhe_public_key: backend_dhe_public_key,
dhe_public_key: backend_join_response.dhe_public_key,
client_status: backend_join_response.client_status,
})
}