Keep a map of minimum round trip time by network

To reduce lock contention, one map per relay server.

Admin server periodically tells the relay servers to cycle to fresh maps,
and the admin server outputs the old maps for external analysis.

Also, clang-format earlier changes
This commit is contained in:
Richard Russo 2023-10-12 10:32:01 -07:00
parent 7de3df6f62
commit 7f119911ba
10 changed files with 305 additions and 71 deletions

View File

@ -529,37 +529,37 @@ const char *socket_type_name(SOCKET_TYPE st) {
}
const char *duration_name(unsigned long duration) {
if (duration < 60) {
return "1min";
} else if (duration < 600) {
return "10mins";
} else if (duration < 86400) {
return "24hrs";
} else {
return "days";
}
if (duration < 60) {
return "1min";
} else if (duration < 600) {
return "10mins";
} else if (duration < 86400) {
return "24hrs";
} else {
return "days";
}
}
const char *rate_name(unsigned long rate_kbps) {
if (rate_kbps < 1) {
return "1kbps";
} else if (rate_kbps < 50) {
return "50kbps";
} else if (rate_kbps < 2500) {
return "2500kbps";
} else {
return "10000kbps";
}
if (rate_kbps < 1) {
return "1kbps";
} else if (rate_kbps < 50) {
return "50kbps";
} else if (rate_kbps < 2500) {
return "2500kbps";
} else {
return "10000kbps";
}
}
const char *addr_family_name(int addr_family) {
if (addr_family == AF_INET) {
return "ipv4";
} else if (addr_family == AF_INET6) {
return "ipv6";
} else {
return "other";
}
if (addr_family == AF_INET) {
return "ipv4";
} else if (addr_family == AF_INET6) {
return "ipv6";
} else {
return "other";
}
}
/////////////////// MTU /////////////////////////////////////////

View File

