From 3cfe8b9f6c092092018a7500f409de34aca0e107 Mon Sep 17 00:00:00 2001
From: Jon Chambers <63609320+jon-signal@users.noreply.github.com>
Date: Thu, 18 Jun 2026 08:27:03 -0400
Subject: [PATCH] Mirror message writes/deletions to FoundationDB
---
.github/workflows/test.yml | 2 +-
service/config/sample-secrets-bundle.yml | 2 +
service/config/sample.yml | 24 +++
service/pom.xml | 3 +-
.../WhisperServerConfiguration.java | 10 ++
.../textsecuregcm/WhisperServerService.java | 56 ++++++-
.../FoundationDbClusterConfiguration.java | 62 ++++++-
.../FoundationDbDatabaseFactory.java | 18 ++
.../FoundationDbMessagesConfiguration.java | 15 +-
.../textsecuregcm/experiment/Experiment.java | 4 +
...onMirroringRedisDynamoDbMessageStream.java | 73 ++++++++
.../storage/MessagesManager.java | 136 ++++++++++++---
.../FoundationDbMessageStore.java | 82 ++++++++-
.../workers/CommandDependencies.java | 51 +++++-
.../io.dropwizard.jackson.Discoverable | 1 +
.../FoundationDbClusterConfigurationTest.java | 40 -----
...FoundationDbMessagesConfigurationTest.java | 65 ++++++--
.../LocalFoundationDbDatabaseFactory.java | 38 +++++
.../MessageDispatcherIntegrationTest.java | 6 +-
...rroringRedisDynamoDbMessageStreamTest.java | 69 ++++++++
.../storage/MessageGuidUtil.java | 26 +++
.../MessagePersisterIntegrationTest.java | 6 +-
.../storage/MessagesManagerTest.java | 157 ++++++++++++++++--
...sFoundationDbDatabaseLifecycleManager.java | 2 +-
.../FoundationDbMessageStoreTest.java | 88 ++++++++++
.../WebSocketConnectionIntegrationTest.java | 7 +-
....configuration.FoundationDbDatabaseFactory | 1 +
.../resources/config/test-secrets-bundle.yml | 2 +
service/src/test/resources/config/test.yml | 12 ++
29 files changed, 941 insertions(+), 117 deletions(-)
create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java
create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java
delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/configuration/FoundationDbClusterConfigurationTest.java
create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFoundationDbDatabaseFactory.java
create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStreamTest.java
create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/MessageGuidUtil.java
create mode 100644 service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FoundationDbDatabaseFactory
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 2034586b5..f2fc93b12 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -61,7 +61,7 @@ jobs:
- name: Download and install FoundationDB client
run: |
./mvnw -e -B -Pexclude-spam-filter clean prepare-package -DskipTests=true
- cp service/target/jib-extra/usr/lib/libfdb_c.x86_64.so /usr/lib/libfdb_c.x86_64.so
+ cp service/target/jib-extra/usr/lib/libfdb_c.so /usr/lib/libfdb_c.x86_64.so
ldconfig
- name: Build with Maven
run: ./mvnw -e -B clean verify -DfoundationDb.serviceContainerNamePrefix=foundationdb
diff --git a/service/config/sample-secrets-bundle.yml b/service/config/sample-secrets-bundle.yml
index f8c3c4a7d..c0612bd26 100644
--- a/service/config/sample-secrets-bundle.yml
+++ b/service/config/sample-secrets-bundle.yml
@@ -100,3 +100,5 @@ tlsKeyStore.password: unset
hlrLookup.apiKey: AAAAAAAAAAA
hlrLookup.apiSecret: AAAAAAAAAAA
+
+foundationDbMessages.versionstampCipherKey.0: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
diff --git a/service/config/sample.yml b/service/config/sample.yml
index ae44836d9..1f507db64 100644
--- a/service/config/sample.yml
+++ b/service/config/sample.yml
@@ -538,3 +538,27 @@ callQualitySurvey:
hlrLookup:
apiKey: secret://hlrLookup.apiKey
apiSecret: secret://hlrLookup.apiSecret
+
+foundationDbMessages:
+ maxWatchesPerClient: 10000
+ versionstampCipherKeys:
+ 0: secret://foundationDbMessages.versionstampCipherKey.0
+ currentVersionstampCipherKey: 0
+ clusters:
+ "messages-0":
+ clusterFileUrl: http://clusterfiles.example.com/messages-0
+ "messages-1":
+ clusterFileUrl: http://clusterfiles.example.com/messages-1
+ "messages-2":
+ clusterFileUrl: http://clusterfiles.example.com/messages-2
+ "messages-3":
+ clusterFileUrl: http://clusterfiles.example.com/messages-3
+ epochs:
+ 0:
+ - messages-0
+ - messages-1
+ 1:
+ - messages-0
+ - messages-1
+ - messages-2
+ - messages-3
diff --git a/service/pom.xml b/service/pom.xml
index ff14a20aa..e1e2f1436 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -617,7 +617,7 @@
io.github.download-maven-plugin
download-maven-plugin
- 2.0.0
+ 2.1.0
@@ -632,6 +632,7 @@
https://github.com/apple/foundationdb/releases/download/${foundationdb.version}/libfdb_c.x86_64.so
${project.build.directory}/jib-extra/usr/lib
+ libfdb_c.so
${foundationdb.client-library-sha256}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
index 7f0dc2d0f..c0098ebaa 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
@@ -34,6 +34,7 @@ import org.whispersystems.textsecuregcm.configuration.ExternalRequestFilterConfi
import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory;
import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory;
import org.whispersystems.textsecuregcm.configuration.FcmConfiguration;
+import org.whispersystems.textsecuregcm.configuration.FoundationDbMessagesConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.GenericZkConfig;
import org.whispersystems.textsecuregcm.configuration.GooglePlayBillingConfiguration;
@@ -361,6 +362,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private ChangeNumberConfiguration changeNumber = new ChangeNumberConfiguration(Duration.ofHours(1));
+ @Valid
+ @NotNull
+ @JsonProperty
+ private FoundationDbMessagesConfiguration foundationDbMessages;
+
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
return tlsKeyStore;
}
@@ -600,4 +606,8 @@ public class WhisperServerConfiguration extends Configuration {
public ChangeNumberConfiguration getChangeNumber() {
return changeNumber;
}
+
+ public FoundationDbMessagesConfiguration getFoundationDbMessagesConfiguration() {
+ return foundationDbMessages;
+ }
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index fd6753d92..d082d9241 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm;
import static java.util.Objects.requireNonNull;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.FDB;
import com.google.common.collect.Lists;
import com.webauthn4j.appattest.DeviceCheckManager;
import io.dropwizard.auth.AuthDynamicFeature;
@@ -44,6 +46,8 @@ import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.Mapping;
import jakarta.servlet.DispatcherType;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.http.HttpClient;
@@ -53,6 +57,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
@@ -63,7 +68,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.annotation.Nullable;
import org.eclipse.jetty.ee10.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
@@ -174,8 +181,8 @@ import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor;
import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor;
-import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer;
import org.whispersystems.textsecuregcm.grpc.net.ManagedEventLoopGroup;
+import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer;
import org.whispersystems.textsecuregcm.grpc.net.OmnibusH2Server;
import org.whispersystems.textsecuregcm.grpc.net.OmnibusRouter;
import org.whispersystems.textsecuregcm.grpc.net.SniMapper;
@@ -245,6 +252,7 @@ import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriods;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.ClientReleases;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
+import org.whispersystems.textsecuregcm.storage.FoundationDbVersion;
import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@@ -275,6 +283,8 @@ import org.whispersystems.textsecuregcm.storage.VerificationSessions;
import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckManager;
import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckTrustAnchor;
import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceChecks;
+import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
+import org.whispersystems.textsecuregcm.storage.foundationdb.VersionstampUUIDCipher;
import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreClient;
import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreManager;
import org.whispersystems.textsecuregcm.subscriptions.BankMandateTranslator;
@@ -331,7 +341,6 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
-import javax.annotation.Nullable;
public class WhisperServerService extends Application {
@@ -460,6 +469,39 @@ public class WhisperServerService extends Application> messageDatabasesByEpoch;
+ {
+ final Map databasesByName =
+ config.getFoundationDbMessagesConfiguration().clusters().entrySet().stream()
+ .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
+ entry -> {
+ try {
+ final Database database = entry.getValue().build(fdb);
+ database.options().setMaxWatches(config.getFoundationDbMessagesConfiguration().maxWatchesPerClient());
+
+ return database;
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }));
+
+ messageDatabasesByEpoch = config.getFoundationDbMessagesConfiguration().epochs().entrySet().stream()
+ .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
+ entry -> entry.getValue().stream()
+ .map(databasesByName::get)
+ .toList()));
+ }
+
final AwsCredentialsProvider cdnCredentialsProvider = config.getCdnConfiguration().credentials().build();
final S3AsyncClient asyncCdnS3Client = S3AsyncClient.builder()
.credentialsProvider(cdnCredentialsProvider)
@@ -703,6 +745,11 @@ public class WhisperServerService extends Application new File(clusterFileUri);
+
+ case "http", "https" -> {
+ try (final HttpClient clusterFileClient = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_1_1)
+ .connectTimeout(Duration.ofSeconds(10))
+ .build()) {
+
+ final HttpResponse response = clusterFileClient.send(HttpRequest.newBuilder()
+ .uri(URI.create(clusterFileUrl()))
+ .timeout(Duration.ofSeconds(10))
+ .GET()
+ .build(), HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() != 200) {
+ throw new IOException("Could not load cluster file (status " + response.statusCode() + ")");
+ }
+
+ final File tempClusterFile = File.createTempFile("fdb.cluster-", "");
+ tempClusterFile.deleteOnExit();
+
+ try (final FileWriter fileWriter = new FileWriter(tempClusterFile)) {
+ fileWriter.write(response.body());
+ }
+
+ yield tempClusterFile;
+ } catch (final InterruptedException e) {
+ throw new IOException("Interrupted while waiting for cluster file response", e);
+ }
+ }
+
+ default -> throw new IllegalArgumentException("Unrecognized cluster file URI scheme: " + clusterFileUri.getScheme());
+ };
+
+ return fdb.open(clusterFile.getAbsolutePath());
}
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java
new file mode 100644
index 000000000..4b5e76204
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbDatabaseFactory.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+package org.whispersystems.textsecuregcm.configuration;
+
+import com.apple.foundationdb.Database;
+import com.apple.foundationdb.FDB;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.dropwizard.jackson.Discoverable;
+import java.io.IOException;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FoundationDbClusterConfiguration.class)
+public interface FoundationDbDatabaseFactory extends Discoverable {
+
+ Database build(final FDB fdb) throws IOException;
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java
index d9d020344..b28ccb355 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/FoundationDbMessagesConfiguration.java
@@ -14,11 +14,17 @@ import jakarta.validation.constraints.Size;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import org.whispersystems.textsecuregcm.configuration.secrets.SecretBytes;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
-public record FoundationDbMessagesConfiguration(@NotEmpty Map clusters,
+public record FoundationDbMessagesConfiguration(@NotEmpty Map clusters,
@NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List> epochs,
- @PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) {
+ @PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch,
+ @NotEmpty Map<@PositiveOrZero @Max(63) Integer, SecretBytes> versionstampCipherKeys,
+ @PositiveOrZero @Max(63) int currentVersionstampCipherKey,
+ @PositiveOrZero @Max(1_000_000) long maxWatchesPerClient) {
+
+ public static final long DEFAULT_MAX_WATCHES_PER_CLIENT = 10_000;
@AssertTrue
boolean isEveryEpochClusterConfigured() {
@@ -48,4 +54,9 @@ public record FoundationDbMessagesConfiguration(@NotEmpty Map void compareResult(final T expected, final T experimentResult) {
+ recordResult(expected, experimentResult, Timer.start());
+ }
+
public void compareMonoResult(final T expected, final Mono experimentMono) {
final Timer.Sample sample = Timer.start();
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java
new file mode 100644
index 000000000..629867a27
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletionMirroringRedisDynamoDbMessageStream.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+package org.whispersystems.textsecuregcm.storage;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
+import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
+import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
+
+/// A temporary message stream that can mirror message acknowledgements (deletion requests) to FoundationDB
+public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStream {
+
+ private final RedisDynamoDbMessageStream redisDynamoDbMessageStream;
+ private final FoundationDbMessageStore foundationDbMessageStore;
+
+ private final ExperimentEnrollmentManager experimentEnrollmentManager;
+ private final Executor messageDeletionExecutor;
+
+ private final AciServiceIdentifier accountIdentifier;
+ private final byte deviceId;
+
+ private static final Logger logger = LoggerFactory.getLogger(DeletionMirroringRedisDynamoDbMessageStream.class);
+
+ public DeletionMirroringRedisDynamoDbMessageStream(final RedisDynamoDbMessageStream redisDynamoDbMessageStream,
+ final FoundationDbMessageStore foundationDbMessageStore,
+ final ExperimentEnrollmentManager experimentEnrollmentManager,
+ final Executor messageDeletionExecutor,
+ final UUID accountIdentifier,
+ final byte deviceId) {
+
+ this.redisDynamoDbMessageStream = redisDynamoDbMessageStream;
+ this.foundationDbMessageStore = foundationDbMessageStore;
+
+ this.experimentEnrollmentManager = experimentEnrollmentManager;
+ this.messageDeletionExecutor = messageDeletionExecutor;
+
+ this.accountIdentifier = new AciServiceIdentifier(accountIdentifier);
+ this.deviceId = deviceId;
+ }
+
+ @Override
+ public Flow.Publisher getMessages() {
+ return redisDynamoDbMessageStream.getMessages();
+ }
+
+ @Override
+ public CompletableFuture acknowledgeMessage(final UUID messageGuid, final long serverTimestamp) {
+ // All messages stored in FoundationDB use version 8 UUIDs; if a message has a version 4 UUID, then it only exists
+ // in Redis/DynamoDB
+ if (messageGuid.version() == 8 &&
+ experimentEnrollmentManager.isEnrolled(accountIdentifier.uuid(), MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) {
+
+ messageDeletionExecutor.execute(() -> {
+ try {
+ foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid);
+ } catch (final Exception e) {
+ logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId, messageGuid, e);
+ }
+ });
+ }
+
+ return redisDynamoDbMessageStream.acknowledgeMessage(messageGuid, serverTimestamp);
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java
index 1296f680e..a1f3e269b 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java
@@ -6,11 +6,14 @@ package org.whispersystems.textsecuregcm.storage;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -28,10 +31,13 @@ import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
+import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
+import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
+import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
@@ -52,27 +58,44 @@ public class MessagesManager {
private static final String MAY_HAVE_MESSAGES_COUNTER_NAME =
MetricsUtil.name(MessagesManager.class, "mayHaveMessages");
+ private static final String PRESENCE_MATCH_COUNTER_NAME =
+ MetricsUtil.name(MessagesManager.class, "presenceMatch");
+
+ @VisibleForTesting
+ static final String MIRROR_INSERTS_EXPERIMENT_NAME = "foundationDbMirrorInserts";
+
+ @VisibleForTesting
+ static final String MIRROR_DELETIONS_EXPERIMENT_NAME = "foundationDbMirrorDeletions";
+
+ private static final long FOUNDATIONDB_INSERT_TIMEOUT_MILLIS = Duration.ofSeconds(2).toMillis();
+
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
+ private final FoundationDbMessageStore foundationDbMessageStore;
private final RedisMessageAvailabilityManager redisMessageAvailabilityManager;
private final ReportMessageManager reportMessageManager;
private final ExecutorService messageDeletionExecutor;
private final Clock clock;
+ private final ExperimentEnrollmentManager experimentEnrollmentManager;
public MessagesManager(
final MessagesDynamoDb messagesDynamoDb,
final MessagesCache messagesCache,
+ final FoundationDbMessageStore foundationDbMessageStore,
final RedisMessageAvailabilityManager redisMessageAvailabilityManager,
final ReportMessageManager reportMessageManager,
final ExecutorService messageDeletionExecutor,
- final Clock clock) {
+ final Clock clock,
+ final ExperimentEnrollmentManager experimentEnrollmentManager) {
this.messagesDynamoDb = messagesDynamoDb;
this.messagesCache = messagesCache;
+ this.foundationDbMessageStore = foundationDbMessageStore;
this.redisMessageAvailabilityManager = redisMessageAvailabilityManager;
this.reportMessageManager = reportMessageManager;
this.messageDeletionExecutor = messageDeletionExecutor;
this.clock = clock;
+ this.experimentEnrollmentManager = experimentEnrollmentManager;
}
/**
@@ -92,28 +115,68 @@ public class MessagesManager {
private CompletableFuture