Simplify MessagesCache#getEarliestUndeliveredTimestamp

This commit is contained in:
Jon Chambers 2026-03-17 11:54:41 -04:00 committed by Jon Chambers
parent 81f2499889
commit 69d9fc12ca

View File

@ -359,10 +359,26 @@ public class MessagesCache {
.tap(Micrometer.metrics(Metrics.globalRegistry));
}
public Mono<Long> getEarliestUndeliveredTimestamp(final UUID destinationUuid, final byte destinationDevice) {
return getAllMessages(destinationUuid, destinationDevice, -1, 1, true)
/// Returns the server-issued timestamp of the earliest message in the queue for the given account/device, regardless
/// of the type of message. Note that this method may return a timestamp for a message that would ultimately be
/// discarded were a caller to actually fetch messages (i.e. an expired ephemeral message).
///
/// @param destinationUuid the account identifier for the queue for which to get a timestamp
/// @param destinationDeviceId the device identifier for the queue for which to get a timestamp
///
/// @return a `Mono` that publishes the timestamp (in milliseconds since the epoch) of the earliest message in the
/// given queue or yields no value if the queue is empty or does not exist
public Mono<Long> getEarliestUndeliveredTimestamp(final UUID destinationUuid, final byte destinationDeviceId) {
return redisCluster.withBinaryCluster(connection ->
connection.reactive().zrange(getMessageQueueKey(destinationUuid, destinationDeviceId), 0, 0))
.next()
.map(MessageProtos.Envelope::getServerTimestamp);
.flatMap(serializedEnvelope -> {
try {
return Mono.just(parseEnvelope(serializedEnvelope).getServerTimestamp());
} catch (final InvalidProtocolBufferException e) {
return Mono.error(e);
}
});
}
private static boolean isStaleEphemeralMessage(final MessageProtos.Envelope message,