From 5a0c39d429f066b136cd5649b455253b6a9f45f4 Mon Sep 17 00:00:00 2001 From: gram-signal <84339875+gram-signal@users.noreply.github.com> Date: Thu, 10 Mar 2022 11:58:28 -0700 Subject: [PATCH] Measure time spent for sending/receiving threads in HostQueue. --- .../signal/hsmenclave/queue/HostQueue.java | 37 +++++++++++++++++-- .../org/signal/hsmenclave/util/NanoDiff.java | 17 +++++++++ .../hsmenclave/queue/HostQueueTest.java | 3 ++ 3 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 service/src/main/java/org/signal/hsmenclave/util/NanoDiff.java diff --git a/service/src/main/java/org/signal/hsmenclave/queue/HostQueue.java b/service/src/main/java/org/signal/hsmenclave/queue/HostQueue.java index 6f85cb5..924de6f 100644 --- a/service/src/main/java/org/signal/hsmenclave/queue/HostQueue.java +++ b/service/src/main/java/org/signal/hsmenclave/queue/HostQueue.java @@ -5,8 +5,11 @@ package org.signal.hsmenclave.queue; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import jakarta.inject.Singleton; import java.util.ArrayList; import java.util.List; @@ -21,6 +24,7 @@ import org.signal.hsmenclave.queue.message.Response; import org.signal.hsmenclave.queue.message.Response.Error; import org.signal.hsmenclave.queue.message.Response.ResetHsm; import org.signal.hsmenclave.queue.message.Response.ResponseMessages; +import org.signal.hsmenclave.util.NanoDiff; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,12 +57,15 @@ public class HostQueue { this.running = true; Gauge.builder("HostQueue.pendingRequestQueueSize", pendingRequests, BlockingQueue::size) - .register(Metrics.globalRegistry); + .register(Metrics.globalRegistry); Gauge.builder("HostQueue.pendingRefsQueueSize", pendingRefs, BlockingQueue::size) - .register(Metrics.globalRegistry); + .register(Metrics.globalRegistry); Gauge.builder("HostQueue.pendingResponsesQueueSize", pendingResponses, BlockingQueue::size) - .register(Metrics.globalRegistry); + .register(Metrics.globalRegistry); + } + @PostConstruct + void start() { this.senderThread.start(); this.receiverThread.start(); } @@ -79,6 +86,7 @@ public class HostQueue { this.receiverThread.interrupt(); } + @PreDestroy public void stop() throws InterruptedException { requestStop(); this.senderThread.join(); @@ -96,7 +104,12 @@ public class HostQueue { }); } + private static final Counter requestsReceiving = Metrics.counter("HostQueue.requestThread.receivingNanos"); + private static final Counter requestsSending = Metrics.counter("HostQueue.requestThread.sendingNanos"); + private static final Counter requestsQueueing = Metrics.counter("HostQueue.requestThread.queueingNanos"); + private void processRequests() { + NanoDiff nanoDiff = new NanoDiff(); while (running) { final Request request; final CompletableFuture> future; @@ -109,23 +122,34 @@ public class HostQueue { } catch (final InterruptedException e) { continue; } + requestsReceiving.increment(nanoDiff.nanosDelta()); try { logger.trace("Sending {}", request); final Object ref = connection.send(request.toByteArray()); + requestsSending.increment(nanoDiff.nanosDelta()); logger.trace("Got ref {}, {} before me", ref, this.pendingRefs.size()); this.pendingRefs.put(ref); if (future != null) { this.pendingResponses.put(future); } + requestsQueueing.increment(nanoDiff.nanosDelta()); } catch (Exception e) { logger.warn("Error processing request", e); future.completeExceptionally(e); - continue; + nanoDiff.nanosDelta(); } } } + + private static final Counter responsesReceivingRequest = Metrics.counter("HostQueue.responseThread.receivingRequestNanos"); + private static final Counter responsesReceivingRef = Metrics.counter("HostQueue.responseThread.receivingRefNanos"); + private static final Counter responsesReceivingMessage = Metrics.counter("HostQueue.responseThread.receivingMessageNanos"); + private static final Counter responsesPushingNulls = Metrics.counter("HostQueue.responseThread.pushingNullsNanos"); + private static final Counter responsesCompletingFutures = Metrics.counter("HostQueue.responseThread.completingFuturesNanos"); + private void processResponses() { + NanoDiff nanoDiff = new NanoDiff(); while (running) { final CompletableFuture> future; try { @@ -133,6 +157,7 @@ public class HostQueue { } catch (final InterruptedException e) { continue; } + responsesReceivingRequest.increment(nanoDiff.nanosDelta()); final List responses = new ArrayList<>(); try { @@ -144,9 +169,11 @@ public class HostQueue { } catch (InterruptedException e) { continue; } + responsesReceivingRef.increment(nanoDiff.nanosDelta()); logger.trace("Waiting for {}", ref); final Response response = Response.fromBytes(connection.receive(ref)); logger.trace("Received {}", response); + responsesReceivingMessage.increment(nanoDiff.nanosDelta()); if (response instanceof ResponseMessages) { expectedResponses = ((ResponseMessages) response).getResponseMessageCount(); for (int i = 0; i < expectedResponses; i++) { @@ -159,6 +186,7 @@ public class HostQueue { break; } } + responsesPushingNulls.increment(nanoDiff.nanosDelta()); } else { responses.add(response); expectedResponses--; @@ -187,6 +215,7 @@ public class HostQueue { logger.trace("Responding with {}", responses); future.complete(responses); + responsesCompletingFutures.increment(nanoDiff.nanosDelta()); } } } diff --git a/service/src/main/java/org/signal/hsmenclave/util/NanoDiff.java b/service/src/main/java/org/signal/hsmenclave/util/NanoDiff.java new file mode 100644 index 0000000..c0e5b3b --- /dev/null +++ b/service/src/main/java/org/signal/hsmenclave/util/NanoDiff.java @@ -0,0 +1,17 @@ +/* + * Copyright 2022 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.signal.hsmenclave.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class NanoDiff { + private AtomicLong last = new AtomicLong(System.nanoTime()); + public long nanosDelta() { + Long now = System.nanoTime(); + Long last = this.last.getAndSet(now); + return now - last; + } +} diff --git a/service/src/test/java/org/signal/hsmenclave/queue/HostQueueTest.java b/service/src/test/java/org/signal/hsmenclave/queue/HostQueueTest.java index 7f4f1b8..f34a1c4 100644 --- a/service/src/test/java/org/signal/hsmenclave/queue/HostQueueTest.java +++ b/service/src/test/java/org/signal/hsmenclave/queue/HostQueueTest.java @@ -27,6 +27,7 @@ class HostQueueTest { void testHostJobQueue() throws Exception { OsConnection conn = Mockito.mock(OsConnection.class); HostQueue h = new HostQueue(conn); + h.start(); Request req1 = Request.buildChannelMessageRequest(1, 1, new byte[0]); Response resp1n = new ResponseMessages(0, 0, 3); Response resp11 = new ChannelMessage(1, 2, ByteBuffer.wrap(new byte[]{1})); @@ -61,11 +62,13 @@ class HostQueueTest { void testHsmReturnsInvalidData() throws Exception { OsConnection conn = Mockito.mock(OsConnection.class); HostQueue h = new HostQueue(conn); + h.start(); Request req1 = Request.buildChannelMessageRequest(1, 1, new byte[0]); when(conn.send(Mockito.any())).thenReturn(Long.valueOf(0)); when(conn.receive(Long.valueOf(0))).thenReturn(new byte[0]); final CompletableFuture> j1 = h.sendRequest(req1); final Exception e = assertThrows(ExecutionException.class, j1::get); assertTrue(e.getCause() instanceof IllegalArgumentException); + h.stop(); } }