From 45f96cd7020cd8cb722980e7bfc975012f33c796 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 18 Jun 2026 13:49:51 -0400 Subject: [PATCH] Delete messages from FoundationDB asynchronously --- ...onMirroringRedisDynamoDbMessageStream.java | 21 +++++++------------ .../storage/MessagesManager.java | 1 - .../FoundationDbMessageStore.java | 21 ++++++++++--------- ...rroringRedisDynamoDbMessageStreamTest.java | 1 - .../FoundationDbMessageStoreTest.java | 2 +- 5 files changed, 19 insertions(+), 27 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java index 629867a27..08e3afbef 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java @@ -7,9 +7,7 @@ package org.whispersystems.textsecuregcm.storage; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.Flow; -import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; @@ -21,9 +19,7 @@ public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStrea private final RedisDynamoDbMessageStream redisDynamoDbMessageStream; private final FoundationDbMessageStore foundationDbMessageStore; - private final ExperimentEnrollmentManager experimentEnrollmentManager; - private final Executor messageDeletionExecutor; private final AciServiceIdentifier accountIdentifier; private final byte deviceId; @@ -33,15 +29,12 @@ public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStrea public DeletionMirroringRedisDynamoDbMessageStream(final RedisDynamoDbMessageStream redisDynamoDbMessageStream, final FoundationDbMessageStore foundationDbMessageStore, final ExperimentEnrollmentManager experimentEnrollmentManager, - final Executor messageDeletionExecutor, final UUID accountIdentifier, final byte deviceId) { this.redisDynamoDbMessageStream = redisDynamoDbMessageStream; this.foundationDbMessageStore = foundationDbMessageStore; - this.experimentEnrollmentManager = experimentEnrollmentManager; - this.messageDeletionExecutor = messageDeletionExecutor; this.accountIdentifier = new AciServiceIdentifier(accountIdentifier); this.deviceId = deviceId; @@ -59,13 +52,13 @@ public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStrea if (messageGuid.version() == 8 && experimentEnrollmentManager.isEnrolled(accountIdentifier.uuid(), MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) { - messageDeletionExecutor.execute(() -> { - try { - foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid); - } catch (final Exception e) { - logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId, messageGuid, e); - } - }); + foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid) + .whenComplete((_, throwable) -> { + if (throwable != null) { + logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId, + messageGuid, throwable); + } + }); } return redisDynamoDbMessageStream.acknowledgeMessage(messageGuid, serverTimestamp); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index a1f3e269b..9a49cc0a5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -283,7 +283,6 @@ public class MessagesManager { new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice), foundationDbMessageStore, experimentEnrollmentManager, - messageDeletionExecutor, destinationUuid, destinationDevice.getId()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java index 2b08fbeef..d59cf7949 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java @@ -326,22 +326,23 @@ public class FoundationDbMessageStore { // Note that this method is intended only for initial migration support; in general, callers should clear messages // by acknowledging messages via a `FoundationDbMessageStream`. - public void delete(final AciServiceIdentifier aci, final byte deviceId, final UUID messageGuid) { - delete(aci, deviceId, versionstampUUIDCipher.decryptVersionstamp(messageGuid, aci.uuid(), deviceId)); + public CompletableFuture delete(final AciServiceIdentifier aci, final byte deviceId, final UUID messageGuid) { + return delete(aci, deviceId, versionstampUUIDCipher.decryptVersionstamp(messageGuid, aci.uuid(), deviceId)); } - private void delete(final AciServiceIdentifier aci, final byte deviceId, final Versionstamp versionstamp) { + private CompletableFuture delete(final AciServiceIdentifier aci, final byte deviceId, final Versionstamp versionstamp) { final Timer.Sample sample = Timer.start(); final byte[] messageKey = getDeviceQueueSubspace(aci, deviceId).pack(Tuple.from(versionstamp)); - databasesByEpoch[getConfigurationEpoch(versionstamp)][getShardId(versionstamp)].run(transaction -> { - transaction.clear(messageKey); - return null; - }); - - sample.stop(DELETE_MESSAGE_TIMER); - DELETE_MESSAGE_COUNTER.increment(); + return databasesByEpoch[getConfigurationEpoch(versionstamp)][getShardId(versionstamp)].runAsync(transaction -> { + transaction.clear(messageKey); + return CompletableFuture.completedFuture(null); + }) + .thenRun(() -> { + sample.stop(DELETE_MESSAGE_TIMER); + DELETE_MESSAGE_COUNTER.increment(); + }); } public void clearAll(final AciServiceIdentifier aci) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java index c7f5c9acc..08414df6b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java @@ -41,7 +41,6 @@ class DeletionMirroringRedisDynamoDbMessageStreamTest { mock(RedisDynamoDbMessageStream.class), foundationDbMessageStore, experimentEnrollmentManager, - Runnable::run, ACCOUNT_IDENTIFIER.uuid(), DEVICE_ID); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java index 03a603286..1d7303ffe 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java @@ -474,7 +474,7 @@ class FoundationDbMessageStoreTest { assertArrayEquals(retainedMessage.toByteArray(), getItemsInDeviceQueue(retainedMessageAci, retainedMessageDeviceId).getFirst().getValue()); - foundationDbMessageStore.delete(deletedMessageAci, deletedMessageDeviceId, deletedMessageGuid); + foundationDbMessageStore.delete(deletedMessageAci, deletedMessageDeviceId, deletedMessageGuid).join(); assertTrue(getItemsInDeviceQueue(deletedMessageAci, deletedMessageDeviceId).isEmpty());