diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2034586b5..f2fc93b12 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -61,7 +61,7 @@ jobs: - name: Download and install FoundationDB client run: | ./mvnw -e -B -Pexclude-spam-filter clean prepare-package -DskipTests=true - cp service/target/jib-extra/usr/lib/libfdb_c.x86_64.so /usr/lib/libfdb_c.x86_64.so + cp service/target/jib-extra/usr/lib/libfdb_c.so /usr/lib/libfdb_c.x86_64.so ldconfig - name: Build with Maven run: ./mvnw -e -B clean verify -DfoundationDb.serviceContainerNamePrefix=foundationdb diff --git a/service/config/sample-secrets-bundle.yml b/service/config/sample-secrets-bundle.yml index f8c3c4a7d..c0612bd26 100644 --- a/service/config/sample-secrets-bundle.yml +++ b/service/config/sample-secrets-bundle.yml @@ -100,3 +100,5 @@ tlsKeyStore.password: unset hlrLookup.apiKey: AAAAAAAAAAA hlrLookup.apiSecret: AAAAAAAAAAA + +foundationDbMessages.versionstampCipherKey.0: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= diff --git a/service/config/sample.yml b/service/config/sample.yml index ae44836d9..1f507db64 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -538,3 +538,27 @@ callQualitySurvey: hlrLookup: apiKey: secret://hlrLookup.apiKey apiSecret: secret://hlrLookup.apiSecret + +foundationDbMessages: + maxWatchesPerClient: 10000 + versionstampCipherKeys: + 0: secret://foundationDbMessages.versionstampCipherKey.0 + currentVersionstampCipherKey: 0 + clusters: + "messages-0": + clusterFileUrl: http://clusterfiles.example.com/messages-0 + "messages-1": + clusterFileUrl: http://clusterfiles.example.com/messages-1 + "messages-2": + clusterFileUrl: http://clusterfiles.example.com/messages-2 + "messages-3": + clusterFileUrl: http://clusterfiles.example.com/messages-3 + epochs: + 0: + - messages-0 + - messages-1 + 1: + - messages-0 + - messages-1 + - messages-2 + - messages-3 diff --git a/service/pom.xml b/service/pom.xml index ff14a20aa..e1e2f1436 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -617,7 +617,7 @@ io.github.download-maven-plugin download-maven-plugin - 2.0.0 + 2.1.0 @@ -632,6 +632,7 @@ https://github.com/apple/foundationdb/releases/download/${foundationdb.version}/libfdb_c.x86_64.so ${project.build.directory}/jib-extra/usr/lib + libfdb_c.so ${foundationdb.client-library-sha256} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 7f0dc2d0f..c0098ebaa 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -34,6 +34,7 @@ import org.whispersystems.textsecuregcm.configuration.ExternalRequestFilterConfi import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory; import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory; import org.whispersystems.textsecuregcm.configuration.FcmConfiguration; +import org.whispersystems.textsecuregcm.configuration.FoundationDbMessagesConfiguration; import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration; import org.whispersystems.textsecuregcm.configuration.GenericZkConfig; import org.whispersystems.textsecuregcm.configuration.GooglePlayBillingConfiguration; @@ -361,6 +362,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private ChangeNumberConfiguration changeNumber = new ChangeNumberConfiguration(Duration.ofHours(1)); + @Valid + @NotNull + @JsonProperty + private FoundationDbMessagesConfiguration foundationDbMessages; + public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() { return tlsKeyStore; } @@ -600,4 +606,8 @@ public class WhisperServerConfiguration extends Configuration { public ChangeNumberConfiguration getChangeNumber() { return changeNumber; } + + public FoundationDbMessagesConfiguration getFoundationDbMessagesConfiguration() { + return foundationDbMessages; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index fd6753d92..d082d9241 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm; import static java.util.Objects.requireNonNull; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; +import com.apple.foundationdb.Database; +import com.apple.foundationdb.FDB; import com.google.common.collect.Lists; import com.webauthn4j.appattest.DeviceCheckManager; import io.dropwizard.auth.AuthDynamicFeature; @@ -44,6 +46,8 @@ import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.Mapping; import jakarta.servlet.DispatcherType; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.http.HttpClient; @@ -53,6 +57,7 @@ import java.time.Duration; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; @@ -63,7 +68,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; import org.eclipse.jetty.ee10.websocket.server.config.JettyWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents; @@ -174,8 +181,8 @@ import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService; import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService; import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor; import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor; -import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer; import org.whispersystems.textsecuregcm.grpc.net.ManagedEventLoopGroup; +import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer; import org.whispersystems.textsecuregcm.grpc.net.OmnibusH2Server; import org.whispersystems.textsecuregcm.grpc.net.OmnibusRouter; import org.whispersystems.textsecuregcm.grpc.net.SniMapper; @@ -245,6 +252,7 @@ import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriods; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.ClientReleases; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import org.whispersystems.textsecuregcm.storage.FoundationDbVersion; import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager; import org.whispersystems.textsecuregcm.storage.KeysManager; import org.whispersystems.textsecuregcm.storage.MessagesCache; @@ -275,6 +283,8 @@ import org.whispersystems.textsecuregcm.storage.VerificationSessions; import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckManager; import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckTrustAnchor; import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceChecks; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; +import org.whispersystems.textsecuregcm.storage.foundationdb.VersionstampUUIDCipher; import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreClient; import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreManager; import org.whispersystems.textsecuregcm.subscriptions.BankMandateTranslator; @@ -331,7 +341,6 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.s3.S3AsyncClient; -import javax.annotation.Nullable; public class WhisperServerService extends Application { @@ -460,6 +469,39 @@ public class WhisperServerService extends Application> messageDatabasesByEpoch; + { + final Map databasesByName = + config.getFoundationDbMessagesConfiguration().clusters().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, + entry -> { + try { + final Database database = entry.getValue().build(fdb); + database.options().setMaxWatches(config.getFoundationDbMessagesConfiguration().maxWatchesPerClient()); + + return database; + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + })); + + messageDatabasesByEpoch = config.getFoundationDbMessagesConfiguration().epochs().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, + entry -> entry.getValue().stream() + .map(databasesByName::get) + .toList())); + } + final AwsCredentialsProvider cdnCredentialsProvider = config.getCdnConfiguration().credentials().build(); final S3AsyncClient asyncCdnS3Client = S3AsyncClient.builder() .credentialsProvider(cdnCredentialsProvider) @@ -703,6 +745,11 @@ public class WhisperServerService extends Application new File(clusterFileUri); + + case "http", "https" -> { + try (final HttpClient clusterFileClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(10)) + .build()) { + + final HttpResponse response = clusterFileClient.send(HttpRequest.newBuilder() + .uri(URI.create(clusterFileUrl())) + .timeout(Duration.ofSeconds(10)) + .GET() + .build(), HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() != 200) { + throw new IOException("Could not load cluster file (status " + response.statusCode() + ")"); + } + + final File tempClusterFile = File.createTempFile("fdb.cluster-", ""); + tempClusterFile.deleteOnExit(); + + try (final FileWriter fileWriter = new FileWriter(tempClusterFile)) { + fileWriter.write(response.body()); + } + + yield tempClusterFile; + } catch (final InterruptedException e) { + throw new IOException("Interrupted while waiting for cluster file response", e); + } + } + + default -> throw new IllegalArgumentException("Unrecognized cluster file URI scheme: " + clusterFileUri.getScheme()); + }; + + return fdb.open(clusterFile.getAbsolutePath()); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java new file mode 100644 index 000000000..4b5e76204 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java @@ -0,0 +1,18 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.FDB; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.dropwizard.jackson.Discoverable; +import java.io.IOException; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FoundationDbClusterConfiguration.class) +public interface FoundationDbDatabaseFactory extends Discoverable { + + Database build(final FDB fdb) throws IOException; +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java index d9d020344..b28ccb355 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java @@ -14,11 +14,17 @@ import jakarta.validation.constraints.Size; import java.util.HashSet; import java.util.List; import java.util.Map; +import org.whispersystems.textsecuregcm.configuration.secrets.SecretBytes; import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; -public record FoundationDbMessagesConfiguration(@NotEmpty Map clusters, +public record FoundationDbMessagesConfiguration(@NotEmpty Map clusters, @NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List> epochs, - @PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) { + @PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch, + @NotEmpty Map<@PositiveOrZero @Max(63) Integer, SecretBytes> versionstampCipherKeys, + @PositiveOrZero @Max(63) int currentVersionstampCipherKey, + @PositiveOrZero @Max(1_000_000) long maxWatchesPerClient) { + + public static final long DEFAULT_MAX_WATCHES_PER_CLIENT = 10_000; @AssertTrue boolean isEveryEpochClusterConfigured() { @@ -48,4 +54,9 @@ public record FoundationDbMessagesConfiguration(@NotEmpty Map void compareResult(final T expected, final T experimentResult) { + recordResult(expected, experimentResult, Timer.start()); + } + public void compareMonoResult(final T expected, final Mono experimentMono) { final Timer.Sample sample = Timer.start(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java new file mode 100644 index 000000000..629867a27 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java @@ -0,0 +1,73 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; +import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; + +/// A temporary message stream that can mirror message acknowledgements (deletion requests) to FoundationDB +public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStream { + + private final RedisDynamoDbMessageStream redisDynamoDbMessageStream; + private final FoundationDbMessageStore foundationDbMessageStore; + + private final ExperimentEnrollmentManager experimentEnrollmentManager; + private final Executor messageDeletionExecutor; + + private final AciServiceIdentifier accountIdentifier; + private final byte deviceId; + + private static final Logger logger = LoggerFactory.getLogger(DeletionMirroringRedisDynamoDbMessageStream.class); + + public DeletionMirroringRedisDynamoDbMessageStream(final RedisDynamoDbMessageStream redisDynamoDbMessageStream, + final FoundationDbMessageStore foundationDbMessageStore, + final ExperimentEnrollmentManager experimentEnrollmentManager, + final Executor messageDeletionExecutor, + final UUID accountIdentifier, + final byte deviceId) { + + this.redisDynamoDbMessageStream = redisDynamoDbMessageStream; + this.foundationDbMessageStore = foundationDbMessageStore; + + this.experimentEnrollmentManager = experimentEnrollmentManager; + this.messageDeletionExecutor = messageDeletionExecutor; + + this.accountIdentifier = new AciServiceIdentifier(accountIdentifier); + this.deviceId = deviceId; + } + + @Override + public Flow.Publisher getMessages() { + return redisDynamoDbMessageStream.getMessages(); + } + + @Override + public CompletableFuture acknowledgeMessage(final UUID messageGuid, final long serverTimestamp) { + // All messages stored in FoundationDB use version 8 UUIDs; if a message has a version 4 UUID, then it only exists + // in Redis/DynamoDB + if (messageGuid.version() == 8 && + experimentEnrollmentManager.isEnrolled(accountIdentifier.uuid(), MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) { + + messageDeletionExecutor.execute(() -> { + try { + foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid); + } catch (final Exception e) { + logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId, messageGuid, e); + } + }); + } + + return redisDynamoDbMessageStream.acknowledgeMessage(messageGuid, serverTimestamp); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 1296f680e..a1f3e269b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -6,11 +6,14 @@ package org.whispersystems.textsecuregcm.storage; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import java.time.Clock; +import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -28,10 +31,13 @@ import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; +import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; import org.whispersystems.textsecuregcm.util.UUIDUtil; import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; @@ -52,27 +58,44 @@ public class MessagesManager { private static final String MAY_HAVE_MESSAGES_COUNTER_NAME = MetricsUtil.name(MessagesManager.class, "mayHaveMessages"); + private static final String PRESENCE_MATCH_COUNTER_NAME = + MetricsUtil.name(MessagesManager.class, "presenceMatch"); + + @VisibleForTesting + static final String MIRROR_INSERTS_EXPERIMENT_NAME = "foundationDbMirrorInserts"; + + @VisibleForTesting + static final String MIRROR_DELETIONS_EXPERIMENT_NAME = "foundationDbMirrorDeletions"; + + private static final long FOUNDATIONDB_INSERT_TIMEOUT_MILLIS = Duration.ofSeconds(2).toMillis(); + private final MessagesDynamoDb messagesDynamoDb; private final MessagesCache messagesCache; + private final FoundationDbMessageStore foundationDbMessageStore; private final RedisMessageAvailabilityManager redisMessageAvailabilityManager; private final ReportMessageManager reportMessageManager; private final ExecutorService messageDeletionExecutor; private final Clock clock; + private final ExperimentEnrollmentManager experimentEnrollmentManager; public MessagesManager( final MessagesDynamoDb messagesDynamoDb, final MessagesCache messagesCache, + final FoundationDbMessageStore foundationDbMessageStore, final RedisMessageAvailabilityManager redisMessageAvailabilityManager, final ReportMessageManager reportMessageManager, final ExecutorService messageDeletionExecutor, - final Clock clock) { + final Clock clock, + final ExperimentEnrollmentManager experimentEnrollmentManager) { this.messagesDynamoDb = messagesDynamoDb; this.messagesCache = messagesCache; + this.foundationDbMessageStore = foundationDbMessageStore; this.redisMessageAvailabilityManager = redisMessageAvailabilityManager; this.reportMessageManager = reportMessageManager; this.messageDeletionExecutor = messageDeletionExecutor; this.clock = clock; + this.experimentEnrollmentManager = experimentEnrollmentManager; } /** @@ -92,28 +115,68 @@ public class MessagesManager { private CompletableFuture> insertAsync(final UUID accountIdentifier, final Map messagesByDeviceId) { final Map devicePresenceById = new ConcurrentHashMap<>(); - return CompletableFuture.allOf(messagesByDeviceId.entrySet().stream() - .map(deviceIdAndMessage -> { - final byte deviceId = deviceIdAndMessage.getKey(); - final Envelope message = deviceIdAndMessage.getValue(); - final UUID messageGuid = UUID.randomUUID(); + final CompletableFuture> foundationDbInsertFuture; - return messagesCache.insert(messageGuid, accountIdentifier, deviceId, message) - .thenAccept(present -> { - if (message.hasSourceServiceId()) { - final ServiceIdentifier sourceServiceIdentifier = - ServiceIdentifier.fromByteString(message.getSourceServiceId()); + if (experimentEnrollmentManager.isEnrolled(accountIdentifier, MIRROR_INSERTS_EXPERIMENT_NAME)) { + // Multi-recipient messages will have both a "shared MRM key" and actual message content; we only need/want the + // latter for FoundationDB + final Map minimizedMessagesByDeviceId = messagesByDeviceId.entrySet().stream() + .collect(Collectors.toUnmodifiableMap( + Map.Entry::getKey, + entry -> entry.getValue().toBuilder().clearSharedMrmKey().build())); - if (!accountIdentifier.equals(sourceServiceIdentifier.uuid())) { - // Note that this is an asynchronous, best-effort, fire-and-forget operation - reportMessageManager.store(sourceServiceIdentifier.toServiceIdentifierString(), messageGuid); - } - } + foundationDbInsertFuture = + foundationDbMessageStore.insert(new AciServiceIdentifier(accountIdentifier), minimizedMessagesByDeviceId) + .orTimeout(FOUNDATIONDB_INSERT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) + .exceptionally(e -> { + logger.warn("Failed to insert {} message(s) for {} into FoundationDB", + minimizedMessagesByDeviceId.size(), + accountIdentifier, + e); - devicePresenceById.put(deviceId, present); - }); - }) - .toArray(CompletableFuture[]::new)) + return Collections.emptyMap(); + }); + } else { + foundationDbInsertFuture = CompletableFuture.completedFuture(Collections.emptyMap()); + } + + return foundationDbInsertFuture.thenCompose(foundationDbInsertResults -> + CompletableFuture.allOf(messagesByDeviceId.entrySet().stream() + .map(deviceIdAndMessage -> { + final byte deviceId = deviceIdAndMessage.getKey(); + + // Multi-recipient messages will have both a "shared MRM key" and actual message content; we only + // need/want the former for Redis + final Envelope message = deviceIdAndMessage.getValue().hasSharedMrmKey() + ? deviceIdAndMessage.getValue().toBuilder().clearContent().build() + : deviceIdAndMessage.getValue(); + + final UUID messageGuid = Optional.ofNullable(foundationDbInsertResults.get(deviceId)) + .flatMap(FoundationDbMessageStore.InsertResult::messageGuid) + .orElseGet(UUID::randomUUID); + + return messagesCache.insert(messageGuid, accountIdentifier, deviceId, message) + .thenAccept(present -> { + if (message.hasSourceServiceId()) { + final ServiceIdentifier sourceServiceIdentifier = + ServiceIdentifier.fromByteString(message.getSourceServiceId()); + + if (!accountIdentifier.equals(sourceServiceIdentifier.uuid())) { + // Note that this is an asynchronous, best-effort, fire-and-forget operation + reportMessageManager.store(sourceServiceIdentifier.toServiceIdentifierString(), messageGuid); + } + } + + devicePresenceById.put(deviceId, present); + + if (foundationDbInsertResults.containsKey(deviceId)) { + Metrics.counter(PRESENCE_MATCH_COUNTER_NAME, + "match", String.valueOf(present == foundationDbInsertResults.get(deviceId).present())) + .increment(); + } + }); + }) + .toArray(CompletableFuture[]::new))) .thenApply(ignored -> devicePresenceById); } @@ -173,6 +236,7 @@ public class MessagesManager { return insertAsync(resolvedRecipients.get(recipient).getIdentifier(IdentityType.ACI), IntStream.range(0, devices.length).mapToObj(i -> devices[i]) .collect(Collectors.toMap(deviceId -> deviceId, _ -> prototypeMessage.toBuilder() + .setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(serviceIdAndRecipient.getValue()))) .setDestinationServiceId(serviceIdentifier.toCompactByteString()) .build()))) .thenAccept(clientPresenceByDeviceId -> @@ -215,7 +279,13 @@ public class MessagesManager { } public MessageStream getMessages(final UUID destinationUuid, final Device destinationDevice) { - return new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice); + return new DeletionMirroringRedisDynamoDbMessageStream( + new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice), + foundationDbMessageStore, + experimentEnrollmentManager, + messageDeletionExecutor, + destinationUuid, + destinationDevice.getId()); } Publisher getMessagesForDevice(final UUID destinationUuid, final Device destinationDevice) { @@ -226,11 +296,31 @@ public class MessagesManager { .tap(Micrometer.metrics(Metrics.globalRegistry)); } - public CompletableFuture clear(UUID destinationUuid) { + public CompletableFuture clear(final UUID destinationUuid) { + if (experimentEnrollmentManager.isEnrolled(destinationUuid, MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) { + messageDeletionExecutor.execute(() -> { + try { + foundationDbMessageStore.clearAll(new AciServiceIdentifier(destinationUuid)); + } catch (final Exception e) { + logger.warn("Failed to clear messages for {}", destinationUuid, e); + } + }); + } + return messagesCache.clear(destinationUuid); } - public CompletableFuture clear(UUID destinationUuid, byte deviceId) { + public CompletableFuture clear(final UUID destinationUuid, final byte deviceId) { + if (experimentEnrollmentManager.isEnrolled(destinationUuid, MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) { + messageDeletionExecutor.execute(() -> { + try { + foundationDbMessageStore.clearAll(new AciServiceIdentifier(destinationUuid), deviceId); + } catch (final Exception e) { + logger.warn("Failed to clear messages for {}:{}", destinationUuid, deviceId, e); + } + }); + } + return messagesCache.clear(destinationUuid, deviceId); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java index 3548a59a3..2b08fbeef 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStore.java @@ -16,11 +16,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessageStream; import org.whispersystems.textsecuregcm.util.Conversions; @@ -58,12 +65,27 @@ public class FoundationDbMessageStore { public static final int MAX_EPOCHS = 4; public static final int MAX_SHARDS = 64; + private static final Counter INSERT_MESSAGE_COUNTER = + Metrics.counter(MetricsUtil.name(FoundationDbMessageStore.class, "insertMessage")); + + private static final Timer INSERT_MESSAGE_BATCH_TIMER = + Metrics.timer(MetricsUtil.name(FoundationDbMessageStore.class, "insertMessageBatchTimer")); + + private static final Counter DELETE_MESSAGE_COUNTER = + Metrics.counter(MetricsUtil.name(FoundationDbMessageStore.class, "deleteMessage")); + + private static final Timer DELETE_MESSAGE_TIMER = + Metrics.timer(MetricsUtil.name(FoundationDbMessageStore.class, "deleteMessageTimer")); + /// Result of inserting a message for a particular device /// /// @param versionstamp the versionstamp of the transaction in which this device's message was inserted, empty /// otherwise + /// @param messageGuid the versionstamp encrypted/encoded as a version 8 UUID /// @param present whether the device is online - public record InsertResult(Optional versionstamp, boolean present) { + public record InsertResult(Optional versionstamp, + Optional messageGuid, + boolean present) { } public FoundationDbMessageStore(final Map> databasesByEpoch, @@ -129,6 +151,8 @@ public class FoundationDbMessageStore { final Map> messagesByServiceIdentifier, final int epoch) { + final Timer.Sample sample = Timer.start(); + if (messagesByServiceIdentifier.entrySet() .stream() .anyMatch(entry -> entry.getValue().isEmpty())) { @@ -180,7 +204,13 @@ public class FoundationDbMessageStore { .reduce(new HashMap<>(), (a, b) -> { a.putAll(b); return a; - })); + })) + .whenComplete((_, throwable) -> { + if (throwable == null) { + sample.stop(INSERT_MESSAGE_BATCH_TIMER); + INSERT_MESSAGE_COUNTER.increment(messagesByServiceIdentifier.values().stream().mapToInt(Map::size).sum()); + } + }); } private CompletableFuture>> insertChunk( @@ -231,7 +261,11 @@ public class FoundationDbMessageStore { } else { insertResultVersionstamp = Optional.empty(); } - return new InsertResult(insertResultVersionstamp, presenceEntry.getValue()); + + return new InsertResult(insertResultVersionstamp, + insertResultVersionstamp.map(versionstamp -> + versionstampUUIDCipher.encryptVersionstamp(versionstamp, entry.getKey().uuid(), presenceEntry.getKey())), + presenceEntry.getValue()); })); }))); } @@ -290,6 +324,48 @@ public class FoundationDbMessageStore { }); } + // Note that this method is intended only for initial migration support; in general, callers should clear messages + // by acknowledging messages via a `FoundationDbMessageStream`. + public void delete(final AciServiceIdentifier aci, final byte deviceId, final UUID messageGuid) { + delete(aci, deviceId, versionstampUUIDCipher.decryptVersionstamp(messageGuid, aci.uuid(), deviceId)); + } + + private void delete(final AciServiceIdentifier aci, final byte deviceId, final Versionstamp versionstamp) { + final Timer.Sample sample = Timer.start(); + + final byte[] messageKey = getDeviceQueueSubspace(aci, deviceId).pack(Tuple.from(versionstamp)); + + databasesByEpoch[getConfigurationEpoch(versionstamp)][getShardId(versionstamp)].run(transaction -> { + transaction.clear(messageKey); + return null; + }); + + sample.stop(DELETE_MESSAGE_TIMER); + DELETE_MESSAGE_COUNTER.increment(); + } + + public void clearAll(final AciServiceIdentifier aci) { + doForAllDatabasesWithMessages(aci, database -> database.run(transaction -> { + transaction.clear(getAccountSubspace(aci).range()); + return null; + })); + } + + public void clearAll(final AciServiceIdentifier aci, final byte deviceId) { + doForAllDatabasesWithMessages(aci, database -> database.run(transaction -> { + transaction.clear(getDeviceSubspace(aci, deviceId).range()); + return null; + })); + } + + private void doForAllDatabasesWithMessages(final AciServiceIdentifier aci, final Consumer action) { + IntStream.range(0, databasesByEpoch.length) + .filter(epoch -> databasesByEpoch[epoch] != null) + .mapToObj(epoch -> databasesByEpoch[epoch][hashAciToShardNumber(aci, epoch)]) + .distinct() + .forEach(action); + } + public MessageStream getMessages(final AciServiceIdentifier aci, final Device destinationDevice) { return getMessages(aci, destinationDevice, FoundationDbMessageStream.DEFAULT_MAX_MESSAGES_PER_SCAN, FoundationDbMessageStream.DEFAULT_MAX_UNACKNOWLEDGED_MESSAGES, Util.NOOP); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 97d25bf26..af3716e6a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -5,18 +5,23 @@ package org.whispersystems.textsecuregcm.workers; +import com.apple.foundationdb.Database; +import com.apple.foundationdb.FDB; import com.fasterxml.jackson.databind.DeserializationFeature; import io.dropwizard.core.setup.Environment; import io.lettuce.core.resource.ClientResources; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.time.Clock; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; +import java.util.stream.Collectors; import org.signal.libsignal.zkgroup.GenericServerSecretParams; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.ServerSecretParams; @@ -56,6 +61,7 @@ import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriodManager import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriods; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamoDbRecoveryManager; +import org.whispersystems.textsecuregcm.storage.FoundationDbVersion; import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager; import org.whispersystems.textsecuregcm.storage.KeysManager; import org.whispersystems.textsecuregcm.storage.MessagesCache; @@ -75,6 +81,8 @@ import org.whispersystems.textsecuregcm.storage.ReportMessageManager; import org.whispersystems.textsecuregcm.storage.SingleUseECPreKeyStore; import org.whispersystems.textsecuregcm.storage.SubscriptionManager; import org.whispersystems.textsecuregcm.storage.Subscriptions; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; +import org.whispersystems.textsecuregcm.storage.foundationdb.VersionstampUUIDCipher; import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreClient; import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreManager; import org.whispersystems.textsecuregcm.subscriptions.GooglePlayBillingManager; @@ -127,6 +135,39 @@ public record CommandDependencies( environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + final FDB fdb = FDB.selectAPIVersion(FoundationDbVersion.getFoundationDbApiVersion()); + + // Jetty and the FoundationDB client both register shutdown hooks to begin shutdown/cleanup operations. There isn't + // a good way to coordinate or enforce ordering between shutdown hooks, and so the two processes will race. + // Generally, FoundationDB will shut down before Jetty does, meaning we'll still be trying to serve requests that + // require talking to FoundationDB even though FoundationDB has shut down. To avoid that scenario, we disabled + // FoundationDB's shutdown hook and let the JVM terminate its (daemon) threads at exit. This isn't as graceful as + // we'd like, but is the least bad option given current constraints. + fdb.disableShutdownHook(); + + final Map> messageDatabasesByEpoch; + { + final Map databasesByName = + configuration.getFoundationDbMessagesConfiguration().clusters().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, + entry -> { + try { + final Database database = entry.getValue().build(fdb); + database.options().setMaxWatches(configuration.getFoundationDbMessagesConfiguration().maxWatchesPerClient()); + + return database; + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + })); + + messageDatabasesByEpoch = configuration.getFoundationDbMessagesConfiguration().epochs().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, + entry -> entry.getValue().stream() + .map(databasesByName::get) + .toList())); + } + final AwsCredentialsProvider awsCredentialsProvider = configuration.getAwsCredentialsConfiguration().build(); ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder.of(environment, "dynamicConfiguration") @@ -268,6 +309,11 @@ public record CommandDependencies( disconnectionRequestListenerExecutor, retryExecutor); MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler, messageDeletionExecutor, retryExecutor, Clock.systemUTC()); + final FoundationDbMessageStore foundationDbMessageStore = new FoundationDbMessageStore(messageDatabasesByEpoch, + configuration.getFoundationDbMessagesConfiguration().activeEpoch(), + new VersionstampUUIDCipher(configuration.getFoundationDbMessagesConfiguration().currentVersionstampCipherKey(), + configuration.getFoundationDbMessagesConfiguration().versionstampCipherKeys().get(configuration.getFoundationDbMessagesConfiguration().currentVersionstampCipherKey()).value()), + Clock.systemUTC()); ProfilesManager profilesManager = new ProfilesManager(profilesV1, profiles, cacheCluster, retryExecutor, asyncCdnS3Client, configuration.getCdnConfiguration().bucket()); ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, dynamoDbAsyncClient, @@ -277,8 +323,9 @@ public record CommandDependencies( configuration.getReportMessageConfiguration().getCounterTtl()); RedisMessageAvailabilityManager redisMessageAvailabilityManager = new RedisMessageAvailabilityManager(messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor); - MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, - reportMessageManager, messageDeletionExecutor, Clock.systemUTC()); + final MessagesManager messagesManager = + new MessagesManager(messagesDynamoDb, messagesCache, foundationDbMessageStore, redisMessageAvailabilityManager, + reportMessageManager, messageDeletionExecutor, Clock.systemUTC(), experimentEnrollmentManager); AccountLockManager accountLockManager = new AccountLockManager(dynamoDbClient, configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName()); RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = diff --git a/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable b/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable index 190a2779a..902b3c659 100644 --- a/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable +++ b/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable @@ -2,6 +2,7 @@ org.whispersystems.textsecuregcm.configuration.AwsCredentialsProviderFactory org.whispersystems.textsecuregcm.configuration.DynamoDbClientFactory org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory +org.whispersystems.textsecuregcm.configuration.FoundationDbDatabaseFactory org.whispersystems.textsecuregcm.configuration.PaymentsServiceClientsFactory org.whispersystems.textsecuregcm.configuration.PubSubPublisherFactory org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbClusterConfigurationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbClusterConfigurationTest.java deleted file mode 100644 index 999ac0758..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbClusterConfigurationTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2026 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.List; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -class FoundationDbClusterConfigurationTest { - - @ParameterizedTest - @MethodSource - void isSingleClusterFileSourceSpecified(final FoundationDbClusterConfiguration clusterConfiguration, - final boolean expectSingleClusterFileSourceSpecified) { - - assertEquals(expectSingleClusterFileSourceSpecified, clusterConfiguration.isSingleClusterFileSourceSpecified()); - } - - private static List isSingleClusterFileSourceSpecified() { - return List.of( - Arguments.argumentSet("Cluster file URL only", - new FoundationDbClusterConfiguration("test-url", null), true), - - Arguments.argumentSet("Cluster file contents only", - new FoundationDbClusterConfiguration(null, "test-contents"), true), - - Arguments.argumentSet("Both", - new FoundationDbClusterConfiguration("test-url", "test-contents"), false), - - Arguments.argumentSet("Neither", - new FoundationDbClusterConfiguration(null, null), false) - ); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java index 5a3944a7c..b1e67ed8a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfigurationTest.java @@ -6,6 +6,8 @@ package org.whispersystems.textsecuregcm.configuration; import org.junit.jupiter.api.Test; +import org.whispersystems.textsecuregcm.configuration.secrets.SecretBytes; +import org.whispersystems.textsecuregcm.util.TestRandomUtil; import java.util.List; import java.util.Map; @@ -17,45 +19,84 @@ class FoundationDbMessagesConfigurationTest { @Test void isEveryEpochClusterConfigured() { assertTrue(new FoundationDbMessagesConfiguration( - Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), Map.of(0, List.of("messages-0")), - 0 + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT ).isEveryEpochClusterConfigured()); assertFalse(new FoundationDbMessagesConfiguration( - Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), Map.of(0, List.of("messages-0", "unconfigured-cluster")), - 0 + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT ).isEveryEpochClusterConfigured()); } @Test void isEveryEpochFreeOfDuplicates() { assertTrue(new FoundationDbMessagesConfiguration( - Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), Map.of(0, List.of("messages-0")), - 0 + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT ).isEveryEpochFreeOfDuplicates()); assertFalse(new FoundationDbMessagesConfiguration( - Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), Map.of(0, List.of("messages-0", "messages-0")), - 0 + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT ).isEveryEpochFreeOfDuplicates()); } @Test void isActiveEpochConfigured() { assertTrue(new FoundationDbMessagesConfiguration( - Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), Map.of(0, List.of("messages-0")), - 0 + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT ).isActiveEpochConfigured()); assertFalse(new FoundationDbMessagesConfiguration( - Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)), + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), Map.of(0, List.of("messages-0")), - 1 + 1, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT ).isActiveEpochConfigured()); } + + @Test + void isCurrentVersionstampCipherKeyConfigured() { + assertTrue(new FoundationDbMessagesConfiguration( + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), + Map.of(0, List.of("messages-0")), + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 0, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT + ).isCurrentVersionstampCipherKeyConfigured()); + + assertFalse(new FoundationDbMessagesConfiguration( + Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")), + Map.of(0, List.of("messages-0")), + 0, + Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))), + 1, + FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT + ).isCurrentVersionstampCipherKeyConfigured()); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFoundationDbDatabaseFactory.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFoundationDbDatabaseFactory.java new file mode 100644 index 000000000..12f3e5bc0 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFoundationDbDatabaseFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.FDB; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonTypeName; +import java.io.IOException; +import org.whispersystems.textsecuregcm.storage.TestcontainersFoundationDbDatabaseLifecycleManager; + +@JsonTypeName("local") +public class LocalFoundationDbDatabaseFactory implements FoundationDbDatabaseFactory { + + @JsonIgnore + private final TestcontainersFoundationDbDatabaseLifecycleManager testcontainersFoundationDbDatabaseLifecycleManager; + + @JsonIgnore + private Database database; + + private LocalFoundationDbDatabaseFactory() { + this.testcontainersFoundationDbDatabaseLifecycleManager = new TestcontainersFoundationDbDatabaseLifecycleManager(); + } + + @Override + public synchronized Database build(final FDB fdb) throws IOException { + if (database == null) { + Runtime.getRuntime().addShutdownHook(new Thread(testcontainersFoundationDbDatabaseLifecycleManager::closeDatabase)); + testcontainersFoundationDbDatabaseLifecycleManager.initializeDatabase(fdb); + database = testcontainersFoundationDbDatabaseLifecycleManager.getDatabase(); + } + + return database; + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/MessageDispatcherIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/MessageDispatcherIntegrationTest.java index 845e88444..6ed00539e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/MessageDispatcherIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/MessageDispatcherIntegrationTest.java @@ -38,7 +38,6 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.signal.chat.messages.GetMessagesRequest; import org.signal.chat.messages.GetMessagesResponse; import org.whispersystems.textsecuregcm.auth.DisconnectionRequestManager; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; @@ -63,6 +62,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; import org.whispersystems.textsecuregcm.util.UUIDUtil; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -127,8 +127,8 @@ class MessageDispatcherIntegrationTest { messageDispatcher = new MessageDispatcher( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, - sharedExecutorService, Clock.systemUTC()), + new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, + sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)), new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java new file mode 100644 index 000000000..c7f5c9acc --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; +import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; + +class DeletionMirroringRedisDynamoDbMessageStreamTest { + + private FoundationDbMessageStore foundationDbMessageStore; + private ExperimentEnrollmentManager experimentEnrollmentManager; + + private DeletionMirroringRedisDynamoDbMessageStream deletionMirroringRedisDynamoDbMessageStream; + + private static final AciServiceIdentifier ACCOUNT_IDENTIFIER = new AciServiceIdentifier(UUID.randomUUID()); + private static final byte DEVICE_ID = Device.PRIMARY_ID; + + @BeforeEach + void setUp() { + foundationDbMessageStore = mock(FoundationDbMessageStore.class); + experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class); + + deletionMirroringRedisDynamoDbMessageStream = new DeletionMirroringRedisDynamoDbMessageStream( + mock(RedisDynamoDbMessageStream.class), + foundationDbMessageStore, + experimentEnrollmentManager, + Runnable::run, + ACCOUNT_IDENTIFIER.uuid(), + DEVICE_ID); + } + + @ParameterizedTest + @MethodSource + void acknowledgeMessage(final boolean enrolled, final UUID messageGuid, final boolean expectFoundationDbDeletion) { + when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME))) + .thenReturn(enrolled); + + deletionMirroringRedisDynamoDbMessageStream.acknowledgeMessage(messageGuid, System.currentTimeMillis()); + + verify(foundationDbMessageStore, times(expectFoundationDbDeletion ? 1 : 0)) + .delete(ACCOUNT_IDENTIFIER, DEVICE_ID, messageGuid); + } + + private static List acknowledgeMessage() { + return List.of( + Arguments.argumentSet("Not enrolled, v4 UUID", false, UUID.randomUUID(), false), + Arguments.argumentSet("Not enrolled, v8 UUID", false, MessageGuidUtil.generateRandomV8UUID(), false), + Arguments.argumentSet("Enrolled, v4 UUID", true, UUID.randomUUID(), false), + Arguments.argumentSet("Enrolled, v8 UUID", true, MessageGuidUtil.generateRandomV8UUID(), true) + ); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessageGuidUtil.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessageGuidUtil.java new file mode 100644 index 000000000..f024e6309 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessageGuidUtil.java @@ -0,0 +1,26 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +public class MessageGuidUtil { + private static final long VERSION_MASK = 0x0000_0000_0000_f000L; + private static final long VERSION_8_BITS = 0x0000_0000_0000_8000L; + + public static UUID generateRandomV8UUID() { + long mostSignificantBits = ThreadLocalRandom.current().nextLong(); + + // Clear any bits in the version field + mostSignificantBits &= ~VERSION_MASK; + + // Set the version to 8 + mostSignificantBits |= VERSION_8_BITS; + + return new UUID(mostSignificantBits, ThreadLocalRandom.current().nextLong()); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 833675da5..f815d42d0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -36,11 +36,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.push.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; import org.whispersystems.textsecuregcm.util.UUIDUtil; import reactor.core.scheduler.Scheduler; @@ -95,10 +97,12 @@ class MessagePersisterIntegrationTest { final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, + mock(FoundationDbMessageStore.class), mock(RedisMessageAvailabilityManager.class), mock(ReportMessageManager.class), messageDeletionExecutorService, - Clock.systemUTC()); + Clock.systemUTC(), + mock(ExperimentEnrollmentManager.class)); websocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor(); asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java index 4e82e4160..29130d5e2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java @@ -8,14 +8,17 @@ package org.whispersystems.textsecuregcm.storage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyByte; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.apple.foundationdb.tuple.Versionstamp; import com.google.protobuf.ByteString; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -28,19 +31,23 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.signal.libsignal.protocol.InvalidMessageException; import org.signal.libsignal.protocol.InvalidVersionException; import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; +import org.signal.libsignal.protocol.ServiceId; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.PniServiceIdentifier; -import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; import org.whispersystems.textsecuregcm.tests.util.MultiRecipientMessageHelper; import org.whispersystems.textsecuregcm.tests.util.TestRecipient; import org.whispersystems.textsecuregcm.util.TestClock; @@ -48,27 +55,55 @@ import reactor.core.publisher.Mono; class MessagesManagerTest { - private final MessagesDynamoDb messagesDynamoDb = mock(MessagesDynamoDb.class); - private final MessagesCache messagesCache = mock(MessagesCache.class); - private final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class); + private MessagesDynamoDb messagesDynamoDb; + private MessagesCache messagesCache; + private FoundationDbMessageStore foundationDbMessageStore; + private ReportMessageManager reportMessageManager; + private ExperimentEnrollmentManager experimentEnrollmentManager; private static final TestClock CLOCK = TestClock.pinned(Instant.now()); - private final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, - mock(RedisMessageAvailabilityManager.class), reportMessageManager, Executors.newSingleThreadExecutor(), CLOCK); + private MessagesManager messagesManager; @BeforeEach void setUp() { + messagesDynamoDb = mock(MessagesDynamoDb.class); + messagesCache = mock(MessagesCache.class); + foundationDbMessageStore = mock(FoundationDbMessageStore.class); + reportMessageManager = mock(ReportMessageManager.class); + experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class); + when(messagesCache.insert(any(), any(), anyByte(), any())).thenReturn(CompletableFuture.completedFuture(true)); + + messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, foundationDbMessageStore, + mock(RedisMessageAvailabilityManager.class), reportMessageManager, Executors.newSingleThreadExecutor(), CLOCK, + experimentEnrollmentManager); } - @Test - void insert() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void insert(final boolean mirrorInsert) { final UUID sourceAci = UUID.randomUUID(); final Envelope message = Envelope.newBuilder() .setSourceServiceId(new AciServiceIdentifier(sourceAci).toCompactByteString()) .build(); + if (mirrorInsert) { + when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_INSERTS_EXPERIMENT_NAME))) + .thenReturn(true); + + when(foundationDbMessageStore.insert(any(), any())) + .thenAnswer(invocation -> { + final Map messagesByDeviceId = invocation.getArgument(1); + + return CompletableFuture.completedFuture(messagesByDeviceId.keySet().stream() + .collect(Collectors.toMap(deviceId -> deviceId, + _ -> new FoundationDbMessageStore.InsertResult(Optional.of(Versionstamp.fromBytes(new byte[12])), + Optional.of(MessageGuidUtil.generateRandomV8UUID()), + true)))); + }); + } + final UUID destinationUuid = UUID.randomUUID(); messagesManager.insert(destinationUuid, Map.of(Device.PRIMARY_ID, message)); @@ -82,14 +117,78 @@ class MessagesManagerTest { messagesManager.insert(destinationUuid, Map.of(Device.PRIMARY_ID, syncMessage)); verifyNoMoreInteractions(reportMessageManager); + + verify(foundationDbMessageStore, times(mirrorInsert ? 2 : 0)).insert(any(), any()); + verify(messagesCache, times(2)) + .insert(argThat(messageGuid -> messageGuid.version() == (mirrorInsert ? 8 : 4)), + eq(destinationUuid), + eq(Device.PRIMARY_ID), + any()); } @Test - void insertMultiRecipientMessage() throws InvalidMessageException, InvalidVersionException { - final ServiceIdentifier singleDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID()); - final ServiceIdentifier singleDeviceAccountPniServiceIdentifier = new PniServiceIdentifier(UUID.randomUUID()); - final ServiceIdentifier multiDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID()); - final ServiceIdentifier unresolvedAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + void insertFoundationDbException() { + final UUID sourceAci = UUID.randomUUID(); + final Envelope message = Envelope.newBuilder() + .setSourceServiceId(new AciServiceIdentifier(sourceAci).toCompactByteString()) + .build(); + + when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_INSERTS_EXPERIMENT_NAME))) + .thenReturn(true); + + when(foundationDbMessageStore.insert(any(), any())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); + + final UUID destinationUuid = UUID.randomUUID(); + + messagesManager.insert(destinationUuid, Map.of(Device.PRIMARY_ID, message)); + + verify(foundationDbMessageStore).insert(any(), any()); + verify(messagesCache) + .insert(argThat(messageGuid -> messageGuid.version() == 4), + eq(destinationUuid), + eq(Device.PRIMARY_ID), + any()); + } + + private static UUID generateRandomV8UUID() { + final long versionMask = 0x0000_0000_0000_f000L; + final long v8Version = 0x0000_0000_0000_8000L; + + long mostSignificantBits = ThreadLocalRandom.current().nextLong(); + + // Clear any bits in the version field + mostSignificantBits &= ~versionMask; + + // Set the version to 8 + mostSignificantBits |= v8Version; + + return new UUID(mostSignificantBits, ThreadLocalRandom.current().nextLong()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void insertMultiRecipientMessage(final boolean mirrorInsert) throws InvalidMessageException, InvalidVersionException { + if (mirrorInsert) { + when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_INSERTS_EXPERIMENT_NAME))) + .thenReturn(true); + + when(foundationDbMessageStore.insert(any(), any())) + .thenAnswer(invocation -> { + final Map messagesByDeviceId = invocation.getArgument(1); + + return CompletableFuture.completedFuture(messagesByDeviceId.keySet().stream() + .collect(Collectors.toMap(deviceId -> deviceId, + _ -> new FoundationDbMessageStore.InsertResult(Optional.of(Versionstamp.fromBytes(new byte[12])), + Optional.of(generateRandomV8UUID()), + true)))); + }); + } + + final AciServiceIdentifier singleDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + final PniServiceIdentifier singleDeviceAccountPniServiceIdentifier = new PniServiceIdentifier(UUID.randomUUID()); + final AciServiceIdentifier multiDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + final AciServiceIdentifier unresolvedAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID()); final Account singleDeviceAccount = mock(Account.class); final Account multiDeviceAccount = mock(Account.class); @@ -186,6 +285,38 @@ class MessagesManagerTest { eq(unresolvedAccountAciServiceIdentifier.uuid()), anyByte(), any()); + + if (mirrorInsert) { + final Envelope prototypeExpectedFoundationDbMessage = prototypeExpectedMessage.toBuilder().clearSharedMrmKey().build(); + + verify(foundationDbMessageStore).insert(singleDeviceAccountAciServiceIdentifier, + Map.of(Device.PRIMARY_ID, prototypeExpectedFoundationDbMessage.toBuilder() + .setDestinationServiceId(singleDeviceAccountAciServiceIdentifier.toCompactByteString()) + .setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Aci(singleDeviceAccountAciServiceIdentifier.uuid()))))) + .build())); + + verify(foundationDbMessageStore).insert(singleDeviceAccountAciServiceIdentifier, + Map.of(Device.PRIMARY_ID, prototypeExpectedFoundationDbMessage.toBuilder() + .setDestinationServiceId(singleDeviceAccountPniServiceIdentifier.toCompactByteString()) + .setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Pni(singleDeviceAccountPniServiceIdentifier.uuid()))))) + .build())); + + verify(foundationDbMessageStore).insert(multiDeviceAccountAciServiceIdentifier, + Map.of( + Device.PRIMARY_ID, prototypeExpectedFoundationDbMessage.toBuilder() + .setDestinationServiceId(multiDeviceAccountAciServiceIdentifier.toCompactByteString()) + .setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Aci(multiDeviceAccountAciServiceIdentifier.uuid()))))) + .build(), + (byte) (Device.PRIMARY_ID + 1), prototypeExpectedFoundationDbMessage.toBuilder() + .setDestinationServiceId(multiDeviceAccountAciServiceIdentifier.toCompactByteString()) + .setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Aci(multiDeviceAccountAciServiceIdentifier.uuid()))))) + .build()) + ); + + verify(foundationDbMessageStore, never()).insert(eq(unresolvedAccountAciServiceIdentifier), any()); + } else { + verifyNoInteractions(foundationDbMessageStore); + } } @ParameterizedTest diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/TestcontainersFoundationDbDatabaseLifecycleManager.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/TestcontainersFoundationDbDatabaseLifecycleManager.java index 18b57ff96..053f17c9f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/TestcontainersFoundationDbDatabaseLifecycleManager.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/TestcontainersFoundationDbDatabaseLifecycleManager.java @@ -12,7 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.utility.DockerImageName; -class TestcontainersFoundationDbDatabaseLifecycleManager implements FoundationDbDatabaseLifecycleManager { +public class TestcontainersFoundationDbDatabaseLifecycleManager implements FoundationDbDatabaseLifecycleManager { private FoundationDBContainer foundationDBContainer; private Database database; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java index 1d10c207e..03a603286 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessageStoreTest.java @@ -452,6 +452,94 @@ class FoundationDbMessageStoreTest { Map.of(generateRandomAciForShard(0), Collections.emptyMap()))); } + @Test + void delete() { + final AciServiceIdentifier deletedMessageAci = new AciServiceIdentifier(UUID.randomUUID()); + final byte deletedMessageDeviceId = Device.PRIMARY_ID; + final MessageProtos.Envelope deletedMessage = generateRandomMessage(false); + + final AciServiceIdentifier retainedMessageAci = new AciServiceIdentifier(UUID.randomUUID()); + final byte retainedMessageDeviceId = Device.PRIMARY_ID; + final MessageProtos.Envelope retainedMessage = generateRandomMessage(false); + + final UUID deletedMessageGuid = + foundationDbMessageStore.insert(deletedMessageAci, Map.of(deletedMessageDeviceId, deletedMessage)).join() + .get(deletedMessageDeviceId).messageGuid().orElseThrow(); + + foundationDbMessageStore.insert(retainedMessageAci, Map.of(retainedMessageDeviceId, retainedMessage)).join(); + + assertArrayEquals(deletedMessage.toByteArray(), + getItemsInDeviceQueue(deletedMessageAci, deletedMessageDeviceId).getFirst().getValue()); + + assertArrayEquals(retainedMessage.toByteArray(), + getItemsInDeviceQueue(retainedMessageAci, retainedMessageDeviceId).getFirst().getValue()); + + foundationDbMessageStore.delete(deletedMessageAci, deletedMessageDeviceId, deletedMessageGuid); + + assertTrue(getItemsInDeviceQueue(deletedMessageAci, deletedMessageDeviceId).isEmpty()); + + assertArrayEquals(retainedMessage.toByteArray(), + getItemsInDeviceQueue(retainedMessageAci, retainedMessageDeviceId).getFirst().getValue()); + } + + @Test + void clearAllForAccount() { + final AciServiceIdentifier deletedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + final byte deletedPrimaryDeviceId = Device.PRIMARY_ID; + final byte deletedLinkedDeviceId = deletedPrimaryDeviceId + 1; + + final AciServiceIdentifier retainedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + final byte retainedPrimaryDeviceId = Device.PRIMARY_ID; + + foundationDbMessageStore.insert(deletedAccountIdentifier, Map.of( + deletedPrimaryDeviceId, generateRandomMessage(false), + deletedLinkedDeviceId, generateRandomMessage(false) + )).join(); + + foundationDbMessageStore.insert(retainedAccountIdentifier, Map.of( + retainedPrimaryDeviceId, generateRandomMessage(false) + )).join(); + + assertEquals(1, getItemsInDeviceQueue(deletedAccountIdentifier, deletedPrimaryDeviceId).size()); + assertEquals(1, getItemsInDeviceQueue(deletedAccountIdentifier, deletedLinkedDeviceId).size()); + assertEquals(1, getItemsInDeviceQueue(retainedAccountIdentifier, retainedPrimaryDeviceId).size()); + + foundationDbMessageStore.clearAll(deletedAccountIdentifier); + + assertEquals(0, getItemsInDeviceQueue(deletedAccountIdentifier, deletedPrimaryDeviceId).size()); + assertEquals(0, getItemsInDeviceQueue(deletedAccountIdentifier, deletedLinkedDeviceId).size()); + assertEquals(1, getItemsInDeviceQueue(retainedAccountIdentifier, retainedPrimaryDeviceId).size()); + } + + @Test + void clearAllForDevice() { + final AciServiceIdentifier targetedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + final byte targetedAccountRetainedDeviceId = Device.PRIMARY_ID; + final byte targetedAccountDeletedDeviceId = targetedAccountRetainedDeviceId + 1; + + final AciServiceIdentifier untargetedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID()); + final byte untargetedAccountPrimaryDeviceId = Device.PRIMARY_ID; + + foundationDbMessageStore.insert(targetedAccountIdentifier, Map.of( + targetedAccountRetainedDeviceId, generateRandomMessage(false), + targetedAccountDeletedDeviceId, generateRandomMessage(false) + )).join(); + + foundationDbMessageStore.insert(untargetedAccountIdentifier, Map.of( + untargetedAccountPrimaryDeviceId, generateRandomMessage(false) + )).join(); + + assertEquals(1, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountRetainedDeviceId).size()); + assertEquals(1, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountDeletedDeviceId).size()); + assertEquals(1, getItemsInDeviceQueue(untargetedAccountIdentifier, untargetedAccountPrimaryDeviceId).size()); + + foundationDbMessageStore.clearAll(targetedAccountIdentifier, targetedAccountDeletedDeviceId); + + assertEquals(1, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountRetainedDeviceId).size()); + assertEquals(0, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountDeletedDeviceId).size()); + assertEquals(1, getItemsInDeviceQueue(untargetedAccountIdentifier, untargetedAccountPrimaryDeviceId).size()); + } + @ParameterizedTest @MethodSource void getMessages(final int numMessages, final int batchSize) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 2329419c5..f7d222414 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -68,6 +68,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; +import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore; import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; @@ -149,7 +150,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount) { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()), + new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)), new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), @@ -224,7 +225,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessagesMultipleSegments() { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()), + new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)), new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), @@ -319,7 +320,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessagesClientClosed() { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()), + new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)), new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), diff --git a/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FoundationDbDatabaseFactory b/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FoundationDbDatabaseFactory new file mode 100644 index 000000000..3643ecb93 --- /dev/null +++ b/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FoundationDbDatabaseFactory @@ -0,0 +1 @@ +org.whispersystems.textsecuregcm.configuration.LocalFoundationDbDatabaseFactory diff --git a/service/src/test/resources/config/test-secrets-bundle.yml b/service/src/test/resources/config/test-secrets-bundle.yml index 51a7e3a44..edb79f9a0 100644 --- a/service/src/test/resources/config/test-secrets-bundle.yml +++ b/service/src/test/resources/config/test-secrets-bundle.yml @@ -175,3 +175,5 @@ tlsKeyStore.password: unset hlrLookup.apiKey: AAAAAAAAAAA hlrLookup.apiSecret: AAAAAAAAAAA + +foundationDbMessages.versionstampCipherKey.0: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= diff --git a/service/src/test/resources/config/test.yml b/service/src/test/resources/config/test.yml index 9e2ccfaf2..03d81dd16 100644 --- a/service/src/test/resources/config/test.yml +++ b/service/src/test/resources/config/test.yml @@ -555,3 +555,15 @@ server: - type: h2c port: 8080 useForwardedHeaders: true + +foundationDbMessages: + maxWatchesPerClient: 10000 + versionstampCipherKeys: + 0: secret://foundationDbMessages.versionstampCipherKey.0 + currentVersionstampCipherKey: 0 + clusters: + "messages-0": + type: local + epochs: + 0: + - messages-0