@ -43,6 +43,10 @@ static unsigned int barrier_count = 0;
static pthread_barrier_t barrier;
#endif
// Signal change to add rtt metrics
static pthread_barrier_t rtt_barrier;
////////////// Auth Server ////////////////
typedef unsigned char authserver_id;
@ -686,6 +690,37 @@ err:
return ret;
}
// Signal change to add rtt metrics
int send_cycle_rtt_map_to_relay(turnserver_id id) {
int ret = 0;
struct message_to_relay sm;
memset(&sm, 0, sizeof(struct message_to_relay));
sm.t = RMT_CYCLE_RTT_MAP;
struct relay_server *rs = get_relay_server(id);
if (!rs) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "%s: can't find relay for turn_server_id: %d\n", __FUNCTION__, (int)id);
ret = -1;
goto err;
}
sm.relay_server = rs;
{
struct evbuffer *output = bufferevent_get_output(rs->out_buf);
if (output) {
evbuffer_add(output, &sm, sizeof(struct message_to_relay));
} else {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "%s: Empty output buffer\n", __FUNCTION__);
ret = -1;
}
}
err:
return ret;
}
static int handle_relay_message(relay_server_handle rs, struct message_to_relay *sm) {
if (rs && sm) {
@ -766,6 +801,18 @@ static int handle_relay_message(relay_server_handle rs, struct message_to_relay
sm->m.sm.nd.nbh = NULL;
break;
}
// Signal change to add rtt metric
case RMT_CYCLE_RTT_MAP: {
rs->server.rtt_ms_mins = ur_map_create();
int br = 0;
do {
br = pthread_barrier_wait(&rtt_barrier);
if ((br < 0) && (br != PTHREAD_BARRIER_SERIAL_THREAD)) {
perror("rtt barrier wait (message)");
}
} while ((br < 0) && (br != PTHREAD_BARRIER_SERIAL_THREAD));
break;
}
default: {
perror("Weird buffer type\n");
}
@ -1872,3 +1919,61 @@ void setup_server(void) {
void init_listener(void) { memset(&turn_params.listener, 0, sizeof(struct listener_server)); }
///////////////////////////////
// Signal change to add rtt metrics
size_t cycle_rtt_ms_maps(ur_map **rtt_ms_maps, size_t len) {
if (len != 1 + ((turnserver_id)-1)) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "cycle_rtt_ms_maps, length is %ld, must be %ld\n", len,
1L + ((turnserver_id)-1));
return 0;
}
size_t count = 0;
for (size_t i = 0; i < get_real_general_relay_servers_number(); i++) {
if (general_relay_servers[i] && general_relay_servers[i]->server.rtt_ms_mins) {
rtt_ms_maps[count] = general_relay_servers[i]->server.rtt_ms_mins;
++count;
}
}
for (size_t i = 0; i < get_real_udp_relay_servers_number(); i++) {
if (udp_relay_servers[i] && udp_relay_servers[i]->server.rtt_ms_mins) {
rtt_ms_maps[count] = udp_relay_servers[i]->server.rtt_ms_mins;
++count;
}
}
static size_t last_count = 0;
if (last_count != count) {
if (last_count) {
if (pthread_barrier_destroy(&rtt_barrier) != 0) {
perror("rtt barrier destroy");
return 0;
}
}
if (pthread_barrier_init(&rtt_barrier, NULL, count + 1) != 0) {
perror("rtt barrier init");
return 0;
}
last_count = count;
}
for (size_t i = 0; i < get_real_general_relay_servers_number(); i++) {
if (general_relay_servers[i] && general_relay_servers[i]->server.rtt_ms_mins) {
send_cycle_rtt_map_to_relay(i);
}
}
for (size_t i = 0; i < get_real_udp_relay_servers_number(); i++) {
if (udp_relay_servers[i] && udp_relay_servers[i]->server.rtt_ms_mins) {
send_cycle_rtt_map_to_relay(i + TURNSERVER_ID_BOUNDARY_BETWEEN_TCP_AND_UDP);
}
}
int br = 0;
do {
br = pthread_barrier_wait(&rtt_barrier);
if ((br < 0) && (br != PTHREAD_BARRIER_SERIAL_THREAD)) {
perror("rtt barrier wait");
}
} while ((br < 0) && (br != PTHREAD_BARRIER_SERIAL_THREAD));
return count;
}

View File

@ -3574,7 +3574,8 @@ void turn_report_allocation_set(void *a, turn_time_t lifetime, int refresh) {
#endif
{
if (!refresh)
prom_inc_allocation(get_ioa_socket_type(ss->client_socket), get_ioa_socket_address_family(ss->client_socket));
prom_inc_allocation(get_ioa_socket_type(ss->client_socket),
get_ioa_socket_address_family(ss->client_socket));
}
}
}
@ -3653,10 +3654,7 @@ void turn_report_allocation_delete(void *a, SOCKET_TYPE socket_type, int family)
turn_time_t ct = get_turn_server_time(server) - ss->start_time;
const uint32_t byte_to_kilobit = 125;
uint64_t sent_rate_kbps = ss->sent_rate / byte_to_kilobit;
prom_dec_allocation(socket_type,
family,
(unsigned long)ct,
(unsigned long)sent_rate_kbps);
prom_dec_allocation(socket_type, family, (unsigned long)ct, (unsigned long)sent_rate_kbps);
}
}
}

View File

@ -288,6 +288,10 @@ void *allocate_super_memory_engine_func(ioa_engine_handle e, size_t size, const
/////////////////////////////////////////////////
// Signal change to add rtt metrics
int send_cycle_rtt_map_to_relay(turnserver_id id);
size_t cycle_rtt_ms_maps(ur_map **rtt_ms_maps, size_t len);
#ifdef __cplusplus
}
#endif

View File

