Measure time spent for sending/receiving threads in HostQueue.
Some checks failed
Service CI / Build and test enclave C (push) Has been cancelled
Service CI / Build and test web service (push) Has been cancelled

This commit is contained in:
gram-signal 2022-03-10 11:58:28 -07:00 committed by GitHub
parent c07855c202
commit 5a0c39d429
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 4 deletions

View File

@ -5,8 +5,11 @@
package org.signal.hsmenclave.queue;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
@ -21,6 +24,7 @@ import org.signal.hsmenclave.queue.message.Response;
import org.signal.hsmenclave.queue.message.Response.Error;
import org.signal.hsmenclave.queue.message.Response.ResetHsm;
import org.signal.hsmenclave.queue.message.Response.ResponseMessages;
import org.signal.hsmenclave.util.NanoDiff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,12 +57,15 @@ public class HostQueue {
this.running = true;
Gauge.builder("HostQueue.pendingRequestQueueSize", pendingRequests, BlockingQueue::size)
.register(Metrics.globalRegistry);
.register(Metrics.globalRegistry);
Gauge.builder("HostQueue.pendingRefsQueueSize", pendingRefs, BlockingQueue::size)
.register(Metrics.globalRegistry);
.register(Metrics.globalRegistry);
Gauge.builder("HostQueue.pendingResponsesQueueSize", pendingResponses, BlockingQueue::size)
.register(Metrics.globalRegistry);
.register(Metrics.globalRegistry);
}
@PostConstruct
void start() {
this.senderThread.start();
this.receiverThread.start();
}
@ -79,6 +86,7 @@ public class HostQueue {
this.receiverThread.interrupt();
}
@PreDestroy
public void stop() throws InterruptedException {
requestStop();
this.senderThread.join();
@ -96,7 +104,12 @@ public class HostQueue {
});
}
private static final Counter requestsReceiving = Metrics.counter("HostQueue.requestThread.receivingNanos");
private static final Counter requestsSending = Metrics.counter("HostQueue.requestThread.sendingNanos");
private static final Counter requestsQueueing = Metrics.counter("HostQueue.requestThread.queueingNanos");
private void processRequests() {
NanoDiff nanoDiff = new NanoDiff();
while (running) {
final Request request;
final CompletableFuture<List<Response>> future;
@ -109,23 +122,34 @@ public class HostQueue {
} catch (final InterruptedException e) {
continue;
}
requestsReceiving.increment(nanoDiff.nanosDelta());
try {
logger.trace("Sending {}", request);
final Object ref = connection.send(request.toByteArray());
requestsSending.increment(nanoDiff.nanosDelta());
logger.trace("Got ref {}, {} before me", ref, this.pendingRefs.size());
this.pendingRefs.put(ref);
if (future != null) {
this.pendingResponses.put(future);
}
requestsQueueing.increment(nanoDiff.nanosDelta());
} catch (Exception e) {
logger.warn("Error processing request", e);
future.completeExceptionally(e);
continue;
nanoDiff.nanosDelta();
}
}
}
private static final Counter responsesReceivingRequest = Metrics.counter("HostQueue.responseThread.receivingRequestNanos");
private static final Counter responsesReceivingRef = Metrics.counter("HostQueue.responseThread.receivingRefNanos");
private static final Counter responsesReceivingMessage = Metrics.counter("HostQueue.responseThread.receivingMessageNanos");
private static final Counter responsesPushingNulls = Metrics.counter("HostQueue.responseThread.pushingNullsNanos");
private static final Counter responsesCompletingFutures = Metrics.counter("HostQueue.responseThread.completingFuturesNanos");
private void processResponses() {
NanoDiff nanoDiff = new NanoDiff();
while (running) {
final CompletableFuture<List<Response>> future;
try {
@ -133,6 +157,7 @@ public class HostQueue {
} catch (final InterruptedException e) {
continue;
}
responsesReceivingRequest.increment(nanoDiff.nanosDelta());
final List<Response> responses = new ArrayList<>();
try {
@ -144,9 +169,11 @@ public class HostQueue {
} catch (InterruptedException e) {
continue;
}
responsesReceivingRef.increment(nanoDiff.nanosDelta());
logger.trace("Waiting for {}", ref);
final Response response = Response.fromBytes(connection.receive(ref));
logger.trace("Received {}", response);
responsesReceivingMessage.increment(nanoDiff.nanosDelta());
if (response instanceof ResponseMessages) {
expectedResponses = ((ResponseMessages) response).getResponseMessageCount();
for (int i = 0; i < expectedResponses; i++) {
@ -159,6 +186,7 @@ public class HostQueue {
break;
}
}
responsesPushingNulls.increment(nanoDiff.nanosDelta());
} else {
responses.add(response);
expectedResponses--;
@ -187,6 +215,7 @@ public class HostQueue {
logger.trace("Responding with {}", responses);
future.complete(responses);
responsesCompletingFutures.increment(nanoDiff.nanosDelta());
}
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright 2022 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.hsmenclave.util;
import java.util.concurrent.atomic.AtomicLong;
public class NanoDiff {
private AtomicLong last = new AtomicLong(System.nanoTime());
public long nanosDelta() {
Long now = System.nanoTime();
Long last = this.last.getAndSet(now);
return now - last;
}
}

View File

@ -27,6 +27,7 @@ class HostQueueTest {
void testHostJobQueue() throws Exception {
OsConnection conn = Mockito.mock(OsConnection.class);
HostQueue h = new HostQueue(conn);
h.start();
Request req1 = Request.buildChannelMessageRequest(1, 1, new byte[0]);
Response resp1n = new ResponseMessages(0, 0, 3);
Response resp11 = new ChannelMessage(1, 2, ByteBuffer.wrap(new byte[]{1}));
@ -61,11 +62,13 @@ class HostQueueTest {
void testHsmReturnsInvalidData() throws Exception {
OsConnection conn = Mockito.mock(OsConnection.class);
HostQueue h = new HostQueue(conn);
h.start();
Request req1 = Request.buildChannelMessageRequest(1, 1, new byte[0]);
when(conn.send(Mockito.any())).thenReturn(Long.valueOf(0));
when(conn.receive(Long.valueOf(0))).thenReturn(new byte[0]);
final CompletableFuture<List<Response>> j1 = h.sendRequest(req1);
final Exception e = assertThrows(ExecutionException.class, j1::get);
assertTrue(e.getCause() instanceof IllegalArgumentException);
h.stop();
}
}