Add basic read/acknowledgement counters to RedisDynamoDbMessageStream

This commit is contained in:
Jon Chambers 2026-06-24 11:54:28 -04:00 committed by Jon Chambers
parent 4743abcfbd
commit 808bb16103

View File

@ -9,10 +9,14 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.adapter.JdkFlowAdapter;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
/// A [MessageStream] implementation that produces message from a joint DynamoDB/Redis message store.
public class RedisDynamoDbMessageStream implements MessageStream {
@ -25,6 +29,14 @@ public class RedisDynamoDbMessageStream implements MessageStream {
private final RedisDynamoDbMessagePublisher messagePublisher;
private static final String MESSAGE_READ_COUNTER_NAME =
name(RedisDynamoDbMessageStream.class, "messagesRead");
private static final String MESSAGE_ACKNOWLEDGED_COUNTER_NAME =
name(RedisDynamoDbMessageStream.class, "messagesAcknowledged");
private static final String UUID_VERSION_TAG = "uuidVersion";
public RedisDynamoDbMessageStream(final MessagesDynamoDb messagesDynamoDb,
final MessagesCache messagesCache,
final RedisMessageAvailabilityManager redisMessageAvailabilityManager,
@ -54,11 +66,22 @@ public class RedisDynamoDbMessageStream implements MessageStream {
@Override
public Flow.Publisher<MessageStreamEntry> getMessages() {
return messagePublisher;
return JdkFlowAdapter.publisherToFlowPublisher(JdkFlowAdapter.flowPublisherToFlux(messagePublisher)
.doOnNext(messageStreamEntry -> {
if (messageStreamEntry instanceof MessageStreamEntry.Envelope(org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope message)) {
final UUID messageGuid = UUIDUtil.fromByteString(message.getServerGuid());
Metrics.counter(MESSAGE_READ_COUNTER_NAME, UUID_VERSION_TAG, String.valueOf(messageGuid.version()))
.increment();
}
}));
}
@Override
public CompletableFuture<Void> acknowledgeMessage(final UUID messageGuid, final long serverTimestamp) {
Metrics.counter(MESSAGE_ACKNOWLEDGED_COUNTER_NAME, UUID_VERSION_TAG, String.valueOf(messageGuid.version()))
.increment();
return messagesCache.remove(accountIdentifier, device.getId(), messageGuid)
.thenCompose(removed -> removed.map(_ -> CompletableFuture.<Void>completedFuture(null))
.orElseGet(() ->