Retire unused methods in MessagesManager
This commit is contained in:
parent
4c0f454209
commit
cc6ef53f91
@ -350,7 +350,7 @@ public class MessagePersister implements Managed {
|
||||
cachedMessageBytes.set(estimatedPersistedQueueSize);
|
||||
targetDeleteBytes.set(Math.round(estimatedPersistedQueueSize * extraRoomRatio));
|
||||
|
||||
return Flux.from(messagesManager.getMessagesForDeviceReactive(aci, device, false))
|
||||
return Flux.from(messagesManager.getMessagesForDevice(aci, device))
|
||||
.concatMap(envelope -> {
|
||||
if (bytesDeleted.getAndAdd(envelope.getSerializedSize()) >= targetDeleteBytes.get()) {
|
||||
return Mono.just(Optional.<MessageProtos.Envelope>empty());
|
||||
|
||||
@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import javax.annotation.Nullable;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
|
||||
import org.slf4j.Logger;
|
||||
@ -33,7 +32,6 @@ import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||
import reactor.core.observability.micrometer.Micrometer;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -41,7 +39,6 @@ import reactor.core.publisher.Mono;
|
||||
|
||||
public class MessagesManager {
|
||||
|
||||
private static final int RESULT_SET_CHUNK_SIZE = 100;
|
||||
final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = name(MessagesManager.class, "getMessagesForDevice");
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
|
||||
@ -217,34 +214,14 @@ public class MessagesManager {
|
||||
return messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice);
|
||||
}
|
||||
|
||||
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, Device destinationDevice,
|
||||
boolean cachedMessagesOnly) {
|
||||
|
||||
return Flux.from(
|
||||
getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly))
|
||||
.take(RESULT_SET_CHUNK_SIZE)
|
||||
.collectList()
|
||||
.map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE));
|
||||
}
|
||||
|
||||
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, Device destinationDevice,
|
||||
final boolean cachedMessagesOnly) {
|
||||
|
||||
return getMessagesForDevice(destinationUuid, destinationDevice, null, cachedMessagesOnly);
|
||||
}
|
||||
|
||||
public MessageStream getMessages(final UUID destinationUuid, final Device destinationDevice) {
|
||||
return new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice);
|
||||
}
|
||||
|
||||
private Publisher<Envelope> getMessagesForDevice(UUID destinationUuid, Device destinationDevice,
|
||||
@Nullable Integer limit, final boolean cachedMessagesOnly) {
|
||||
|
||||
final Publisher<Envelope> dynamoPublisher =
|
||||
cachedMessagesOnly ? Flux.empty() : messagesDynamoDb.load(destinationUuid, destinationDevice, limit);
|
||||
final Publisher<Envelope> cachePublisher = messagesCache.get(destinationUuid, destinationDevice.getId());
|
||||
|
||||
return Flux.concat(dynamoPublisher, cachePublisher)
|
||||
Publisher<Envelope> getMessagesForDevice(final UUID destinationUuid, final Device destinationDevice) {
|
||||
return Flux.concat(
|
||||
messagesDynamoDb.load(destinationUuid, destinationDevice, null),
|
||||
messagesCache.get(destinationUuid, destinationDevice.getId()))
|
||||
.name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME)
|
||||
.tap(Micrometer.metrics(Metrics.globalRegistry));
|
||||
}
|
||||
|
||||
@ -504,7 +504,7 @@ class MessagePersisterTest {
|
||||
|
||||
when(messagesManager.persistMessages(any(UUID.class), any(), anyList()))
|
||||
.thenThrow(ItemCollectionSizeLimitExceededException.builder().build());
|
||||
when(messagesManager.getMessagesForDeviceReactive(DESTINATION_ACCOUNT_UUID, primary, false))
|
||||
when(messagesManager.getMessagesForDevice(DESTINATION_ACCOUNT_UUID, primary))
|
||||
.thenReturn(Flux.concat(
|
||||
Flux.fromIterable(persistedMessages),
|
||||
Flux.fromIterable(cachedMessages)));
|
||||
|
||||
Loading…
Reference in New Issue
Block a user