@ -106,8 +106,7 @@ void start_prometheus_server(void) {
prom_counter_new("turn_total_traffic_peer_sentb", "Represents total finished sessions peer sent bytes", 0, NULL));
// Create total completed session counter metric
const char *total_sessions_labels[] =
{"duration", "sent_rate" };
const char *total_sessions_labels[] = {"duration", "sent_rate"};
turn_total_sessions = prom_collector_registry_must_register_metric(
prom_counter_new("turn_total_sessions", "Represents total completed sessions", 2, total_sessions_labels));
@ -120,20 +119,20 @@ void start_prometheus_server(void) {
// Create round trip time pseudo-histogram metrics
// values must be kept in sync with observation function below
turn_rtt_client[0] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_25ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[1] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_50ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[2] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_100ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[3] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_200ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[4] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_400ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[5] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_800ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[6] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_le_1500ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[0] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_25ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[1] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_50ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[2] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_100ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[3] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_200ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[4] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_400ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[5] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_800ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[6] = prom_collector_registry_must_register_metric(prom_counter_new(
"turn_rtt_client_le_1500ms", "Represents measured round trip time of client with channel", 0, NULL));
turn_rtt_client[7] = prom_collector_registry_must_register_metric(
prom_counter_new("turn_rtt_client_more", "Represents measured round trip time of client with channel", 0, NULL));
@ -241,19 +240,16 @@ void prom_set_finished_traffic(const char *realm, const char *user, unsigned lon
void prom_inc_allocation(SOCKET_TYPE type, int addr_family) {
if (turn_params.prometheus == 1) {
const char *labels[] = {socket_type_name(type), addr_family_name(addr_family) };
const char *labels[] = {socket_type_name(type), addr_family_name(addr_family)};
prom_gauge_inc(turn_total_allocations, labels);
}
}
void prom_dec_allocation(SOCKET_TYPE type,
int addr_family,
unsigned long duration,
unsigned long sent_rate_kbps) {
void prom_dec_allocation(SOCKET_TYPE type, int addr_family, unsigned long duration, unsigned long sent_rate_kbps) {
if (turn_params.prometheus == 1) {
const char *labels[] = {socket_type_name(type), addr_family_name(addr_family) };
const char *labels[] = {socket_type_name(type), addr_family_name(addr_family)};
prom_gauge_dec(turn_total_allocations, labels);
const char *total_sessions_labels[] = { duration_name(duration), rate_name(sent_rate_kbps) };
const char *total_sessions_labels[] = {duration_name(duration), rate_name(sent_rate_kbps)};
prom_counter_add(turn_total_sessions, 1, total_sessions_labels);
}
}

View File

@ -70,10 +70,7 @@ void prom_set_finished_traffic(const char *realm, const char *user, unsigned lon
unsigned long sentp, unsigned long sentb, bool peer);
void prom_inc_allocation(SOCKET_TYPE type, int addr_family);
void prom_dec_allocation(SOCKET_TYPE type,
int addr_family,
unsigned long duration,
unsigned long sent_rate_kbps);
void prom_dec_allocation(SOCKET_TYPE type, int addr_family, unsigned long duration, unsigned long sent_rate_kbps);
int is_ipv6_enabled(void);

View File

@ -81,6 +81,9 @@
#include "tls_listener.h"
// Signal change to add rtt metrics
#include <fcntl.h>
#include <unistd.h>
///////////////////////////////
struct admin_server adminserver;
@ -1358,6 +1361,11 @@ void setup_admin_thread(void) {
}
adminserver.sessions = ur_map_create();
// Signal change to add rtt metrics
// run once a day
adminserver.rtt_ev =
set_ioa_timer(adminserver.e, 86400, 0, admin_server_rtt_timer_handler, NULL, 1, "admin_server_rtt_timer");
}
void admin_server_receive_message(struct bufferevent *bev, void *ptr) {
@ -3735,3 +3743,82 @@ void send_https_socket(ioa_socket_handle s) {
}
///////////////////////////////
// Signal change to add rtt metrics
ur_map *rtt_maps[1 + ((turnserver_id)-1)] = {0};
size_t rtt_maps_count = 0;
size_t rtt_map_current = 0;
FILE *rtt_file;
int rtt_foreach(ur_map_key_type key, ur_map_value_type value) {
if (!value) {
return 0;
}
ur_map_value_type min = value;
for (size_t i = rtt_map_current + 1; i < rtt_maps_count; ++i) {
if (ur_map_get(rtt_maps[i], key, &value)) {
ur_map_put(rtt_maps[i], key, 0);
if (value && value < min) {
min = value;
}
}
}
// value is stored as measured rtt in ms + 1
value -= 1;
char saddr[INET6_ADDRSTRLEN] = "\0";
if (key & (1L << 63)) {
struct sockaddr_in6 addr = {0};
addr.sin6_family = AF_INET6;
size_t i = 6;
while (i--) {
addr.sin6_addr.s6_addr[i] = key & 0xFF;
key >>= 8;
}
inet_ntop(AF_INET6, &addr.sin6_addr, saddr, sizeof(saddr));
fprintf(rtt_file, "%s/48,%ld\n", saddr, min);
} else {
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
key <<= 8;
addr.sin_addr.s_addr = htonl(key & 0xFFFFFFFF);
inet_ntop(AF_INET, &addr.sin_addr, saddr, sizeof(saddr));
fprintf(rtt_file, "%s/24,%ld\n", saddr, min);
}
return 0;
}
void admin_server_rtt_timer_handler(ioa_engine *engine, void *arg) {
UNUSED_ARG(engine);
UNUSED_ARG(arg);
int fd = open("/var/tmp/rtt_dump.tmp", O_WRONLY | O_CREAT | O_NOFOLLOW | O_TRUNC, S_IRUSR | S_IWUSR);
if (fd == -1) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "error opening temporary file during rtt timer (%d)\n", errno);
return;
}
rtt_file = fdopen(fd, "w");
if (rtt_file == NULL) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "fdopen error during rtt timer\n");
close(fd);
return;
}
fprintf(rtt_file, "network,rtt_ms\n");
rtt_maps_count = cycle_rtt_ms_maps(rtt_maps, sizeof(rtt_maps) / sizeof(rtt_maps[0]));
for (rtt_map_current = 0; rtt_map_current < rtt_maps_count; ++rtt_map_current) {
ur_map_foreach(rtt_maps[rtt_map_current], rtt_foreach);
ur_map_free(&rtt_maps[rtt_map_current]);
}
if (fflush(rtt_file) != 0) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "fflush /var/tmp/rtt_dump.tmp failed, not relinking (%d)\n", errno);
} else {
if (rename("/var/tmp/rtt_dump.tmp", "/var/tmp/rtt_dump") == -1) {
TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "could not rename rtt dump into /var/tmp/rtt_dump (%d)\n", errno);
}
}
fclose(rtt_file);
rtt_file = NULL;
}

