Migrate service to GCP

- Update Docker files for use on GCP
- Simplify configuration options
- Add support for accessing DynamoDB from GCP (identity token)
- Remove v1 frontend API

Co-authored-by: Ravi Khadiwala <ravi@signal.org>
Jim Gustafson 2022-10-12 11:20:23 -07:00
parent 113dc6609d
commit 642b94f947
24 changed files with 593 additions and 1683 deletions

.cargo/audit.toml Normal file

@ -0,0 +1,9 @@
[output]
deny = ["unmaintained", "unsound", "yanked"]
quiet = false
[advisories]
ignore = [
# Ignore advisory for time; the issue depends on modifying the environment across threads. It's not impossible but it's very unlikely in our use cases.
"RUSTSEC-2020-0071",
]

@ -11,3 +11,4 @@ LICENSE
README.md
backend/fuzz
backend/Dockerfile
frontend/Dockerfile

@ -18,6 +18,7 @@ jobs:
override: true
profile: minimal
components: rustfmt, clippy
- run: shellcheck **/*.sh
- name: Environment
run: rustup --version && cargo --version
- name: Build

@ -1,15 +1,19 @@
# Building the Calling Backend
# Building
## For Development & Debugging
cargo run --bin calling_backend
You can specify a variety of command line arguments. See the [config.rs file](/src/config.rs) file for
more details or run:
or
cargo run --bin calling_backend -- --help
cargo run --bin calling_frontend
An example for debugging would be:
You must specify a variety of command line arguments. See [backend config.rs file](/backend/src/config.rs) or
[frontend config.rs file](/frontend/src/config.rs) for more details or run either with the `--help` option.
## Debugging with the Backend
An example for debugging the backend would be:
cargo run --bin calling_backend -- --binding-ip 192.168.1.100 --ice-candidate-ip 192.168.1.100 --diagnostics-interval-secs 1
@ -42,44 +46,20 @@ to instruct the compiler to optimize for the CPU that is performing the build it
## For Deployment
Signal uses the provided Dockerfile to build images for deployment. This uses a multi-stage process,
creating a stage for building, the binary for delivery, and a runnable image for testing.
Signal uses Docker files to build images for deployment. This uses a multi-stage process,
creating a stage for building and a runnable image.
### Building the Docker Images
Images currently run on AWS EC2 instances supporting the Intel Skylake architecture. When building
the images, we can target that specific CPU (or choose any other that matches the platform where the
container will be run):
When building the images, we can target a specific CPU that matches the platform where the
container will be run, such as the Intel Skylake architecture:
docker build -f backend/Dockerfile --build-arg rust_flags=-Ctarget-cpu=skylake -t signal-calling-backend .
or
docker build -f frontend/Dockerfile --build-arg rust_flags=-Ctarget-cpu=skylake -t signal-calling-frontend .
The ```build-arg``` can also be omitted to maintain maximum compatibility.
_Note: At the time of this writing, the skylake-avx512 target is not compatible with some dependencies._
### Deploying the Docker Image
The deployment is specific to the type of service or registry being used. For testing, the
image can be saved and copied somewhere for running. To save:
docker save signal-calling-backend:latest | gzip > signal-calling-backend-latest.tar.gz
### Running the Docker Container
To run the container, the following docker command can be used:
docker run -d --rm -p 8080:8080 -p 10000:10000/udp signal-calling-backend:latest
- ```-d``` runs the container in detached mode (can be omitted for easier testing)
- ```--rm``` will clean up the container when it is stopped
- ```-p 8080:8080``` connects the TCP port 8080 to the same one on the host
- ```-p 10000:10000/udp``` connects the UDP port 10000 to the same one on the host
### Binary Deployment
The docker file can also be used to obtain a binary file:
docker build -f backend/Dockerfile --build-arg rust_flags=-Ctarget-cpu=skylake -t signal-calling-backend --target export-stage -o bin .
This will build the calling_backend binary executable for Linux and copy it to the ./bin directory of
the host. The command will stop at the export-stage and not create the runnable docker image.

Cargo.lock generated

File diff suppressed because it is too large.

@ -4,6 +4,10 @@
Media forwarding server for group calls. Forwards media from 1 to N devices.
## Frontend
Signaling server for group calls that helps direct client requests to appropriate backends.
# Thanks
We thank WebRTC for the "googcc" congestion control algorithm (see googcc.rs for more details).

@ -5,7 +5,7 @@
[package]
name = "calling_backend"
version = "1.4.3"
version = "1.5.0"
authors = ["Calling Team <callingteam@signal.org>"]
edition = "2021"
description = "Media forwarding server for group calls."
@ -22,7 +22,7 @@ scopeguard = "1.1"
# For logging and command line operations
log = "0.4"
env_logger = "0.9"
structopt = "0.3"
clap = { version = "3.0", features = ["derive"] }
# For runtime and threading
tokio = { version = "1", features = ["full"] }

@ -3,10 +3,19 @@
# SPDX-License-Identifier: AGPL-3.0-only
#
# Use the current rust environment for building.
FROM rust:1.58.1-buster AS build-stage
ARG debian_ver=bullseye
FROM debian:${debian_ver} AS build-stage
# Update system packages.
RUN apt-get update \
&& apt-get install -y --no-install-recommends --no-install-suggests protobuf-compiler
&& apt-get upgrade -y \
&& apt-get install -y --no-install-recommends --no-install-suggests curl build-essential ca-certificates protobuf-compiler \
&& update-ca-certificates
# Install Rust.
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
ENV PATH="/root/.cargo/bin:${PATH}"
# Take in a build argument to specify RUSTFLAGS environment, usually a target-cpu.
ARG rust_flags
@ -30,19 +39,26 @@ RUN cargo build --bin calling_backend --release
COPY . .
RUN cargo build --bin calling_backend --release
# Export the calling_backend executable if the '-o' option is specified.
FROM scratch AS export-stage
# Create a minimal container to deploy and run the calling backend.
FROM debian:${debian_ver}-slim AS run-stage
COPY --from=build-stage /usr/src/calling-service/target/release/calling_backend calling_backend
# Update system packages.
RUN apt-get update \
&& apt-get upgrade -y \
# Install curl for ip detection.
&& apt-get install -y --no-install-recommends --no-install-suggests curl \
# Install jq for parsing gcp metadata.
&& apt-get install -y --no-install-recommends --no-install-suggests jq \
# Cleanup unnecessary stuff.
&& rm -rf /var/lib/apt/lists/*
# Create a minimal container to deploy and run the calling_backend.
FROM debian:buster-slim AS run-stage
COPY --from=build-stage /usr/src/calling-service/target/release/calling_backend /usr/local/bin/
COPY backend/docker-entrypoint.sh /usr/local/bin/
USER nobody:nogroup
# Expose http and udp server access ports to this container.
EXPOSE 8080
EXPOSE 10000/udp
COPY --from=build-stage /usr/src/calling-service/target/release/calling_backend .
USER 1000
ENTRYPOINT ["./calling_backend"]
ENTRYPOINT ["docker-entrypoint.sh"]

backend/docker-entrypoint.sh Executable file

@ -0,0 +1,31 @@
#!/bin/bash
#
#
# Copyright 2022 Signal Messenger, LLC
# SPDX-License-Identifier: AGPL-3.0-only
#
#
if [[ -z "${EXTERNAL_IP}" ]]; then
EXTERNAL_IP="$(curl -Ss "http://metadata.google.internal/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip" -H "Metadata-Flavor: Google")"
if [[ -z "${EXTERNAL_IP}" ]]; then
echo "Error: EXTERNAL_IP not defined!"
exit 1
fi
fi
if [[ -z "${INTERNAL_IP}" ]]; then
INTERNAL_IP="$(curl -Ss "http://metadata.google.internal/computeMetadata/v1/instance/network-interfaces/0/ip" -H "Metadata-Flavor: Google")"
if [[ -z "${INTERNAL_IP}" ]]; then
echo "Error: INTERNAL_IP not defined!"
exit 1
fi
fi
set -- calling_backend \
--ice-candidate-ip "$EXTERNAL_IP" \
--signaling-ip "$INTERNAL_IP" \
"$@"
"$@"

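The entrypoint script above takes each IP from the environment when it is set, falls back to the GCP metadata server otherwise, and then prepends the resolved flags to whatever arguments the container was started with (`set -- calling_backend ... "$@"`). A minimal Rust sketch of that flow follows. It is an illustration only: the names `resolve` and `build_command` are hypothetical, and the `fetch` closure is an assumed stand-in for the `curl` request to the metadata server.

```rust
/// Take a value from the environment if present and non-empty, otherwise
/// ask a fallback source. `fetch` is a hypothetical stand-in for the
/// `curl` call to the metadata server in the script.
fn resolve(
    name: &str,
    env_value: Option<String>,
    fetch: impl FnOnce() -> Option<String>,
) -> Result<String, String> {
    env_value
        .filter(|v| !v.is_empty())
        .or_else(|| fetch().filter(|v| !v.is_empty()))
        .ok_or_else(|| format!("Error: {} not defined!", name))
}

/// Mirror of `set -- calling_backend ... "$@"`: the fixed flags come first
/// and the caller's own arguments are appended unchanged.
fn build_command(external_ip: &str, internal_ip: &str, extra: &[String]) -> Vec<String> {
    let mut argv = vec![
        "calling_backend".to_string(),
        "--ice-candidate-ip".to_string(),
        external_ip.to_string(),
        "--signaling-ip".to_string(),
        internal_ip.to_string(),
    ];
    argv.extend_from_slice(extra);
    argv
}
```

Because the caller's arguments land after the generated ones, options passed to `docker run` after the image name still reach `calling_backend`.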
@ -1,7 +1,7 @@
This directory contains fuzz targets used with `cargo fuzz`.
```
// In the top-level source directory
// In the backend source directory
cargo install cargo-fuzz
cargo fuzz list
cargo +nightly fuzz run <fuzz-target>

@ -18,7 +18,11 @@ use log::*;
use prost::Message;
use thiserror::Error;
use crate::{audio, protos, rtp::{self, VideoRotation}, vp8};
use crate::{
audio, protos,
rtp::{self, VideoRotation},
vp8,
};
pub const CLIENT_SERVER_DATA_SSRC: rtp::Ssrc = 1;
pub const CLIENT_SERVER_DATA_PAYLOAD_TYPE: rtp::PayloadType = 101;

@ -7,68 +7,67 @@
use std::net::SocketAddr;
use serde::Deserialize;
use structopt::StructOpt;
use clap;
/// General configuration options, set by command line arguments or
/// falls back to default or environment variables (in some cases).
#[derive(Default, StructOpt, Debug, Clone)]
#[structopt(name = "calling_backend")]
#[derive(Default, clap::Parser, Debug, Clone)]
#[clap(name = "calling_backend")]
pub struct Config {
/// The IP address to bind to for all servers.
#[structopt(long, default_value = "0.0.0.0")]
#[clap(long, default_value = "0.0.0.0")]
pub binding_ip: String,
/// The IP address to share for ICE candidates. Clients will connect
/// to the calling backend using this IP.
#[structopt(long, env = "ICE_CANDIDATE_IP")]
#[clap(long)]
pub ice_candidate_ip: Option<String>,
/// The port to use for ICE candidates. Clients will connect to the
/// calling backend using this port.
#[structopt(long, default_value = "10000")]
#[clap(long, default_value = "10000")]
pub ice_candidate_port: u16,
/// The IP address to share for direct access to the signaling_server. If
/// defined, then the signaling_server will be used, otherwise the
/// http_server will be used for testing.
#[structopt(long, env = "SIGNALING_IP")]
#[clap(long)]
pub signaling_ip: Option<String>,
/// The port to use for the signaling interface.
#[structopt(long, default_value = "8080")]
#[clap(long, default_value = "8080")]
pub signaling_port: u16,
/// Maximum clients per call, if using the http_server for testing.
#[structopt(long, default_value = "8")]
#[clap(long, default_value = "8")]
pub max_clients_per_call: u32,
/// The initial bitrate target for sending. In a 16-person call with
/// each base layer at 50kbps you'd need 800kbps to send them all.
#[structopt(long, default_value = "800")]
#[clap(long, default_value = "800")]
pub initial_target_send_rate_kbps: u64,
/// The min target send rate for sending.
/// This affects the congestion controller (googcc).
#[structopt(long, default_value = "100")]
#[clap(long, default_value = "100")]
pub min_target_send_rate_kbps: u64,
/// The max target send rate for sending.
/// This affects the congestion controller (googcc)
/// and indirectly the maximum that any client can receive
/// no matter how much the client requests.
#[structopt(long, default_value = "30000")]
#[clap(long, default_value = "30000")]
pub max_target_send_rate_kbps: u64,
/// If the client doesn't request a max send rate,
/// use this as the max send rate.
/// Affects the allocation of the target send rate,
/// not the calculation of the target send rate.
#[structopt(long, default_value = "5000")]
#[clap(long, default_value = "5000")]
pub default_requested_max_send_rate_kbps: u64,
/// Timer tick period for operating on the Sfu state (ms).
#[structopt(long, default_value = "100")]
#[clap(long, default_value = "100")]
pub tick_interval_ms: u64,
/// How quickly we want to drain each outgoing queue.
@ -76,57 +75,49 @@ pub struct Config {
/// It will push out other, lower-priority, streams to prioritize draining.
/// The lower the value here, the higher the rate and the
/// higher priority put on draining the outgoing queue.
#[structopt(long, default_value = "500")]
#[clap(long, default_value = "500")]
pub outgoing_queue_drain_ms: u64,
/// Optional interval used to post diagnostics to the log. If not defined
/// then no periodic information about calls will be posted to the log.
#[structopt(long, env = "DIAGNOSTICS_INTERVAL_SECS")]
#[clap(long)]
pub diagnostics_interval_secs: Option<u64>,
/// Interval for sending active speaker messages (ms). The amount of time
/// to wait between sending messages to the clients to remind them of the
/// current active speaker for the call. Using milliseconds in case sub-
/// second resolution is needed.
#[structopt(long, default_value = "1000")]
#[clap(long, default_value = "1000")]
pub active_speaker_message_interval_ms: u64,
/// Inactivity check interval (seconds). The amount of time to wait between
/// iterating structures for inactive calls and clients.
#[structopt(long, default_value = "5")]
#[clap(long, default_value = "5")]
pub inactivity_check_interval_secs: u64,
/// Amount of time to wait before dropping a call or client due to inactivity (seconds).
#[structopt(long, default_value = "30")]
#[clap(long, default_value = "30")]
pub inactivity_timeout_secs: u64,
#[structopt(flatten)]
#[clap(flatten)]
pub metrics: MetricsOptions,
}
#[derive(StructOpt, Clone, Debug, Default)]
#[derive(clap::Parser, Clone, Debug, Default)]
pub struct MetricsOptions {
/// Host and port of Datadog StatsD agent. Typically 127.0.0.1:8125.
#[structopt(long)]
#[clap(long)]
pub datadog: Option<String>,
/// Region appears as a tag in metrics and logging.
#[structopt(long = "metrics-region", default_value = "unspecified")]
#[clap(long = "metrics-region", default_value = "unspecified")]
pub region: String,
/// Deployment version appears as a tag in metrics and in logging if specified.
#[structopt(long = "metrics-version")]
#[clap(long = "metrics-version")]
pub version: Option<String>,
}
/// Deployment configuration options, used to set sensitive information
/// at runtime from a configuration file.
#[derive(Debug, Deserialize)]
pub struct DeploymentConfig {
#[serde(rename = "authenticationKey")]
pub authentication_key: String,
}
/// Returns the public address of the server for media/UDP as per configuration.
pub fn get_server_media_address(config: &'static Config) -> SocketAddr {
let ip = config

@ -15,9 +15,9 @@ use calling_backend::{
config, http_server, metrics_server, sfu::Sfu, signaling_server, udp_server,
};
use calling_common::{DataRate, Duration, Instant};
use clap::Parser;
use env_logger::Env;
use parking_lot::Mutex;
use structopt::StructOpt;
use tokio::{
runtime,
signal::unix::{signal, SignalKind},
@ -26,7 +26,7 @@ use tokio::{
lazy_static! {
// Load the config and treat it as a read-only static value.
static ref CONFIG: config::Config = config::Config::from_args();
static ref CONFIG: config::Config = config::Config::parse();
}
#[rustfmt::skip]

@ -5,7 +5,7 @@
[package]
name = "calling_frontend"
version = "0.3.2"
version = "0.4.0"
authors = ["Calling Team <callingteam@signal.org>"]
edition = "2021"
description = "Frontend server for group calls."
@ -22,7 +22,6 @@ thiserror = "1.0"
log = "0.4"
env_logger = "0.9"
clap = { version = "3.0", features = ["derive"] }
yaml-rust = "0.4"
# For runtime and threading
tokio = { version = "1", features = ["rt-multi-thread", "signal", "macros"] }
@ -35,8 +34,6 @@ base64 = "0.13"
hex = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
serde_dynamo = { version = "4", features = ["aws-sdk-dynamodb+0_16"] }
# For common and authentication
rand = "0.8"
@ -49,13 +46,15 @@ hyper = { version = "0.14", features = ["full"] }
axum = { version = "0.5", features = ["headers"] }
tower = "0.4"
mime = "0.3"
http = "0.2"
# For storage access to DynamoDB
aws-types = { version = "0.46", features = ["hardcoded-credentials"] }
aws-smithy-types = "0.46"
aws-config = "0.46"
aws-sdk-dynamodb = "0.16"
http = "0.2"
aws-types = { version = "0.49", features = ["hardcoded-credentials"] }
aws-smithy-types = "0.49"
aws-smithy-async = "0.49"
aws-config = "0.49"
aws-sdk-dynamodb = "0.19"
serde_dynamo = { version = "4", features = ["aws-sdk-dynamodb+0_19"] }
# For metrics
parking_lot = "0.12"

@ -3,10 +3,14 @@
# SPDX-License-Identifier: AGPL-3.0-only
#
# Use Debian stretch for now to ensure GLIBC < 2.28.
FROM debian:stretch-20220316 AS build-stage
ARG debian_ver=bullseye
FROM debian:${debian_ver} AS build-stage
# Update system packages.
RUN apt-get update \
&& apt-get install -y --no-install-recommends --no-install-suggests build-essential curl ca-certificates \
&& apt-get upgrade -y \
&& apt-get install -y --no-install-recommends --no-install-suggests curl build-essential ca-certificates protobuf-compiler \
&& update-ca-certificates
# Install Rust.
@ -35,22 +39,27 @@ RUN cargo build --bin calling_frontend --release
COPY . .
RUN cargo build --bin calling_frontend --release
# Export the calling_frontend executable if the '-o' option is specified.
FROM scratch AS export-stage
COPY --from=build-stage /usr/src/calling-service/target/release/calling_frontend calling_frontend
# Create a minimal container to deploy and run the calling_frontend.
FROM debian:stretch-slim AS run-stage
# Create a minimal container to deploy and run the calling frontend.
FROM debian:${debian_ver}-slim AS run-stage
# Update system packages.
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y --no-install-recommends --no-install-suggests ca-certificates \
&& update-ca-certificates
&& update-ca-certificates \
# Install curl for ip detection.
&& apt-get install -y --no-install-recommends --no-install-suggests curl \
# Install jq for parsing gcp metadata.
&& apt-get install -y --no-install-recommends --no-install-suggests jq \
# Cleanup unnecessary stuff.
&& rm -rf /var/lib/apt/lists/*
COPY --from=build-stage /usr/src/calling-service/target/release/calling_frontend /usr/local/bin/
COPY frontend/docker-entrypoint.sh /usr/local/bin/
USER nobody:nogroup
# Expose http server access ports to this container.
EXPOSE 8090
EXPOSE 8080
COPY --from=build-stage /usr/src/calling-service/target/release/calling_frontend .
USER 1000
ENTRYPOINT ["./calling_frontend"]
ENTRYPOINT ["docker-entrypoint.sh"]

frontend/docker-entrypoint.sh Executable file

@ -0,0 +1,42 @@
#!/bin/bash
#
#
# Copyright 2022 Signal Messenger, LLC
# SPDX-License-Identifier: AGPL-3.0-only
#
#
if [[ -z "${REGION}" ]]; then
ZONE="$(curl -Ss "http://metadata.google.internal/computeMetadata/v1/instance/zone" -H "Metadata-Flavor: Google")"
REGION=$(echo "$ZONE" | awk -F/ '{ print $NF }' | awk -F- '{OFS="-"; NF--; print $0}')
if [[ -z "${REGION}" ]]; then
echo "Error: REGION not defined!"
exit 1
fi
fi
if [[ -z "${CALLING_AUTH_KEY}" ]]; then
if [[ -z "${SECRET_PROJECT}" ]]; then
echo "Error: SECRET_PROJECT not defined but needed to get calling-auth-key!"
exit 1
fi
if [[ -z "${AUTH_SECRET_NAME}" ]]; then
echo "Error: AUTH_SECRET_NAME not defined but needed to get calling-auth-key!"
exit 1
fi
TOKEN="$(curl -Ss "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" -H "Metadata-Flavor: Google" | jq -r '.access_token')"
CALLING_AUTH_KEY="$(curl -Ss "https://secretmanager.googleapis.com/v1/projects/$SECRET_PROJECT/secrets/$AUTH_SECRET_NAME/versions/latest:access" -H "Metadata-Flavor: Google" -H "authorization: Bearer $TOKEN" | jq -r '.payload.data' | base64 --decode)"
if [[ -z "${CALLING_AUTH_KEY}" ]]; then
echo "Error: CALLING_AUTH_KEY not defined!"
exit 1
fi
fi
set -- calling_frontend \
--region "$REGION" \
--authentication-key "$CALLING_AUTH_KEY" \
"$@"
"$@"

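The two `awk` invocations in the script above derive the region from the zone reported by the metadata server: the first keeps the last `/`-separated component, and the second drops the final `-`-separated field. A minimal Rust sketch of the same transformation follows. The function name `region_from_zone` is hypothetical, and the sample zone strings are illustrative values, not output captured from a real instance.

```rust
/// Derive a region such as "us-west1" from a zone such as
/// "projects/<number>/zones/us-west1-a". Returns `None` when no region
/// can be extracted, which corresponds to the script's
/// "REGION not defined" error path.
fn region_from_zone(zone: &str) -> Option<String> {
    // Equivalent of `awk -F/ '{ print $NF }'`: keep the last path component.
    let name = zone.trim().rsplit('/').next()?;
    // Equivalent of dropping the last `-`-separated field.
    let (region, _zone_suffix) = name.rsplit_once('-')?;
    if region.is_empty() {
        None
    } else {
        Some(region.to_string())
    }
}
```

A zone of `us-west1-a` yields `us-west1`, which matches the `region` value used in the updated test configuration.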
@ -3,7 +3,6 @@
// SPDX-License-Identifier: AGPL-3.0-only
//
mod v1;
mod v2;
use std::{
@ -20,7 +19,7 @@ use axum::{
handler::Handler,
middleware::{self, Next},
response::IntoResponse,
routing::{delete, get},
routing::get,
Extension, Router,
};
use http::{header, Request, StatusCode};
@ -117,14 +116,11 @@ async fn metrics<B>(
let latency = start.elapsed();
let version;
if path.starts_with("/v1/") {
version = "v1";
} else if path.starts_with("/v2/") {
version = "v2";
let version = if path.starts_with("/v2/") {
"v2"
} else {
version = "unknown";
}
"unknown"
};
let mut api_metrics = frontend.api_metrics.lock();
@ -253,14 +249,6 @@ fn app(frontend: Arc<Frontend>) -> Router {
let health_route = Router::new().route("/health", get(get_health));
let routes = Router::new()
.route(
"/v1/conference/participants",
get(v1::get_participants).put(v1::join),
)
.route(
"/v1/conference/participants/:endpoint_id",
delete(v1::leave),
)
.route(
"/v2/conference/participants",
get(v2::get_participants).put(v2::join),
@ -279,8 +267,6 @@ fn app(frontend: Arc<Frontend>) -> Router {
}
pub async fn start(frontend: Arc<Frontend>, ender_rx: Receiver<()>) -> Result<()> {
// TODO: Fix if address is already in use, be sure it brings down the whole server.
let addr = SocketAddr::new(
IpAddr::from_str(&frontend.config.server_ip)?,
frontend.config.server_port,
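
With the `/v1/` branch removed, the metrics middleware in the hunks above labels every path that does not start with `/v2/` as `unknown`. That includes requests to the old v1 routes and to `/health`. A minimal standalone sketch of that labelling rule follows; the function name `api_version_label` is hypothetical, since the original computes the value inline.

```rust
/// Map a request path to the version tag used in API metrics.
/// Only "/v2/" is recognized after the removal of the v1 API.
fn api_version_label(path: &str) -> &'static str {
    if path.starts_with("/v2/") {
        "v2"
    } else {
        "unknown"
    }
}
```

Writing the branch as an expression, as the commit does, also removes the need for a separately declared `version` variable.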

File diff suppressed because it is too large.

@ -195,7 +195,6 @@ mod authenticator_tests {
use super::*;
use env_logger::Env;
use hex::ToHex;
//use hmac::digest::generic_array::GenericArray;
const AUTH_KEY_1: &str = "f00f0014fe091de31827e8d686969fad65013238aadd25ef8629eb8a9e5ef69b";
const AUTH_KEY_2: &str = "f00f0072f8ee256b9ba24255897230342cc83b76a3964d6288a7ac8ae4e8e9ca";

@ -117,6 +117,8 @@ pub trait Backend: Sync + Send {
pub struct BackendHttpClient {
http_client: HttpClient<HttpConnector>,
/// URL used when invoking the get_info() API so that the request goes through
/// a load balancer.
base_url: String,
}
@ -124,16 +126,9 @@ impl BackendHttpClient {
pub fn from_config(config: &'static config::Config) -> Self {
let client = HttpClient::builder().build_http();
// Build the base_url string for the backend client. This is used when invoking
// the get_info() API so that the request goes through the load balancer for the region.
let base_url = config
.calling_server_url_template
.replace("<region>", &config.region)
.replace("<version>", &config.calling_server_version);
Self {
http_client: client,
base_url,
base_url: config.calling_server_url.clone(),
}
}
}

@ -4,125 +4,77 @@
//
use clap;
use serde::Deserialize;
/// Configuration options from command line arguments.
#[derive(Default, clap::Parser, Debug, Clone)]
#[clap(name = "calling_frontend")]
pub struct ArgConfig {
pub struct Config {
/// The IP address to bind to for the server.
#[clap(long, default_value = "0.0.0.0")]
pub server_ip: String,
/// The port to use to access the server.
#[clap(long, default_value = "8090")]
#[clap(long, default_value = "8080")]
pub server_port: u16,
/// Region of the frontend. Appears as a tag in metrics and logging.
/// GCP region of the frontend. Appears as a tag in metrics and logging.
#[clap(long)]
pub region: String,
/// The authentication key to use when validating API requests.
#[clap(long)]
pub authentication_key: String,
/// Deployment version of the frontend. Appears as a tag in metrics and in logging.
#[clap(long)]
pub version: String,
/// A version string suitable to be used in the calling_server_url_template.
/// Examples: "21", "staging.21"
#[clap(long)]
pub calling_server_version: String,
/// The path to a yaml file with further configuration settings.
#[clap(long)]
pub yaml: String,
}
/// Configuration options from a yaml file.
#[derive(Default, Deserialize)]
pub struct YamlConfig {
/// Maximum clients per call.
#[clap(long)]
pub max_clients_per_call: u32,
/// Interval for removing ended calls from the database.
#[clap(long)]
pub cleanup_interval_ms: u64,
/// The authentication key to use when validating API requests.
pub authentication_key: String,
/// A URL template string that provides a region-specific address of the server and
/// used for redirects.
/// '<region>' will be substituted with the current region.
/// Example: "https://sfu.<region>.voip.signal.org"
#[clap(long)]
pub regional_url_template: String,
/// A URL template string that provides a specific address to a calling server.
/// '<region>' will be substituted with the current region.
/// '<version>' will be substituted with the current deployment version.
/// Example: `http://cs.<version>.<region>.voip.signal.org`
pub calling_server_url_template: String,
/// The URL of the calling server to access for the backend.
#[clap(long)]
pub calling_server_url: String,
/// The key used for accessing storage, such as the AWS_ACCESS_KEY_ID.
pub storage_key: String,
/// Interval for fetching a new identity token for storage support via DynamoDB.
#[clap(long, default_value = "600000")]
pub identity_fetcher_interval_ms: u64,
/// The password used for accessing storage, such as the AWS_SECRET_ACCESS_KEY.
pub storage_password: String,
/// Where to fetch identity tokens from for storage support via DynamoDB.
#[clap(long)]
pub identity_token_url: Option<String>,
/// The name of the table that provides the list of calls being tracked.
#[clap(long)]
pub storage_table: String,
/// The region in which the server resides.
/// The AWS region in which the DynamoDB server resides.
#[clap(long)]
pub storage_region: String,
/// The storage endpoint used only for testing. Typically something like http://dynamodb:8000.
/// The storage endpoint used only for testing. Typically something like "http://dynamodb:8000".
/// Do not specify anything for production.
#[clap(long)]
pub storage_endpoint: Option<String>,
/// IP and port of Datadog StatsD agent. Typically 127.0.0.1:8125. If not
/// present, metrics will be disabled.
#[clap(long)]
pub metrics_datadog_host: Option<String>,
}
pub struct Config {
pub server_ip: String,
pub server_port: u16,
pub region: String,
pub version: String,
pub calling_server_version: String,
pub max_clients_per_call: u32,
pub cleanup_interval_ms: u64,
pub authentication_key: String,
pub regional_url_template: String,
pub calling_server_url_template: String,
pub storage_key: String,
pub storage_password: String,
pub storage_table: String,
pub storage_region: String,
pub storage_endpoint: Option<String>,
pub metrics_datadog_host: Option<String>,
}
impl Config {
pub fn merge(arg_config: ArgConfig, yaml_config: YamlConfig) -> Config {
Config {
server_ip: arg_config.server_ip,
server_port: arg_config.server_port,
region: arg_config.region,
version: arg_config.version,
calling_server_version: arg_config.calling_server_version,
max_clients_per_call: yaml_config.max_clients_per_call,
cleanup_interval_ms: yaml_config.cleanup_interval_ms,
authentication_key: yaml_config.authentication_key,
regional_url_template: yaml_config.regional_url_template,
calling_server_url_template: yaml_config.calling_server_url_template,
storage_key: yaml_config.storage_key,
storage_password: yaml_config.storage_password,
storage_table: yaml_config.storage_table,
storage_region: yaml_config.storage_region,
storage_endpoint: yaml_config.storage_endpoint,
metrics_datadog_host: yaml_config.metrics_datadog_host,
}
}
}
#[cfg(test)]
pub fn default_test_config() -> Config {
Config {
@ -130,18 +82,17 @@ pub fn default_test_config() -> Config {
server_port: 8080,
max_clients_per_call: 8,
cleanup_interval_ms: 5000,
identity_fetcher_interval_ms: 1000 * 60 * 10,
identity_token_url: None,
authentication_key: "f00f0014fe091de31827e8d686969fad65013238aadd25ef8629eb8a9e5ef69b"
.to_string(),
region: "us-west-1".to_string(),
region: "us-west1".to_string(),
version: "1".to_string(),
regional_url_template: "".to_string(),
calling_server_url_template: "http://127.0.0.1:8080".to_string(),
calling_server_version: "1".to_string(),
storage_key: "DUMMYKEY".to_string(),
storage_password: "DUMMYSECRET".to_string(),
storage_table: "Conferences".to_string(),
storage_region: "us-east-2".to_string(),
storage_endpoint: Some("http://127.0.0.1:8000".to_string()),
calling_server_url: "http://127.0.0.1:8080".to_string(),
storage_table: "CallRecords".to_string(),
storage_region: "us-east-1".to_string(),
storage_endpoint: Some("localhost:9010".to_string()),
metrics_datadog_host: None,
}
}
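
The default for `identity_fetcher_interval_ms` is `600000`, and the test configuration writes the same value as `1000 * 60 * 10`, which is ten minutes. The short sketch below checks that arithmetic. The constant and helper names are hypothetical, and it uses `std::time::Duration` for illustration, whereas the service itself imports `calling_common::Duration`.

```rust
use std::time::Duration;

/// Default from the `#[clap(long, default_value = "600000")]` attribute.
const DEFAULT_IDENTITY_FETCHER_INTERVAL_MS: u64 = 600_000;

/// Convert the configured millisecond interval into a `Duration`.
fn identity_fetcher_interval(interval_ms: u64) -> Duration {
    Duration::from_millis(interval_ms)
}
```

Calling `identity_fetcher_interval(DEFAULT_IDENTITY_FETCHER_INTERVAL_MS)` gives the same duration as `Duration::from_secs(10 * 60)`.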

@ -33,6 +33,12 @@ pub type UserId = String;
#[derive(Clone, Deserialize, Serialize, Eq, PartialEq)]
pub struct GroupId(String);
impl From<String> for GroupId {
fn from(group_id_string: String) -> Self {
Self(group_id_string)
}
}
impl From<&str> for GroupId {
fn from(group_id: &str) -> Self {
Self(group_id.to_string())
@ -121,8 +127,6 @@ pub struct JoinResponseWrapper {
pub ice_ufrag: String,
pub ice_pwd: String,
pub dhe_public_key: String,
// Needed for the v1 api.
pub client_id: String,
}
#[derive(thiserror::Error, Debug, Eq, PartialEq)]
@ -382,7 +386,6 @@ impl Frontend {
ice_ufrag: backend_join_response.ice_ufrag,
ice_pwd: backend_join_response.ice_pwd,
dhe_public_key: backend_dhe_public_key,
client_id,
})
}
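
The `From<String>` implementation added for `GroupId` in the first hunk above complements the existing `From<&str>` one, so an owned `String` can be converted without an extra allocation. A minimal standalone sketch of the two conversions follows. It assumes a simplified `GroupId` that derives `Debug` for the sake of the example, whereas the original derives `Clone`, `Deserialize`, `Serialize`, `Eq`, and `PartialEq`. The helper `group_id_from_owned` is hypothetical.

```rust
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GroupId(String);

impl From<String> for GroupId {
    // Takes ownership of the string, so no copy is made.
    fn from(group_id_string: String) -> Self {
        Self(group_id_string)
    }
}

impl From<&str> for GroupId {
    // Borrowed input has to be copied into a new String.
    fn from(group_id: &str) -> Self {
        Self(group_id.to_string())
    }
}

/// Hypothetical helper showing the owned conversion via `Into`.
pub fn group_id_from_owned(raw: String) -> GroupId {
    raw.into()
}
```

Both conversions produce equal values for the same text, so callers can pick whichever matches the data they already hold.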

@ -9,7 +9,7 @@ extern crate lazy_static;
#[macro_use]
extern crate log;
use std::{fs::File, sync::Arc};
use std::sync::Arc;
use anyhow::Result;
use calling_common::Duration;
@ -28,14 +28,7 @@ use tokio::{
lazy_static! {
// Load the config and treat it as a read-only static value.
static ref CONFIG: config::Config = {
let arg_config = config::ArgConfig::parse();
let f = File::open(&arg_config.yaml).expect("yaml file can be opened");
let yaml_config = serde_yaml::from_reader(f).expect("yaml file can be parsed");
config::Config::merge(arg_config, yaml_config)
};
static ref CONFIG: config::Config = config::Config::parse();
}
#[rustfmt::skip]
@ -48,10 +41,9 @@ fn print_config(config: &'static config::Config) {
info!(" {:38}{}", "region:", config.region);
info!(" {:38}{}", "version:", config.version);
info!(" {:38}{}", "regional_url_template:", config.regional_url_template);
info!(" {:38}{}", "calling_server_url_template:", config.calling_server_url_template);
info!(" {:38}{}", "calling_server_version_param:", config.calling_server_version);
info!(" {:38}{}", "calling_server_url:", config.calling_server_url);
info!(" {:38}{}", "storage_table:", config.storage_table);
info!(" {:38}{}", "storage_region:", config.storage_region);
info!(" {:38}{:?}", "identity_url:", config.identity_token_url);
info!(" {:38}{:?}", "storage_endpoint:", config.storage_endpoint);
info!(" {:38}{}", "metrics_datadog:",
match &config.metrics_datadog_host {
@ -120,23 +112,26 @@ fn main() -> Result<()> {
// for each core on the system.
let threaded_rt = runtime::Runtime::new()?;
// Create an authenticator.
let authenticator = Authenticator::from_hex_key(&config.authentication_key)?;
let (api_ender_tx, api_ender_rx) = oneshot::channel();
let (cleaner_ender_tx, cleaner_ender_rx) = oneshot::channel();
let (metrics_ender_tx, metrics_ender_rx) = oneshot::channel();
let (identity_fetcher_ender_tx, identity_fetcher_ender_rx) = oneshot::channel();
let (signal_canceller_tx, signal_canceller_rx) = mpsc::channel(1);
let signal_canceller_tx_clone_for_cleaner = signal_canceller_tx.clone();
let signal_canceller_tx_clone_for_metrics = signal_canceller_tx.clone();
let signal_canceller_tx_clone_for_identity_fetcher = signal_canceller_tx.clone();
// Create frontend entities that might fail.
let authenticator = Authenticator::from_hex_key(&config.authentication_key)?;
let (storage, identity_fetcher) = threaded_rt.block_on(DynamoDb::new(config))?;
threaded_rt.block_on(async {
// Create the shared Frontend state.
let frontend: Arc<Frontend> = Arc::new(Frontend {
config,
authenticator,
storage: Box::new(DynamoDb::new(config).await),
storage: Box::new(storage),
backend: Box::new(BackendHttpClient::from_config(config)),
id_generator: Box::new(FrontendIdGenerator),
api_metrics: Mutex::new(Default::default()),
@ -163,6 +158,14 @@ fn main() -> Result<()> {
let _ = signal_canceller_tx_clone_for_metrics.send(()).await;
});
// Start the identity token fetcher.
let fetcher_handle = tokio::spawn(async move {
let _ = identity_fetcher.start(identity_fetcher_ender_rx).await;
let _ = signal_canceller_tx_clone_for_identity_fetcher
.send(())
.await;
});
// Wait for any signals to be detected, or cancel due to one of the
// servers not being able to be started (the channel is buffered).
wait_for_signal(signal_canceller_rx).await;
@ -171,9 +174,10 @@ fn main() -> Result<()> {
let _ = api_ender_tx.send(());
let _ = cleaner_ender_tx.send(());
let _ = metrics_ender_tx.send(());
let _ = identity_fetcher_ender_tx.send(());
// Wait for the servers to exit.
let _ = tokio::join!(api_handle, cleaner_handle, metrics_handle,);
let _ = tokio::join!(api_handle, cleaner_handle, metrics_handle, fetcher_handle);
});
info!("shutting down the runtime");
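
The shutdown handshake in this hunk gives each long-running task its own "ender" channel; `main` signals every channel and then joins the task handles. As a minimal sketch of that idea, the block below uses only the standard library (threads and `std::sync::mpsc`) rather than the tokio `oneshot` channels and `tokio::join!` from the commit. The name `run_periodically` is hypothetical and exists only for illustration. A timed `recv` doubles as both the sleep between ticks and the cancellation check, so the worker never runs sooner than one full interval.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `work` once per `interval` until the ender
/// channel receives a message or its sender is dropped.
/// Returns how many times `work` ran.
fn run_periodically<F: FnMut()>(interval: Duration, ender_rx: Receiver<()>, mut work: F) -> u32 {
    let mut runs = 0;
    loop {
        // Waiting on the channel with a timeout acts as the sleep between
        // ticks and lets a shutdown signal interrupt that sleep immediately.
        match ender_rx.recv_timeout(interval) {
            Err(RecvTimeoutError::Timeout) => {
                work();
                runs += 1;
            }
            // Either an explicit signal or a dropped sender ends the loop.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return runs,
        }
    }
}

fn main() {
    let (ender_tx, ender_rx) = mpsc::channel();

    // Start the periodic worker on its own thread.
    let worker = thread::spawn(move || {
        run_periodically(Duration::from_millis(10), ender_rx, || println!("tick"))
    });

    // Let it run briefly, then signal shutdown and wait for it to exit.
    thread::sleep(Duration::from_millis(55));
    let _ = ender_tx.send(());
    let runs = worker.join().expect("worker thread panicked");
    println!("worker ran {runs} time(s) before shutdown");
}
```

The send result is ignored for the same reason as in the diff: if the worker has already exited, there is nobody left to notify and nothing to recover.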


@ -3,19 +3,25 @@
// SPDX-License-Identifier: AGPL-3.0-only
//
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use aws_sdk_dynamodb::{
model::{AttributeValue, Select},
types::SdkError,
Client, Config, Endpoint,
};
use aws_smithy_async::rt::sleep::default_async_sleep;
use aws_smithy_types::retry::RetryConfigBuilder;
use aws_types::{region::Region, Credentials};
use calling_common::Duration;
use http::Uri;
use hyper::client::HttpConnector;
use hyper::{Body, Method, Request};
use log::*;
use serde::{Deserialize, Serialize};
use serde_dynamo::{from_item, to_item};
use std::{env, path::PathBuf};
use tokio::{io::AsyncWriteExt, sync::oneshot::Receiver};
#[cfg(test)]
use mockall::{automock, predicate::*};
@ -23,6 +29,7 @@ use mockall::{automock, predicate::*};
use crate::{
config,
frontend::{GroupId, UserId},
metrics::Timer,
};
const GROUP_CONFERENCE_ID_STRING: &str = "groupConferenceId";
@ -85,49 +92,68 @@ pub struct DynamoDb {
}
impl DynamoDb {
pub async fn new(config: &'static config::Config) -> Self {
pub async fn new(config: &'static config::Config) -> Result<(Self, IdentityFetcher)> {
let sleep_impl =
default_async_sleep().ok_or_else(|| anyhow!("failed to create sleep_impl"))?;
let identity_fetcher;
let client = match &config.storage_endpoint {
None => {
Some(endpoint) => {
const KEY: &str = "DUMMY_KEY";
const PASSWORD: &str = "DUMMY_PASSWORD";
info!("Using endpoint for DynamoDB testing: {}", endpoint);
// Create an identity fetcher with a dummy token path, which isn't used
// for testing.
identity_fetcher = IdentityFetcher::new(config, "/tmp/token");
let aws_config = Config::builder()
.credentials_provider(Credentials::from_keys(KEY, PASSWORD, None))
.endpoint_resolver(Endpoint::immutable(Uri::from_static(endpoint)))
.sleep_impl(sleep_impl)
.region(Region::new(&config.storage_region))
.build();
Client::from_conf(aws_config)
}
_ => {
info!(
"Using region for DynamoDB access: {}",
config.storage_region.as_str()
);
// Get the location of the identity token file from the environment variable,
// the same location that the client will try to get it from for credentials.
let identity_token_path = env::var("AWS_WEB_IDENTITY_TOKEN_FILE")?;
identity_fetcher = IdentityFetcher::new(config, &identity_token_path);
// Fetch an identity token once before connecting for the first time.
identity_fetcher.fetch_token().await?;
let retry_config = RetryConfigBuilder::new()
.max_attempts(4)
.initial_backoff(std::time::Duration::from_millis(100))
.build();
let aws_config = Config::builder()
.credentials_provider(Credentials::from_keys(
&config.storage_key,
&config.storage_password,
None,
))
let aws_config = aws_config::from_env()
.sleep_impl(sleep_impl)
.retry_config(retry_config)
.region(Region::new(&config.storage_region))
.build();
Client::from_conf(aws_config)
}
Some(endpoint) => {
info!("Using endpoint for DynamoDB testing: {}", endpoint);
let aws_config = Config::builder()
.credentials_provider(Credentials::from_keys(
&config.storage_key,
&config.storage_password,
None,
))
.endpoint_resolver(Endpoint::immutable(Uri::from_static(endpoint)))
.region(Region::new(&config.storage_region))
.build();
Client::from_conf(aws_config)
.load()
.await;
Client::new(&aws_config)
}
};
Self {
client,
table_name: config.storage_table.to_string(),
}
Ok((
Self {
client,
table_name: config.storage_table.to_string(),
},
identity_fetcher,
))
}
}
@ -177,8 +203,6 @@ impl Storage for DynamoDb {
Err(SdkError::ServiceError { err: e, raw: _ })
if e.is_conditional_check_failed_exception() =>
{
// TODO: This log replicates behavior of the old server, remove if not useful.
info!("Conditional check failed, call now already exists");
Ok(self
.get_call_record(&call.group_id)
.await
@ -220,8 +244,6 @@ impl Storage for DynamoDb {
Err(SdkError::ServiceError { err: e, raw: _ })
if e.is_conditional_check_failed_exception() =>
{
// TODO: This log replicates behavior of the old server, remove if not useful.
info!("Item already removed or replaced: {:.6}", call_id);
Ok(())
}
Err(err) => Err(StorageError::UnexpectedError(err.into())),
@ -259,3 +281,79 @@ impl Storage for DynamoDb {
Ok(vec![])
}
}
/// Supports the DynamoDB storage implementation by periodically refreshing an identity
/// token file at the location given by `identity_token_path`.
pub struct IdentityFetcher {
client: hyper::Client<HttpConnector>,
fetch_interval: Duration,
identity_token_path: PathBuf,
identity_token_url: Option<String>,
}
impl IdentityFetcher {
fn new(config: &'static config::Config, identity_token_path: &str) -> Self {
IdentityFetcher {
client: hyper::client::Client::builder().build_http(),
fetch_interval: Duration::from_millis(config.identity_fetcher_interval_ms),
identity_token_path: PathBuf::from(identity_token_path),
identity_token_url: config.identity_token_url.to_owned(),
}
}
async fn fetch_token(&self) -> Result<()> {
if let Some(url) = &self.identity_token_url {
let request = Request::builder()
.method(Method::GET)
.uri(url)
.header("Metadata-Flavor", "Google")
.body(Body::empty())?;
debug!("Fetching identity token from {}", url);
let body = self.client.request(request).await?;
let body = hyper::body::to_bytes(body).await?;
let temp_name = self.identity_token_path.with_extension("bak");
let mut temp_file = tokio::fs::File::create(&temp_name).await?;
temp_file.write_all(&body).await?;
tokio::fs::rename(temp_name, &self.identity_token_path).await?;
debug!(
"Successfully wrote identity token to {:?}",
&self.identity_token_path
);
}
Ok(())
}
pub async fn start(self, ender_rx: Receiver<()>) -> Result<()> {
// Periodically fetch a new web identity from GCP.
let fetcher_handle = tokio::spawn(async move {
loop {
// Use sleep() instead of interval() so that we never wait *less* than one
// interval to do the next tick.
tokio::time::sleep(self.fetch_interval.into()).await;
let timer = start_timer_us!("calling.frontend.identity_fetcher.timed");
let result = &self.fetch_token().await;
if let Err(e) = result {
event!("calling.frontend.identity_fetcher.error");
error!("Failed to fetch identity token: {:?}", e);
}
timer.stop();
}
});
info!("fetcher ready");
// Wait for any task to complete and cancel the rest.
tokio::select!(
_ = fetcher_handle => {},
_ = ender_rx => {},
);
info!("fetcher shutdown");
Ok(())
}
}
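
`fetch_token` above writes the new token to a sibling file with a `bak` extension and then renames it over the real path, so a reader of the token file should not observe a half-written token. The block below is a minimal synchronous sketch of that write-then-rename pattern using only the standard library instead of `tokio::fs`. The function name `write_atomically` and the file name in `main` are hypothetical, and the `sync_all` call is an addition that the commit does not make. The sketch assumes the temporary file and the destination live on the same filesystem, which is what makes the rename replace the destination in one step on POSIX systems.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: writes `contents` to a sibling temporary file, then
/// renames that file over `path`.
fn write_atomically(path: &Path, contents: &[u8]) -> io::Result<()> {
    // Same naming scheme as the diff: replace the extension with "bak".
    let temp_path = path.with_extension("bak");

    let mut temp_file = fs::File::create(&temp_path)?;
    temp_file.write_all(contents)?;
    // Assumption beyond the diff: flush the data to disk before the rename.
    temp_file.sync_all()?;

    // Replace the destination with the fully written temporary file.
    fs::rename(&temp_path, path)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("example_identity.token");

    // Each write replaces the previous token in a single rename.
    write_atomically(&path, b"first-token")?;
    write_atomically(&path, b"second-token")?;

    println!("{}", fs::read_to_string(&path)?);
    fs::remove_file(&path)
}
```

Writing to the destination directly would be shorter, but a concurrent reader (here, the AWS credentials provider reading the path in `AWS_WEB_IDENTITY_TOKEN_FILE`) could then see an empty or truncated file while the write is in progress.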