Delete messages from FoundationDB asynchronously
This commit is contained in:
parent
4a3275ad63
commit
45f96cd702
@ -7,9 +7,7 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Flow;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
@ -21,9 +19,7 @@ public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStrea
|
||||
|
||||
private final RedisDynamoDbMessageStream redisDynamoDbMessageStream;
|
||||
private final FoundationDbMessageStore foundationDbMessageStore;
|
||||
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
private final Executor messageDeletionExecutor;
|
||||
|
||||
private final AciServiceIdentifier accountIdentifier;
|
||||
private final byte deviceId;
|
||||
@ -33,15 +29,12 @@ public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStrea
|
||||
public DeletionMirroringRedisDynamoDbMessageStream(final RedisDynamoDbMessageStream redisDynamoDbMessageStream,
|
||||
final FoundationDbMessageStore foundationDbMessageStore,
|
||||
final ExperimentEnrollmentManager experimentEnrollmentManager,
|
||||
final Executor messageDeletionExecutor,
|
||||
final UUID accountIdentifier,
|
||||
final byte deviceId) {
|
||||
|
||||
this.redisDynamoDbMessageStream = redisDynamoDbMessageStream;
|
||||
this.foundationDbMessageStore = foundationDbMessageStore;
|
||||
|
||||
this.experimentEnrollmentManager = experimentEnrollmentManager;
|
||||
this.messageDeletionExecutor = messageDeletionExecutor;
|
||||
|
||||
this.accountIdentifier = new AciServiceIdentifier(accountIdentifier);
|
||||
this.deviceId = deviceId;
|
||||
@ -59,13 +52,13 @@ public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStrea
|
||||
if (messageGuid.version() == 8 &&
|
||||
experimentEnrollmentManager.isEnrolled(accountIdentifier.uuid(), MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) {
|
||||
|
||||
messageDeletionExecutor.execute(() -> {
|
||||
try {
|
||||
foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid);
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId, messageGuid, e);
|
||||
}
|
||||
});
|
||||
foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid)
|
||||
.whenComplete((_, throwable) -> {
|
||||
if (throwable != null) {
|
||||
logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId,
|
||||
messageGuid, throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return redisDynamoDbMessageStream.acknowledgeMessage(messageGuid, serverTimestamp);
|
||||
|
||||
@ -283,7 +283,6 @@ public class MessagesManager {
|
||||
new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice),
|
||||
foundationDbMessageStore,
|
||||
experimentEnrollmentManager,
|
||||
messageDeletionExecutor,
|
||||
destinationUuid,
|
||||
destinationDevice.getId());
|
||||
}
|
||||
|
||||
@ -326,22 +326,23 @@ public class FoundationDbMessageStore {
|
||||
|
||||
// Note that this method is intended only for initial migration support; in general, callers should clear messages
|
||||
// by acknowledging messages via a `FoundationDbMessageStream`.
|
||||
public void delete(final AciServiceIdentifier aci, final byte deviceId, final UUID messageGuid) {
|
||||
delete(aci, deviceId, versionstampUUIDCipher.decryptVersionstamp(messageGuid, aci.uuid(), deviceId));
|
||||
public CompletableFuture<Void> delete(final AciServiceIdentifier aci, final byte deviceId, final UUID messageGuid) {
|
||||
return delete(aci, deviceId, versionstampUUIDCipher.decryptVersionstamp(messageGuid, aci.uuid(), deviceId));
|
||||
}
|
||||
|
||||
private void delete(final AciServiceIdentifier aci, final byte deviceId, final Versionstamp versionstamp) {
|
||||
private CompletableFuture<Void> delete(final AciServiceIdentifier aci, final byte deviceId, final Versionstamp versionstamp) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
final byte[] messageKey = getDeviceQueueSubspace(aci, deviceId).pack(Tuple.from(versionstamp));
|
||||
|
||||
databasesByEpoch[getConfigurationEpoch(versionstamp)][getShardId(versionstamp)].run(transaction -> {
|
||||
transaction.clear(messageKey);
|
||||
return null;
|
||||
});
|
||||
|
||||
sample.stop(DELETE_MESSAGE_TIMER);
|
||||
DELETE_MESSAGE_COUNTER.increment();
|
||||
return databasesByEpoch[getConfigurationEpoch(versionstamp)][getShardId(versionstamp)].runAsync(transaction -> {
|
||||
transaction.clear(messageKey);
|
||||
return CompletableFuture.completedFuture(null);
|
||||
})
|
||||
.thenRun(() -> {
|
||||
sample.stop(DELETE_MESSAGE_TIMER);
|
||||
DELETE_MESSAGE_COUNTER.increment();
|
||||
});
|
||||
}
|
||||
|
||||
public void clearAll(final AciServiceIdentifier aci) {
|
||||
|
||||
@ -41,7 +41,6 @@ class DeletionMirroringRedisDynamoDbMessageStreamTest {
|
||||
mock(RedisDynamoDbMessageStream.class),
|
||||
foundationDbMessageStore,
|
||||
experimentEnrollmentManager,
|
||||
Runnable::run,
|
||||
ACCOUNT_IDENTIFIER.uuid(),
|
||||
DEVICE_ID);
|
||||
}
|
||||
|
||||
@ -474,7 +474,7 @@ class FoundationDbMessageStoreTest {
|
||||
assertArrayEquals(retainedMessage.toByteArray(),
|
||||
getItemsInDeviceQueue(retainedMessageAci, retainedMessageDeviceId).getFirst().getValue());
|
||||
|
||||
foundationDbMessageStore.delete(deletedMessageAci, deletedMessageDeviceId, deletedMessageGuid);
|
||||
foundationDbMessageStore.delete(deletedMessageAci, deletedMessageDeviceId, deletedMessageGuid).join();
|
||||
|
||||
assertTrue(getItemsInDeviceQueue(deletedMessageAci, deletedMessageDeviceId).isEmpty());
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user