View File

@ -73,6 +73,8 @@ struct admin_server {
struct bufferevent *https_out_buf;
ur_map *sessions;
pthread_t thr;
// Signal change to add rtt metrics
ioa_timer_handle rtt_ev;
};
///////////////////////////////////////////
@ -113,6 +115,10 @@ void https_admin_server_receive_message(struct bufferevent *bev, void *ptr);
int send_turn_session_info(struct turn_session_info *tsi);
void send_https_socket(ioa_socket_handle s);
// Signal change to add rtt metrics
int rtt_foreach(ur_map_key_type, ur_map_value_type);
void admin_server_rtt_timer_handler(ioa_engine *, void *);
////////////////////////////////////////////
#ifdef __cplusplus

View File

@ -2884,7 +2884,8 @@ static int handle_turn_binding(turn_turnserver *server, ts_ur_super_session *ss,
// Signal change to add rtt metrics
/////////////// inspect relayed packets, they might be ICE binds ///////////////
static void inspect_binds (ioa_net_data *in_buffer, turn_permission_info *tinfo, int from_peer, int is_channel) {
static void inspect_binds(turn_turnserver *server, ioa_net_data *in_buffer, turn_permission_info *tinfo, int from_peer,
int is_channel) {
if (!in_buffer || !tinfo || !(from_peer == 0 || from_peer == 1)) {
return;
}
@ -2909,7 +2910,6 @@ static void inspect_binds (ioa_net_data *in_buffer, turn_permission_info *tinfo,
from_client = 1;
}
if (tinfo->pings[from_client].ts.tv_sec == 0) {
return;
}
@ -2931,17 +2931,42 @@ static void inspect_binds (ioa_net_data *in_buffer, turn_permission_info *tinfo,
#if !defined(TURN_NO_PROMETHEUS)
if (is_channel) {
if (from_client) {
prom_observe_rtt_client(diffus);
} else {
prom_observe_rtt_peer(diffus);
}
if (tinfo->pings[from_peer].lastrttus > 0) {
prom_observe_rtt_combined(diffus + tinfo->pings[from_peer].lastrttus );
}
if (from_client) {
prom_observe_rtt_client(diffus);
} else {
prom_observe_rtt_peer(diffus);
}
if (tinfo->pings[from_peer].lastrttus > 0) {
prom_observe_rtt_combined(diffus + tinfo->pings[from_peer].lastrttus);
}
}
#endif
ur_map_key_type key = 0;
// add one to value to differentiate from zero
ur_map_value_type diffms = diffus / 1000 + 1;
if (in_buffer->src_addr.ss.sa_family == AF_INET) {
key = ntohl(((struct sockaddr_in *)&in_buffer->src_addr)->sin_addr.s_addr);
key >>= 8; // keep only the top 24 bits
} else if (in_buffer->src_addr.ss.sa_family == AF_INET6) {
// use the high 6 bytes (48 bits)
for (int i = 0; i < 6; ++i) {
key <<= 8;
key |= ((struct sockaddr_in6 *)&in_buffer->src_addr)->sin6_addr.s6_addr[i];
}
key |= (1L << 63);
}
// explicitly copy map pointer in case of concurrent access
ur_map *map = server->rtt_ms_mins;
ur_map_lock(map);
ur_map_value_type value = 0;
ur_map_get(map, key, &value);
if (value == 0 || diffms < value) {
ur_map_put(map, key, diffms);
}
ur_map_unlock(map);
}
}
// don't process retransmited responses
@ -3034,7 +3059,7 @@ static int handle_turn_send(turn_turnserver *server, ts_ur_super_session *ss, in
ioa_network_buffer_set_size(nbh, len);
}
// Signal change to add rtt metrics
inspect_binds(in_buffer, tinfo, 0, 0);
inspect_binds(server, in_buffer, tinfo, 0, 0);
ioa_network_buffer_header_init(nbh);
int skip = 0;
@ -4095,8 +4120,9 @@ static int write_to_peerchannel(ts_ur_super_session *ss, uint16_t chnum, ioa_net
ioa_network_buffer_get_size(in_buffer->nbh) - STUN_CHANNEL_HEADER_LENGTH);
// Signal change to add rtt metrics
turn_turnserver *server = (turn_turnserver *)ss->server;
turn_permission_info *tinfo = (turn_permission_info *)(chn->owner);
inspect_binds(in_buffer, tinfo, 0, 1);
inspect_binds(server, in_buffer, tinfo, 0, 1);
ioa_network_buffer_header_init(nbh);
@ -4792,7 +4818,7 @@ static void peer_input_handler(ioa_socket_handle s, int event_type, ioa_net_data
if (tinfo) {
chnum = get_turn_channel_number(tinfo, &(in_buffer->src_addr));
// Signal change to add rtt metrics
inspect_binds(in_buffer, tinfo, 1, chnum != 0);
inspect_binds(server, in_buffer, tinfo, 1, chnum != 0);
} else if (!(server->server_relay)) {
return;
}
@ -4981,6 +5007,9 @@ void init_turn_server(turn_turnserver *server, turnserver_id id, int verbose, io
server->response_origin_only_with_rfc5780 = response_origin_only_with_rfc5780;
server->respond_http_unsupported = respond_http_unsupported;
// Signal change to add rtt metrics
server->rtt_ms_mins = ur_map_create();
}
ioa_engine_handle turn_server_get_engine(turn_turnserver *s) {

View File

@ -67,7 +67,15 @@ extern int TURN_MAX_ALLOCATE_TIMEOUT_STUN_ONLY;
typedef uint8_t turnserver_id;
enum _MESSAGE_TO_RELAY_TYPE { RMT_UNKNOWN = 0, RMT_SOCKET, RMT_CB_SOCKET, RMT_MOBILE_SOCKET, RMT_CANCEL_SESSION };
// Signal change to add rtt metrics
enum _MESSAGE_TO_RELAY_TYPE {
RMT_UNKNOWN = 0,
RMT_SOCKET,
RMT_CB_SOCKET,
RMT_MOBILE_SOCKET,
RMT_CANCEL_SESSION,
RMT_CYCLE_RTT_MAP
};
typedef enum _MESSAGE_TO_RELAY_TYPE MESSAGE_TO_RELAY_TYPE;
///////// ALLOCATION DEFAULT ADDRESS FAMILY TYPES /////////////////////
@ -198,6 +206,10 @@ struct _turn_turnserver {
/* Return an HTTP 400 response to HTTP connections made to ports not
otherwise handling HTTP. */
vintp respond_http_unsupported;
// Signal change to add rtt metrics
/* measured round trip minimums per network */
ur_map *rtt_ms_mins;
};
const char *get_version(turn_turnserver *server);