diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java index 05813eaf8..60c1cb99f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java @@ -8,9 +8,11 @@ package org.whispersystems.textsecuregcm.metrics; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import java.time.Duration; import java.time.Instant; @@ -23,6 +25,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; +import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; public final class MessageMetrics { @@ -32,11 +35,23 @@ public final class MessageMetrics { "mismatchedAccountEnvelopeUuid"); public static final String DELIVERY_LATENCY_TIMER_NAME = name(MessageMetrics.class, "deliveryLatency"); + private static final String SEND_MESSAGE_DURATION_TIMER_NAME = name(WebSocketConnection.class, "sendMessageDuration"); + + private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength"); + private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue"); + private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain"); + private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement"); + private static final Duration SLOW_DRAIN_THRESHOLD = Duration.ofSeconds(10); + private final MeterRegistry metricRegistry; + private final Counter sendMessageCounter; + private final Counter bytesSentCounter; @VisibleForTesting MessageMetrics(final MeterRegistry metricRegistry) { this.metricRegistry = metricRegistry; + sendMessageCounter = metricRegistry.counter(name(WebSocketConnection.class, "sendMessage")); + bytesSentCounter = metricRegistry.counter(name(WebSocketConnection.class, "bytesSent")); } public MessageMetrics() { @@ -86,4 +101,39 @@ public final class MessageMetrics { .register(metricRegistry) .record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now())); } + + public void measureMessageStreamDisplaced( + final String channel, + final Tags platformTag, + final boolean connectedElsewhere) { + metricRegistry.counter( + DISPLACEMENT_COUNTER_NAME, + platformTag + .and("connectedElsewhere", Boolean.toString(connectedElsewhere)) + .and("channel", channel)) + .increment(); + } + + public void measureQueueDrain( + final String channel, + final Tags platformTag, + final long messagesDrained, + final Timer.Sample drainStart) { + metricRegistry.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag.and("channel", channel)).record(messagesDrained); + final long drainDurationNanos = drainStart.stop(metricRegistry.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag)); + if (Duration.ofNanos(drainDurationNanos).compareTo(SLOW_DRAIN_THRESHOLD) > 0) { + metricRegistry.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag.and("channel", channel)).increment(); + } + } + + public void measureMessageSent(long messageByteLength) { + sendMessageCounter.increment(); + bytesSentCounter.increment(messageByteLength); + } + + public void measureSendMessageDuration(final String channel, final Tags platformTag, final Timer.Sample sample) { + sample.stop(Timer.builder(SEND_MESSAGE_DURATION_TIMER_NAME) + .tags(platformTag.and("channel", channel)) + .register(metricRegistry)); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 4d4c2b8fa..02dd679b8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -62,30 +62,19 @@ import javax.annotation.Nullable; public class WebSocketConnection implements DisconnectionRequestListener { - private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage")); - private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent")); private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures")); - private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, - "initialQueueLength"); - private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue"); - private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain"); - private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement"); private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class, "clientNonSuccessResponse"); private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class, "sendMessages"); private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class, "sendMessageError"); - private static final String SEND_MESSAGE_DURATION_TIMER_NAME = name(WebSocketConnection.class, "sendMessageDuration"); private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_MESSAGE_TAG = "message"; private static final String ERROR_TYPE_TAG = "errorType"; private static final String EXCEPTION_TYPE_TAG = "exceptionType"; - private static final String CONNECTED_ELSEWHERE_TAG = "connectedElsewhere"; - - private static final Duration SLOW_DRAIN_THRESHOLD = Duration.ofSeconds(10); @VisibleForTesting static final int MESSAGE_PUBLISHER_LIMIT_RATE = 100; @@ -118,7 +107,8 @@ public class WebSocketConnection implements DisconnectionRequestListener { private final ClientReleaseManager clientReleaseManager; - public WebSocketConnection(final ReceiptSender receiptSender, + public WebSocketConnection( + final ReceiptSender receiptSender, final MessagesManager messagesManager, final MessageMetrics messageMetrics, final PushNotificationManager pushNotificationManager, @@ -153,7 +143,7 @@ public class WebSocketConnection implements DisconnectionRequestListener { public void start() { pushNotificationManager.handleMessagesRetrieved(authenticatedAccount, authenticatedDevice, client.getUserAgent()); - final long queueDrainStartNanos = System.nanoTime(); + final Timer.Sample queueDrainStart = Timer.start(); final AtomicBoolean hasSentFirstMessage = new AtomicBoolean(); messageSubscription.set(JdkFlowAdapter.flowPublisherToFlux(messageStream.getMessages()) @@ -165,7 +155,7 @@ public class WebSocketConnection implements DisconnectionRequestListener { // process messages before sending the "connected elsewhere" signal, and while that's ultimately not harmful, // it's also not ideal. .doOnError(ConflictingMessageConsumerException.class, _ -> { - Metrics.counter(DISPLACEMENT_COUNTER_NAME, platformTag.and(CONNECTED_ELSEWHERE_TAG, "true")).increment(); + messageMetrics.measureMessageStreamDisplaced("websocket", platformTag, true); client.close(4409, "Connected elsewhere"); }) .doOnNext(entry -> { @@ -187,15 +177,7 @@ public class WebSocketConnection implements DisconnectionRequestListener { .subscribe( entry -> { if (entry instanceof MessageStreamEntry.QueueEmpty) { - final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); - - Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); - Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); - - if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { - Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); - } - + messageMetrics.measureQueueDrain("websocket", platformTag, sentMessageCounter.sum(), queueDrainStart); client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); } @@ -243,10 +225,9 @@ public class WebSocketConnection implements DisconnectionRequestListener { final Optional body = Optional.of(serializeMessage(message)); - sendMessageCounter.increment(); sentMessageCounter.increment(); - bytesSentCounter.increment(body.map(bytes -> bytes.length).orElse(0)); messageMetrics.measureAccountEnvelopeUuidMismatches(authenticatedAccount, message); + messageMetrics.measureMessageSent(body.map(bytes -> bytes.length).orElse(0)); // Retain only the parts of the message we need to avoid retaining the whole `Envelope` in memory longer than // necessary @@ -316,9 +297,7 @@ public class WebSocketConnection implements DisconnectionRequestListener { return result; }) - .thenRun(() -> sample.stop(Timer.builder(SEND_MESSAGE_DURATION_TIMER_NAME) - .tags(platformTag) - .register(Metrics.globalRegistry))); + .thenRun(() -> messageMetrics.measureSendMessageDuration("websocket", platformTag, sample)); } @VisibleForTesting @@ -357,7 +336,7 @@ public class WebSocketConnection implements DisconnectionRequestListener { @Override public void handleDisconnectionRequest() { - Metrics.counter(DISPLACEMENT_COUNTER_NAME, platformTag.and(CONNECTED_ELSEWHERE_TAG, "false")).increment(); + messageMetrics.measureMessageStreamDisplaced("websocket", platformTag, false); client.close(4401, "Reauthentication required"); } }