From cc6ef53f917441c64725bddc4127730b8edd6c86 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 16 Jun 2026 11:22:22 -0400 Subject: [PATCH] Retire unused methods in `MessagesManager` --- .../storage/MessagePersister.java | 2 +- .../storage/MessagesManager.java | 31 +++---------------- .../storage/MessagePersisterTest.java | 2 +- 3 files changed, 6 insertions(+), 29 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index fa941a713..b33bdb1f5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -350,7 +350,7 @@ public class MessagePersister implements Managed { cachedMessageBytes.set(estimatedPersistedQueueSize); targetDeleteBytes.set(Math.round(estimatedPersistedQueueSize * extraRoomRatio)); - return Flux.from(messagesManager.getMessagesForDeviceReactive(aci, device, false)) + return Flux.from(messagesManager.getMessagesForDevice(aci, device)) .concatMap(envelope -> { if (bytesDeleted.getAndAdd(envelope.getSerializedSize()) >= targetDeleteBytes.get()) { return Mono.just(Optional.empty()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index b58c82729..1296f680e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; -import javax.annotation.Nullable; import org.reactivestreams.Publisher; import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; import org.slf4j.Logger; @@ -33,7 +32,6 @@ import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; -import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.UUIDUtil; import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; @@ -41,7 +39,6 @@ import reactor.core.publisher.Mono; public class MessagesManager { - private static final int RESULT_SET_CHUNK_SIZE = 100; final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = name(MessagesManager.class, "getMessagesForDevice"); private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class); @@ -217,34 +214,14 @@ public class MessagesManager { return messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice); } - public Mono, Boolean>> getMessagesForDevice(UUID destinationUuid, Device destinationDevice, - boolean cachedMessagesOnly) { - - return Flux.from( - getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) - .take(RESULT_SET_CHUNK_SIZE) - .collectList() - .map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE)); - } - - public Publisher getMessagesForDeviceReactive(UUID destinationUuid, Device destinationDevice, - final boolean cachedMessagesOnly) { - - return getMessagesForDevice(destinationUuid, destinationDevice, null, cachedMessagesOnly); - } - public MessageStream getMessages(final UUID destinationUuid, final Device destinationDevice) { return new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice); } - private Publisher getMessagesForDevice(UUID destinationUuid, Device destinationDevice, - @Nullable Integer limit, final boolean cachedMessagesOnly) { - - final Publisher dynamoPublisher = - cachedMessagesOnly ? Flux.empty() : messagesDynamoDb.load(destinationUuid, destinationDevice, limit); - final Publisher cachePublisher = messagesCache.get(destinationUuid, destinationDevice.getId()); - - return Flux.concat(dynamoPublisher, cachePublisher) + Publisher getMessagesForDevice(final UUID destinationUuid, final Device destinationDevice) { + return Flux.concat( + messagesDynamoDb.load(destinationUuid, destinationDevice, null), + messagesCache.get(destinationUuid, destinationDevice.getId())) .name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME) .tap(Micrometer.metrics(Metrics.globalRegistry)); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 4729de06e..5cd88e371 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -504,7 +504,7 @@ class MessagePersisterTest { when(messagesManager.persistMessages(any(UUID.class), any(), anyList())) .thenThrow(ItemCollectionSizeLimitExceededException.builder().build()); - when(messagesManager.getMessagesForDeviceReactive(DESTINATION_ACCOUNT_UUID, primary, false)) + when(messagesManager.getMessagesForDevice(DESTINATION_ACCOUNT_UUID, primary)) .thenReturn(Flux.concat( Flux.fromIterable(persistedMessages), Flux.fromIterable(cachedMessages)));