Move some message retrieval metrics to MessageMetrics

This commit is contained in:
Ravi Khadiwala 2026-05-26 17:02:45 -05:00 committed by ravi-signal
parent 660011017d
commit c50b4e52b0
2 changed files with 58 additions and 29 deletions

View File

@ -8,9 +8,11 @@ package org.whispersystems.textsecuregcm.metrics;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.time.Instant;
@ -23,6 +25,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
public final class MessageMetrics {
@ -32,11 +35,23 @@ public final class MessageMetrics {
"mismatchedAccountEnvelopeUuid");
public static final String DELIVERY_LATENCY_TIMER_NAME = name(MessageMetrics.class, "deliveryLatency");
private static final String SEND_MESSAGE_DURATION_TIMER_NAME = name(WebSocketConnection.class, "sendMessageDuration");
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength");
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
private static final Duration SLOW_DRAIN_THRESHOLD = Duration.ofSeconds(10);
private final MeterRegistry metricRegistry;
private final Counter sendMessageCounter;
private final Counter bytesSentCounter;
@VisibleForTesting
MessageMetrics(final MeterRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
sendMessageCounter = metricRegistry.counter(name(WebSocketConnection.class, "sendMessage"));
bytesSentCounter = metricRegistry.counter(name(WebSocketConnection.class, "bytesSent"));
}
public MessageMetrics() {
@ -86,4 +101,39 @@ public final class MessageMetrics {
.register(metricRegistry)
.record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now()));
}
public void measureMessageStreamDisplaced(
final String channel,
final Tags platformTag,
final boolean connectedElsewhere) {
metricRegistry.counter(
DISPLACEMENT_COUNTER_NAME,
platformTag
.and("connectedElsewhere", Boolean.toString(connectedElsewhere))
.and("channel", channel))
.increment();
}
public void measureQueueDrain(
final String channel,
final Tags platformTag,
final long messagesDrained,
final Timer.Sample drainStart) {
metricRegistry.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag.and("channel", channel)).record(messagesDrained);
final long drainDurationNanos = drainStart.stop(metricRegistry.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag));
if (Duration.ofNanos(drainDurationNanos).compareTo(SLOW_DRAIN_THRESHOLD) > 0) {
metricRegistry.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag.and("channel", channel)).increment();
}
}
public void measureMessageSent(long messageByteLength) {
sendMessageCounter.increment();
bytesSentCounter.increment(messageByteLength);
}
public void measureSendMessageDuration(final String channel, final Tags platformTag, final Timer.Sample sample) {
sample.stop(Timer.builder(SEND_MESSAGE_DURATION_TIMER_NAME)
.tags(platformTag.and("channel", channel))
.register(metricRegistry));
}
}

View File

@ -62,30 +62,19 @@ import javax.annotation.Nullable;
public class WebSocketConnection implements DisconnectionRequestListener {
private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage"));
private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent"));
private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures"));
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
"initialQueueLength");
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class,
"clientNonSuccessResponse");
private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class,
"sendMessages");
private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class,
"sendMessageError");
private static final String SEND_MESSAGE_DURATION_TIMER_NAME = name(WebSocketConnection.class, "sendMessageDuration");
private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message";
private static final String ERROR_TYPE_TAG = "errorType";
private static final String EXCEPTION_TYPE_TAG = "exceptionType";
private static final String CONNECTED_ELSEWHERE_TAG = "connectedElsewhere";
private static final Duration SLOW_DRAIN_THRESHOLD = Duration.ofSeconds(10);
@VisibleForTesting
static final int MESSAGE_PUBLISHER_LIMIT_RATE = 100;
@ -118,7 +107,8 @@ public class WebSocketConnection implements DisconnectionRequestListener {
private final ClientReleaseManager clientReleaseManager;
public WebSocketConnection(final ReceiptSender receiptSender,
public WebSocketConnection(
final ReceiptSender receiptSender,
final MessagesManager messagesManager,
final MessageMetrics messageMetrics,
final PushNotificationManager pushNotificationManager,
@ -153,7 +143,7 @@ public class WebSocketConnection implements DisconnectionRequestListener {
public void start() {
pushNotificationManager.handleMessagesRetrieved(authenticatedAccount, authenticatedDevice, client.getUserAgent());
final long queueDrainStartNanos = System.nanoTime();
final Timer.Sample queueDrainStart = Timer.start();
final AtomicBoolean hasSentFirstMessage = new AtomicBoolean();
messageSubscription.set(JdkFlowAdapter.flowPublisherToFlux(messageStream.getMessages())
@ -165,7 +155,7 @@ public class WebSocketConnection implements DisconnectionRequestListener {
// process messages before sending the "connected elsewhere" signal, and while that's ultimately not harmful,
// it's also not ideal.
.doOnError(ConflictingMessageConsumerException.class, _ -> {
Metrics.counter(DISPLACEMENT_COUNTER_NAME, platformTag.and(CONNECTED_ELSEWHERE_TAG, "true")).increment();
messageMetrics.measureMessageStreamDisplaced("websocket", platformTag, true);
client.close(4409, "Connected elsewhere");
})
.doOnNext(entry -> {
@ -187,15 +177,7 @@ public class WebSocketConnection implements DisconnectionRequestListener {
.subscribe(
entry -> {
if (entry instanceof MessageStreamEntry.QueueEmpty) {
final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos);
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum());
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration);
if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) {
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment();
}
messageMetrics.measureQueueDrain("websocket", platformTag, sentMessageCounter.sum(), queueDrainStart);
client.sendRequest("PUT", "/api/v1/queue/empty",
Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty());
}
@ -243,10 +225,9 @@ public class WebSocketConnection implements DisconnectionRequestListener {
final Optional<byte[]> body = Optional.of(serializeMessage(message));
sendMessageCounter.increment();
sentMessageCounter.increment();
bytesSentCounter.increment(body.map(bytes -> bytes.length).orElse(0));
messageMetrics.measureAccountEnvelopeUuidMismatches(authenticatedAccount, message);
messageMetrics.measureMessageSent(body.map(bytes -> bytes.length).orElse(0));
// Retain only the parts of the message we need to avoid retaining the whole `Envelope` in memory longer than
// necessary
@ -316,9 +297,7 @@ public class WebSocketConnection implements DisconnectionRequestListener {
return result;
})
.thenRun(() -> sample.stop(Timer.builder(SEND_MESSAGE_DURATION_TIMER_NAME)
.tags(platformTag)
.register(Metrics.globalRegistry)));
.thenRun(() -> messageMetrics.measureSendMessageDuration("websocket", platformTag, sample));
}
@VisibleForTesting
@ -357,7 +336,7 @@ public class WebSocketConnection implements DisconnectionRequestListener {
@Override
public void handleDisconnectionRequest() {
Metrics.counter(DISPLACEMENT_COUNTER_NAME, platformTag.and(CONNECTED_ELSEWHERE_TAG, "false")).increment();
messageMetrics.measureMessageStreamDisplaced("websocket", platformTag, false);
client.close(4401, "Reauthentication required");
}
}