Mirror message writes/deletions to FoundationDB

This commit is contained in:
Jon Chambers 2026-06-18 08:27:03 -04:00 committed by GitHub
parent 691a4162e1
commit 3cfe8b9f6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 941 additions and 117 deletions

View File

@ -61,7 +61,7 @@ jobs:
- name: Download and install FoundationDB client
run: |
./mvnw -e -B -Pexclude-spam-filter clean prepare-package -DskipTests=true
cp service/target/jib-extra/usr/lib/libfdb_c.x86_64.so /usr/lib/libfdb_c.x86_64.so
cp service/target/jib-extra/usr/lib/libfdb_c.so /usr/lib/libfdb_c.x86_64.so
ldconfig
- name: Build with Maven
run: ./mvnw -e -B clean verify -DfoundationDb.serviceContainerNamePrefix=foundationdb

View File

@ -100,3 +100,5 @@ tlsKeyStore.password: unset
hlrLookup.apiKey: AAAAAAAAAAA
hlrLookup.apiSecret: AAAAAAAAAAA
foundationDbMessages.versionstampCipherKey.0: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=

View File

@ -538,3 +538,27 @@ callQualitySurvey:
hlrLookup:
apiKey: secret://hlrLookup.apiKey
apiSecret: secret://hlrLookup.apiSecret
foundationDbMessages:
maxWatchesPerClient: 10000
versionstampCipherKeys:
0: secret://foundationDbMessages.versionstampCipherKey.0
currentVersionstampCipherKey: 0
clusters:
"messages-0":
clusterFileUrl: http://clusterfiles.example.com/messages-0
"messages-1":
clusterFileUrl: http://clusterfiles.example.com/messages-1
"messages-2":
clusterFileUrl: http://clusterfiles.example.com/messages-2
"messages-3":
clusterFileUrl: http://clusterfiles.example.com/messages-3
epochs:
0:
- messages-0
- messages-1
1:
- messages-0
- messages-1
- messages-2
- messages-3

View File

@ -617,7 +617,7 @@
<plugin>
<groupId>io.github.download-maven-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>2.0.0</version>
<version>2.1.0</version>
<executions>
<execution>
@ -632,6 +632,7 @@
<configuration>
<url>https://github.com/apple/foundationdb/releases/download/${foundationdb.version}/libfdb_c.x86_64.so</url>
<outputDirectory>${project.build.directory}/jib-extra/usr/lib</outputDirectory>
<outputFileName>libfdb_c.so</outputFileName>
<sha256>${foundationdb.client-library-sha256}</sha256>
</configuration>
</plugin>

View File

@ -34,6 +34,7 @@ import org.whispersystems.textsecuregcm.configuration.ExternalRequestFilterConfi
import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory;
import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory;
import org.whispersystems.textsecuregcm.configuration.FcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.FoundationDbMessagesConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.GenericZkConfig;
import org.whispersystems.textsecuregcm.configuration.GooglePlayBillingConfiguration;
@ -361,6 +362,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private ChangeNumberConfiguration changeNumber = new ChangeNumberConfiguration(Duration.ofHours(1));
@Valid
@NotNull
@JsonProperty
private FoundationDbMessagesConfiguration foundationDbMessages;
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
return tlsKeyStore;
}
@ -600,4 +606,8 @@ public class WhisperServerConfiguration extends Configuration {
public ChangeNumberConfiguration getChangeNumber() {
return changeNumber;
}
public FoundationDbMessagesConfiguration getFoundationDbMessagesConfiguration() {
return foundationDbMessages;
}
}

View File

@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm;
import static java.util.Objects.requireNonNull;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.google.common.collect.Lists;
import com.webauthn4j.appattest.DeviceCheckManager;
import io.dropwizard.auth.AuthDynamicFeature;
@ -44,6 +46,8 @@ import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.Mapping;
import jakarta.servlet.DispatcherType;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.http.HttpClient;
@ -53,6 +57,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
@ -63,7 +68,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.jetty.ee10.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
@ -174,8 +181,8 @@ import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor;
import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor;
import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer;
import org.whispersystems.textsecuregcm.grpc.net.ManagedEventLoopGroup;
import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer;
import org.whispersystems.textsecuregcm.grpc.net.OmnibusH2Server;
import org.whispersystems.textsecuregcm.grpc.net.OmnibusRouter;
import org.whispersystems.textsecuregcm.grpc.net.SniMapper;
@ -245,6 +252,7 @@ import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriods;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.ClientReleases;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FoundationDbVersion;
import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@ -275,6 +283,8 @@ import org.whispersystems.textsecuregcm.storage.VerificationSessions;
import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckManager;
import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckTrustAnchor;
import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceChecks;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.storage.foundationdb.VersionstampUUIDCipher;
import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreClient;
import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreManager;
import org.whispersystems.textsecuregcm.subscriptions.BankMandateTranslator;
@ -331,7 +341,6 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import javax.annotation.Nullable;
public class WhisperServerService extends Application<WhisperServerConfiguration> {
@ -460,6 +469,39 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final DynamoDbClient dynamoDbClient = config.getDynamoDbClientConfiguration()
.buildSyncClient(awsCredentialsProvider, new MicrometerAwsSdkMetricPublisher(awsSdkMetricsExecutor, "dynamoDbSync"));
final FDB fdb = FDB.selectAPIVersion(FoundationDbVersion.getFoundationDbApiVersion());
// Jetty and the FoundationDB client both register shutdown hooks to begin shutdown/cleanup operations. There isn't
// a good way to coordinate or enforce ordering between shutdown hooks, and so the two processes will race.
// Generally, FoundationDB will shut down before Jetty does, meaning we'll still be trying to serve requests that
// require talking to FoundationDB even though FoundationDB has shut down. To avoid that scenario, we disabled
// FoundationDB's shutdown hook and let the JVM terminate its (daemon) threads at exit. This isn't as graceful as
// we'd like, but is the least bad option given current constraints.
fdb.disableShutdownHook();
final Map<Integer, List<Database>> messageDatabasesByEpoch;
{
final Map<String, Database> databasesByName =
config.getFoundationDbMessagesConfiguration().clusters().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
entry -> {
try {
final Database database = entry.getValue().build(fdb);
database.options().setMaxWatches(config.getFoundationDbMessagesConfiguration().maxWatchesPerClient());
return database;
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}));
messageDatabasesByEpoch = config.getFoundationDbMessagesConfiguration().epochs().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.map(databasesByName::get)
.toList()));
}
final AwsCredentialsProvider cdnCredentialsProvider = config.getCdnConfiguration().credentials().build();
final S3AsyncClient asyncCdnS3Client = S3AsyncClient.builder()
.credentialsProvider(cdnCredentialsProvider)
@ -703,6 +745,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getCdnConfiguration().bucket());
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, retryExecutor, clock);
final FoundationDbMessageStore foundationDbMessageStore = new FoundationDbMessageStore(messageDatabasesByEpoch,
config.getFoundationDbMessagesConfiguration().activeEpoch(),
new VersionstampUUIDCipher(config.getFoundationDbMessagesConfiguration().currentVersionstampCipherKey(),
config.getFoundationDbMessagesConfiguration().versionstampCipherKeys().get(config.getFoundationDbMessagesConfiguration().currentVersionstampCipherKey()).value()),
Clock.systemUTC());
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),
@ -711,8 +758,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getReportMessageConfiguration().getCounterTtl());
RedisMessageAvailabilityManager redisMessageAvailabilityManager =
new RedisMessageAvailabilityManager(messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager,
reportMessageManager, messageDeletionAsyncExecutor, Clock.systemUTC());
MessagesManager messagesManager =
new MessagesManager(messagesDynamoDb, messagesCache, foundationDbMessageStore, redisMessageAvailabilityManager,
reportMessageManager, messageDeletionAsyncExecutor, Clock.systemUTC(), experimentEnrollmentManager);
final ChangeNumberWaitingPeriods changeNumberWaitingPeriods = new ChangeNumberWaitingPeriods(
config.getDynamoDbTables().getChangeNumberWaitingPeriods().getTableName(), dynamoDbClient);
final ChangeNumberWaitingPeriodManager changeNumberWaitingPeriodManager = new ChangeNumberWaitingPeriodManager(

View File

@ -5,15 +5,61 @@
package org.whispersystems.textsecuregcm.configuration;
import jakarta.validation.constraints.AssertTrue;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.fasterxml.jackson.annotation.JsonTypeName;
import jakarta.validation.constraints.NotBlank;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
public record FoundationDbClusterConfiguration(@Nullable String clusterFileUrl,
@Nullable String clusterFileContents) {
@JsonTypeName("default")
public record FoundationDbClusterConfiguration(@NotBlank String clusterFileUrl) implements FoundationDbDatabaseFactory {
@AssertTrue
public boolean isSingleClusterFileSourceSpecified() {
return StringUtils.isBlank(clusterFileUrl) ^ StringUtils.isBlank(clusterFileContents);
@Override
public Database build(final FDB fdb) throws IOException {
final URI clusterFileUri = URI.create(clusterFileUrl());
final File clusterFile = switch (clusterFileUri.getScheme()) {
case "file" -> new File(clusterFileUri);
case "http", "https" -> {
try (final HttpClient clusterFileClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(10))
.build()) {
final HttpResponse<String> response = clusterFileClient.send(HttpRequest.newBuilder()
.uri(URI.create(clusterFileUrl()))
.timeout(Duration.ofSeconds(10))
.GET()
.build(), HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
throw new IOException("Could not load cluster file (status " + response.statusCode() + ")");
}
final File tempClusterFile = File.createTempFile("fdb.cluster-", "");
tempClusterFile.deleteOnExit();
try (final FileWriter fileWriter = new FileWriter(tempClusterFile)) {
fileWriter.write(response.body());
}
yield tempClusterFile;
} catch (final InterruptedException e) {
throw new IOException("Interrupted while waiting for cluster file response", e);
}
}
default -> throw new IllegalArgumentException("Unrecognized cluster file URI scheme: " + clusterFileUri.getScheme());
};
return fdb.open(clusterFile.getAbsolutePath());
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;
import java.io.IOException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FoundationDbClusterConfiguration.class)
public interface FoundationDbDatabaseFactory extends Discoverable {
Database build(final FDB fdb) throws IOException;
}

View File

@ -14,11 +14,17 @@ import jakarta.validation.constraints.Size;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.whispersystems.textsecuregcm.configuration.secrets.SecretBytes;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
public record FoundationDbMessagesConfiguration(@NotEmpty Map<String, @Valid FoundationDbClusterConfiguration> clusters,
public record FoundationDbMessagesConfiguration(@NotEmpty Map<String, @Valid FoundationDbDatabaseFactory> clusters,
@NotEmpty Map<@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) Integer, @Size(min = 1, max = FoundationDbMessageStore.MAX_SHARDS - 1) List<String>> epochs,
@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch) {
@PositiveOrZero @Max(FoundationDbMessageStore.MAX_EPOCHS - 1) int activeEpoch,
@NotEmpty Map<@PositiveOrZero @Max(63) Integer, SecretBytes> versionstampCipherKeys,
@PositiveOrZero @Max(63) int currentVersionstampCipherKey,
@PositiveOrZero @Max(1_000_000) long maxWatchesPerClient) {
public static final long DEFAULT_MAX_WATCHES_PER_CLIENT = 10_000;
@AssertTrue
boolean isEveryEpochClusterConfigured() {
@ -48,4 +54,9 @@ public record FoundationDbMessagesConfiguration(@NotEmpty Map<String, @Valid Fou
boolean isActiveEpochConfigured() {
return epochs().containsKey(activeEpoch());
}
@AssertTrue
boolean isCurrentVersionstampCipherKeyConfigured() {
return versionstampCipherKeys().containsKey(currentVersionstampCipherKey());
}
}

View File

@ -71,6 +71,10 @@ public class Experiment {
this.experimentNullMismatchTimer = experimentNullMismatchTimer;
}
public <T> void compareResult(final T expected, final T experimentResult) {
recordResult(expected, experimentResult, Timer.start());
}
public <T> void compareMonoResult(final T expected, final Mono<T> experimentMono) {
final Timer.Sample sample = Timer.start();

View File

@ -0,0 +1,73 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
/// A temporary message stream that can mirror message acknowledgements (deletion requests) to FoundationDB
public class DeletionMirroringRedisDynamoDbMessageStream implements MessageStream {
private final RedisDynamoDbMessageStream redisDynamoDbMessageStream;
private final FoundationDbMessageStore foundationDbMessageStore;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final Executor messageDeletionExecutor;
private final AciServiceIdentifier accountIdentifier;
private final byte deviceId;
private static final Logger logger = LoggerFactory.getLogger(DeletionMirroringRedisDynamoDbMessageStream.class);
public DeletionMirroringRedisDynamoDbMessageStream(final RedisDynamoDbMessageStream redisDynamoDbMessageStream,
final FoundationDbMessageStore foundationDbMessageStore,
final ExperimentEnrollmentManager experimentEnrollmentManager,
final Executor messageDeletionExecutor,
final UUID accountIdentifier,
final byte deviceId) {
this.redisDynamoDbMessageStream = redisDynamoDbMessageStream;
this.foundationDbMessageStore = foundationDbMessageStore;
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.messageDeletionExecutor = messageDeletionExecutor;
this.accountIdentifier = new AciServiceIdentifier(accountIdentifier);
this.deviceId = deviceId;
}
@Override
public Flow.Publisher<MessageStreamEntry> getMessages() {
return redisDynamoDbMessageStream.getMessages();
}
@Override
public CompletableFuture<Void> acknowledgeMessage(final UUID messageGuid, final long serverTimestamp) {
// All messages stored in FoundationDB use version 8 UUIDs; if a message has a version 4 UUID, then it only exists
// in Redis/DynamoDB
if (messageGuid.version() == 8 &&
experimentEnrollmentManager.isEnrolled(accountIdentifier.uuid(), MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) {
messageDeletionExecutor.execute(() -> {
try {
foundationDbMessageStore.delete(accountIdentifier, deviceId, messageGuid);
} catch (final Exception e) {
logger.warn("Failed to delete message {}/{}/{} from FoundationDb", accountIdentifier.uuid(), deviceId, messageGuid, e);
}
});
}
return redisDynamoDbMessageStream.acknowledgeMessage(messageGuid, serverTimestamp);
}
}

View File

@ -6,11 +6,14 @@ package org.whispersystems.textsecuregcm.storage;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -28,10 +31,13 @@ import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
@ -52,27 +58,44 @@ public class MessagesManager {
private static final String MAY_HAVE_MESSAGES_COUNTER_NAME =
MetricsUtil.name(MessagesManager.class, "mayHaveMessages");
private static final String PRESENCE_MATCH_COUNTER_NAME =
MetricsUtil.name(MessagesManager.class, "presenceMatch");
@VisibleForTesting
static final String MIRROR_INSERTS_EXPERIMENT_NAME = "foundationDbMirrorInserts";
@VisibleForTesting
static final String MIRROR_DELETIONS_EXPERIMENT_NAME = "foundationDbMirrorDeletions";
private static final long FOUNDATIONDB_INSERT_TIMEOUT_MILLIS = Duration.ofSeconds(2).toMillis();
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
private final FoundationDbMessageStore foundationDbMessageStore;
private final RedisMessageAvailabilityManager redisMessageAvailabilityManager;
private final ReportMessageManager reportMessageManager;
private final ExecutorService messageDeletionExecutor;
private final Clock clock;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
public MessagesManager(
final MessagesDynamoDb messagesDynamoDb,
final MessagesCache messagesCache,
final FoundationDbMessageStore foundationDbMessageStore,
final RedisMessageAvailabilityManager redisMessageAvailabilityManager,
final ReportMessageManager reportMessageManager,
final ExecutorService messageDeletionExecutor,
final Clock clock) {
final Clock clock,
final ExperimentEnrollmentManager experimentEnrollmentManager) {
this.messagesDynamoDb = messagesDynamoDb;
this.messagesCache = messagesCache;
this.foundationDbMessageStore = foundationDbMessageStore;
this.redisMessageAvailabilityManager = redisMessageAvailabilityManager;
this.reportMessageManager = reportMessageManager;
this.messageDeletionExecutor = messageDeletionExecutor;
this.clock = clock;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
/**
@ -92,28 +115,68 @@ public class MessagesManager {
private CompletableFuture<Map<Byte, Boolean>> insertAsync(final UUID accountIdentifier, final Map<Byte, Envelope> messagesByDeviceId) {
final Map<Byte, Boolean> devicePresenceById = new ConcurrentHashMap<>();
return CompletableFuture.allOf(messagesByDeviceId.entrySet().stream()
.map(deviceIdAndMessage -> {
final byte deviceId = deviceIdAndMessage.getKey();
final Envelope message = deviceIdAndMessage.getValue();
final UUID messageGuid = UUID.randomUUID();
final CompletableFuture<Map<Byte, FoundationDbMessageStore.InsertResult>> foundationDbInsertFuture;
return messagesCache.insert(messageGuid, accountIdentifier, deviceId, message)
.thenAccept(present -> {
if (message.hasSourceServiceId()) {
final ServiceIdentifier sourceServiceIdentifier =
ServiceIdentifier.fromByteString(message.getSourceServiceId());
if (experimentEnrollmentManager.isEnrolled(accountIdentifier, MIRROR_INSERTS_EXPERIMENT_NAME)) {
// Multi-recipient messages will have both a "shared MRM key" and actual message content; we only need/want the
// latter for FoundationDB
final Map<Byte, Envelope> minimizedMessagesByDeviceId = messagesByDeviceId.entrySet().stream()
.collect(Collectors.toUnmodifiableMap(
Map.Entry::getKey,
entry -> entry.getValue().toBuilder().clearSharedMrmKey().build()));
if (!accountIdentifier.equals(sourceServiceIdentifier.uuid())) {
// Note that this is an asynchronous, best-effort, fire-and-forget operation
reportMessageManager.store(sourceServiceIdentifier.toServiceIdentifierString(), messageGuid);
}
}
foundationDbInsertFuture =
foundationDbMessageStore.insert(new AciServiceIdentifier(accountIdentifier), minimizedMessagesByDeviceId)
.orTimeout(FOUNDATIONDB_INSERT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
.exceptionally(e -> {
logger.warn("Failed to insert {} message(s) for {} into FoundationDB",
minimizedMessagesByDeviceId.size(),
accountIdentifier,
e);
devicePresenceById.put(deviceId, present);
});
})
.toArray(CompletableFuture[]::new))
return Collections.emptyMap();
});
} else {
foundationDbInsertFuture = CompletableFuture.completedFuture(Collections.emptyMap());
}
return foundationDbInsertFuture.thenCompose(foundationDbInsertResults ->
CompletableFuture.allOf(messagesByDeviceId.entrySet().stream()
.map(deviceIdAndMessage -> {
final byte deviceId = deviceIdAndMessage.getKey();
// Multi-recipient messages will have both a "shared MRM key" and actual message content; we only
// need/want the former for Redis
final Envelope message = deviceIdAndMessage.getValue().hasSharedMrmKey()
? deviceIdAndMessage.getValue().toBuilder().clearContent().build()
: deviceIdAndMessage.getValue();
final UUID messageGuid = Optional.ofNullable(foundationDbInsertResults.get(deviceId))
.flatMap(FoundationDbMessageStore.InsertResult::messageGuid)
.orElseGet(UUID::randomUUID);
return messagesCache.insert(messageGuid, accountIdentifier, deviceId, message)
.thenAccept(present -> {
if (message.hasSourceServiceId()) {
final ServiceIdentifier sourceServiceIdentifier =
ServiceIdentifier.fromByteString(message.getSourceServiceId());
if (!accountIdentifier.equals(sourceServiceIdentifier.uuid())) {
// Note that this is an asynchronous, best-effort, fire-and-forget operation
reportMessageManager.store(sourceServiceIdentifier.toServiceIdentifierString(), messageGuid);
}
}
devicePresenceById.put(deviceId, present);
if (foundationDbInsertResults.containsKey(deviceId)) {
Metrics.counter(PRESENCE_MATCH_COUNTER_NAME,
"match", String.valueOf(present == foundationDbInsertResults.get(deviceId).present()))
.increment();
}
});
})
.toArray(CompletableFuture[]::new)))
.thenApply(ignored -> devicePresenceById);
}
@ -173,6 +236,7 @@ public class MessagesManager {
return insertAsync(resolvedRecipients.get(recipient).getIdentifier(IdentityType.ACI),
IntStream.range(0, devices.length).mapToObj(i -> devices[i])
.collect(Collectors.toMap(deviceId -> deviceId, _ -> prototypeMessage.toBuilder()
.setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(serviceIdAndRecipient.getValue())))
.setDestinationServiceId(serviceIdentifier.toCompactByteString())
.build())))
.thenAccept(clientPresenceByDeviceId ->
@ -215,7 +279,13 @@ public class MessagesManager {
}
public MessageStream getMessages(final UUID destinationUuid, final Device destinationDevice) {
return new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice);
return new DeletionMirroringRedisDynamoDbMessageStream(
new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice),
foundationDbMessageStore,
experimentEnrollmentManager,
messageDeletionExecutor,
destinationUuid,
destinationDevice.getId());
}
Publisher<Envelope> getMessagesForDevice(final UUID destinationUuid, final Device destinationDevice) {
@ -226,11 +296,31 @@ public class MessagesManager {
.tap(Micrometer.metrics(Metrics.globalRegistry));
}
public CompletableFuture<Void> clear(UUID destinationUuid) {
public CompletableFuture<Void> clear(final UUID destinationUuid) {
if (experimentEnrollmentManager.isEnrolled(destinationUuid, MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) {
messageDeletionExecutor.execute(() -> {
try {
foundationDbMessageStore.clearAll(new AciServiceIdentifier(destinationUuid));
} catch (final Exception e) {
logger.warn("Failed to clear messages for {}", destinationUuid, e);
}
});
}
return messagesCache.clear(destinationUuid);
}
public CompletableFuture<Void> clear(UUID destinationUuid, byte deviceId) {
public CompletableFuture<Void> clear(final UUID destinationUuid, final byte deviceId) {
if (experimentEnrollmentManager.isEnrolled(destinationUuid, MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)) {
messageDeletionExecutor.execute(() -> {
try {
foundationDbMessageStore.clearAll(new AciServiceIdentifier(destinationUuid), deviceId);
} catch (final Exception e) {
logger.warn("Failed to clear messages for {}:{}", destinationUuid, deviceId, e);
}
});
}
return messagesCache.clear(destinationUuid, deviceId);
}

View File

@ -16,11 +16,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessageStream;
import org.whispersystems.textsecuregcm.util.Conversions;
@ -58,12 +65,27 @@ public class FoundationDbMessageStore {
public static final int MAX_EPOCHS = 4;
public static final int MAX_SHARDS = 64;
private static final Counter INSERT_MESSAGE_COUNTER =
Metrics.counter(MetricsUtil.name(FoundationDbMessageStore.class, "insertMessage"));
private static final Timer INSERT_MESSAGE_BATCH_TIMER =
Metrics.timer(MetricsUtil.name(FoundationDbMessageStore.class, "insertMessageBatchTimer"));
private static final Counter DELETE_MESSAGE_COUNTER =
Metrics.counter(MetricsUtil.name(FoundationDbMessageStore.class, "deleteMessage"));
private static final Timer DELETE_MESSAGE_TIMER =
Metrics.timer(MetricsUtil.name(FoundationDbMessageStore.class, "deleteMessageTimer"));
/// Result of inserting a message for a particular device
///
/// @param versionstamp the versionstamp of the transaction in which this device's message was inserted, empty
/// otherwise
/// @param messageGuid the versionstamp encrypted/encoded as a version 8 UUID
/// @param present whether the device is online
public record InsertResult(Optional<Versionstamp> versionstamp, boolean present) {
public record InsertResult(Optional<Versionstamp> versionstamp,
Optional<UUID> messageGuid,
boolean present) {
}
public FoundationDbMessageStore(final Map<Integer, List<Database>> databasesByEpoch,
@ -129,6 +151,8 @@ public class FoundationDbMessageStore {
final Map<AciServiceIdentifier, Map<Byte, MessageProtos.Envelope>> messagesByServiceIdentifier,
final int epoch) {
final Timer.Sample sample = Timer.start();
if (messagesByServiceIdentifier.entrySet()
.stream()
.anyMatch(entry -> entry.getValue().isEmpty())) {
@ -180,7 +204,13 @@ public class FoundationDbMessageStore {
.reduce(new HashMap<>(), (a, b) -> {
a.putAll(b);
return a;
}));
}))
.whenComplete((_, throwable) -> {
if (throwable == null) {
sample.stop(INSERT_MESSAGE_BATCH_TIMER);
INSERT_MESSAGE_COUNTER.increment(messagesByServiceIdentifier.values().stream().mapToInt(Map::size).sum());
}
});
}
private CompletableFuture<Map<AciServiceIdentifier, Map<Byte, InsertResult>>> insertChunk(
@ -231,7 +261,11 @@ public class FoundationDbMessageStore {
} else {
insertResultVersionstamp = Optional.empty();
}
return new InsertResult(insertResultVersionstamp, presenceEntry.getValue());
return new InsertResult(insertResultVersionstamp,
insertResultVersionstamp.map(versionstamp ->
versionstampUUIDCipher.encryptVersionstamp(versionstamp, entry.getKey().uuid(), presenceEntry.getKey())),
presenceEntry.getValue());
}));
})));
}
@ -290,6 +324,48 @@ public class FoundationDbMessageStore {
});
}
// Note that this method is intended only for initial migration support; in general, callers should clear messages
// by acknowledging messages via a `FoundationDbMessageStream`.
public void delete(final AciServiceIdentifier aci, final byte deviceId, final UUID messageGuid) {
delete(aci, deviceId, versionstampUUIDCipher.decryptVersionstamp(messageGuid, aci.uuid(), deviceId));
}
private void delete(final AciServiceIdentifier aci, final byte deviceId, final Versionstamp versionstamp) {
final Timer.Sample sample = Timer.start();
final byte[] messageKey = getDeviceQueueSubspace(aci, deviceId).pack(Tuple.from(versionstamp));
databasesByEpoch[getConfigurationEpoch(versionstamp)][getShardId(versionstamp)].run(transaction -> {
transaction.clear(messageKey);
return null;
});
sample.stop(DELETE_MESSAGE_TIMER);
DELETE_MESSAGE_COUNTER.increment();
}
public void clearAll(final AciServiceIdentifier aci) {
doForAllDatabasesWithMessages(aci, database -> database.run(transaction -> {
transaction.clear(getAccountSubspace(aci).range());
return null;
}));
}
public void clearAll(final AciServiceIdentifier aci, final byte deviceId) {
doForAllDatabasesWithMessages(aci, database -> database.run(transaction -> {
transaction.clear(getDeviceSubspace(aci, deviceId).range());
return null;
}));
}
private void doForAllDatabasesWithMessages(final AciServiceIdentifier aci, final Consumer<Database> action) {
IntStream.range(0, databasesByEpoch.length)
.filter(epoch -> databasesByEpoch[epoch] != null)
.mapToObj(epoch -> databasesByEpoch[epoch][hashAciToShardNumber(aci, epoch)])
.distinct()
.forEach(action);
}
public MessageStream getMessages(final AciServiceIdentifier aci, final Device destinationDevice) {
return getMessages(aci, destinationDevice, FoundationDbMessageStream.DEFAULT_MAX_MESSAGES_PER_SCAN,
FoundationDbMessageStream.DEFAULT_MAX_UNACKNOWLEDGED_MESSAGES, Util.NOOP);

View File

@ -5,18 +5,23 @@
package org.whispersystems.textsecuregcm.workers;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.core.setup.Environment;
import io.lettuce.core.resource.ClientResources;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.stream.Collectors;
import org.signal.libsignal.zkgroup.GenericServerSecretParams;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.ServerSecretParams;
@ -56,6 +61,7 @@ import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriodManager
import org.whispersystems.textsecuregcm.storage.ChangeNumberWaitingPeriods;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.DynamoDbRecoveryManager;
import org.whispersystems.textsecuregcm.storage.FoundationDbVersion;
import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@ -75,6 +81,8 @@ import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.storage.SingleUseECPreKeyStore;
import org.whispersystems.textsecuregcm.storage.SubscriptionManager;
import org.whispersystems.textsecuregcm.storage.Subscriptions;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.storage.foundationdb.VersionstampUUIDCipher;
import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreClient;
import org.whispersystems.textsecuregcm.subscriptions.AppleAppStoreManager;
import org.whispersystems.textsecuregcm.subscriptions.GooglePlayBillingManager;
@ -127,6 +135,39 @@ public record CommandDependencies(
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
final FDB fdb = FDB.selectAPIVersion(FoundationDbVersion.getFoundationDbApiVersion());
// Jetty and the FoundationDB client both register shutdown hooks to begin shutdown/cleanup operations. There isn't
// a good way to coordinate or enforce ordering between shutdown hooks, and so the two processes will race.
// Generally, FoundationDB will shut down before Jetty does, meaning we'll still be trying to serve requests that
// require talking to FoundationDB even though FoundationDB has shut down. To avoid that scenario, we disabled
// FoundationDB's shutdown hook and let the JVM terminate its (daemon) threads at exit. This isn't as graceful as
// we'd like, but is the least bad option given current constraints.
fdb.disableShutdownHook();
final Map<Integer, List<Database>> messageDatabasesByEpoch;
{
final Map<String, Database> databasesByName =
configuration.getFoundationDbMessagesConfiguration().clusters().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
entry -> {
try {
final Database database = entry.getValue().build(fdb);
database.options().setMaxWatches(configuration.getFoundationDbMessagesConfiguration().maxWatchesPerClient());
return database;
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}));
messageDatabasesByEpoch = configuration.getFoundationDbMessagesConfiguration().epochs().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
entry -> entry.getValue().stream()
.map(databasesByName::get)
.toList()));
}
final AwsCredentialsProvider awsCredentialsProvider = configuration.getAwsCredentialsConfiguration().build();
ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder.of(environment, "dynamicConfiguration")
@ -268,6 +309,11 @@ public record CommandDependencies(
disconnectionRequestListenerExecutor, retryExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, retryExecutor, Clock.systemUTC());
final FoundationDbMessageStore foundationDbMessageStore = new FoundationDbMessageStore(messageDatabasesByEpoch,
configuration.getFoundationDbMessagesConfiguration().activeEpoch(),
new VersionstampUUIDCipher(configuration.getFoundationDbMessagesConfiguration().currentVersionstampCipherKey(),
configuration.getFoundationDbMessagesConfiguration().versionstampCipherKeys().get(configuration.getFoundationDbMessagesConfiguration().currentVersionstampCipherKey()).value()),
Clock.systemUTC());
ProfilesManager profilesManager = new ProfilesManager(profilesV1, profiles, cacheCluster, retryExecutor, asyncCdnS3Client,
configuration.getCdnConfiguration().bucket());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
@ -277,8 +323,9 @@ public record CommandDependencies(
configuration.getReportMessageConfiguration().getCounterTtl());
RedisMessageAvailabilityManager redisMessageAvailabilityManager =
new RedisMessageAvailabilityManager(messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager,
reportMessageManager, messageDeletionExecutor, Clock.systemUTC());
final MessagesManager messagesManager =
new MessagesManager(messagesDynamoDb, messagesCache, foundationDbMessageStore, redisMessageAvailabilityManager,
reportMessageManager, messageDeletionExecutor, Clock.systemUTC(), experimentEnrollmentManager);
AccountLockManager accountLockManager = new AccountLockManager(dynamoDbClient,
configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName());
RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager =

View File

@ -2,6 +2,7 @@ org.whispersystems.textsecuregcm.configuration.AwsCredentialsProviderFactory
org.whispersystems.textsecuregcm.configuration.DynamoDbClientFactory
org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory
org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory
org.whispersystems.textsecuregcm.configuration.FoundationDbDatabaseFactory
org.whispersystems.textsecuregcm.configuration.PaymentsServiceClientsFactory
org.whispersystems.textsecuregcm.configuration.PubSubPublisherFactory
org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory

View File

@ -1,40 +0,0 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
class FoundationDbClusterConfigurationTest {
@ParameterizedTest
@MethodSource
void isSingleClusterFileSourceSpecified(final FoundationDbClusterConfiguration clusterConfiguration,
final boolean expectSingleClusterFileSourceSpecified) {
assertEquals(expectSingleClusterFileSourceSpecified, clusterConfiguration.isSingleClusterFileSourceSpecified());
}
private static List<Arguments> isSingleClusterFileSourceSpecified() {
return List.of(
Arguments.argumentSet("Cluster file URL only",
new FoundationDbClusterConfiguration("test-url", null), true),
Arguments.argumentSet("Cluster file contents only",
new FoundationDbClusterConfiguration(null, "test-contents"), true),
Arguments.argumentSet("Both",
new FoundationDbClusterConfiguration("test-url", "test-contents"), false),
Arguments.argumentSet("Neither",
new FoundationDbClusterConfiguration(null, null), false)
);
}
}

View File

@ -6,6 +6,8 @@
package org.whispersystems.textsecuregcm.configuration;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.secrets.SecretBytes;
import org.whispersystems.textsecuregcm.util.TestRandomUtil;
import java.util.List;
import java.util.Map;
@ -17,45 +19,84 @@ class FoundationDbMessagesConfigurationTest {
@Test
void isEveryEpochClusterConfigured() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0")),
0
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isEveryEpochClusterConfigured());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0", "unconfigured-cluster")),
0
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isEveryEpochClusterConfigured());
}
@Test
void isEveryEpochFreeOfDuplicates() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0")),
0
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isEveryEpochFreeOfDuplicates());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0", "messages-0")),
0
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isEveryEpochFreeOfDuplicates());
}
@Test
void isActiveEpochConfigured() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0")),
0
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isActiveEpochConfigured());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url", null)),
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0")),
1
1,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isActiveEpochConfigured());
}
@Test
void isCurrentVersionstampCipherKeyConfigured() {
assertTrue(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0")),
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
0,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isCurrentVersionstampCipherKeyConfigured());
assertFalse(new FoundationDbMessagesConfiguration(
Map.of("messages-0", new FoundationDbClusterConfiguration("test-url")),
Map.of(0, List.of("messages-0")),
0,
Map.of(0, new SecretBytes(TestRandomUtil.nextBytes(16))),
1,
FoundationDbMessagesConfiguration.DEFAULT_MAX_WATCHES_PER_CLIENT
).isCurrentVersionstampCipherKeyConfigured());
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.io.IOException;
import org.whispersystems.textsecuregcm.storage.TestcontainersFoundationDbDatabaseLifecycleManager;
@JsonTypeName("local")
public class LocalFoundationDbDatabaseFactory implements FoundationDbDatabaseFactory {
@JsonIgnore
private final TestcontainersFoundationDbDatabaseLifecycleManager testcontainersFoundationDbDatabaseLifecycleManager;
@JsonIgnore
private Database database;
private LocalFoundationDbDatabaseFactory() {
this.testcontainersFoundationDbDatabaseLifecycleManager = new TestcontainersFoundationDbDatabaseLifecycleManager();
}
@Override
public synchronized Database build(final FDB fdb) throws IOException {
if (database == null) {
Runtime.getRuntime().addShutdownHook(new Thread(testcontainersFoundationDbDatabaseLifecycleManager::closeDatabase));
testcontainersFoundationDbDatabaseLifecycleManager.initializeDatabase(fdb);
database = testcontainersFoundationDbDatabaseLifecycleManager.getDatabase();
}
return database;
}
}

View File

@ -38,7 +38,6 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.signal.chat.messages.GetMessagesRequest;
import org.signal.chat.messages.GetMessagesResponse;
import org.whispersystems.textsecuregcm.auth.DisconnectionRequestManager;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
@ -63,6 +62,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@ -127,8 +127,8 @@ class MessageDispatcherIntegrationTest {
messageDispatcher = new MessageDispatcher(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager,
sharedExecutorService, Clock.systemUTC()),
new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager,
sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)),
new MessageMetrics(),
mock(PushNotificationManager.class),
mock(PushNotificationScheduler.class),

View File

@ -0,0 +1,69 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
class DeletionMirroringRedisDynamoDbMessageStreamTest {
private FoundationDbMessageStore foundationDbMessageStore;
private ExperimentEnrollmentManager experimentEnrollmentManager;
private DeletionMirroringRedisDynamoDbMessageStream deletionMirroringRedisDynamoDbMessageStream;
private static final AciServiceIdentifier ACCOUNT_IDENTIFIER = new AciServiceIdentifier(UUID.randomUUID());
private static final byte DEVICE_ID = Device.PRIMARY_ID;
@BeforeEach
void setUp() {
foundationDbMessageStore = mock(FoundationDbMessageStore.class);
experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class);
deletionMirroringRedisDynamoDbMessageStream = new DeletionMirroringRedisDynamoDbMessageStream(
mock(RedisDynamoDbMessageStream.class),
foundationDbMessageStore,
experimentEnrollmentManager,
Runnable::run,
ACCOUNT_IDENTIFIER.uuid(),
DEVICE_ID);
}
@ParameterizedTest
@MethodSource
void acknowledgeMessage(final boolean enrolled, final UUID messageGuid, final boolean expectFoundationDbDeletion) {
when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_DELETIONS_EXPERIMENT_NAME)))
.thenReturn(enrolled);
deletionMirroringRedisDynamoDbMessageStream.acknowledgeMessage(messageGuid, System.currentTimeMillis());
verify(foundationDbMessageStore, times(expectFoundationDbDeletion ? 1 : 0))
.delete(ACCOUNT_IDENTIFIER, DEVICE_ID, messageGuid);
}
private static List<Arguments> acknowledgeMessage() {
return List.of(
Arguments.argumentSet("Not enrolled, v4 UUID", false, UUID.randomUUID(), false),
Arguments.argumentSet("Not enrolled, v8 UUID", false, MessageGuidUtil.generateRandomV8UUID(), false),
Arguments.argumentSet("Enrolled, v4 UUID", true, UUID.randomUUID(), false),
Arguments.argumentSet("Enrolled, v8 UUID", true, MessageGuidUtil.generateRandomV8UUID(), true)
);
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
public class MessageGuidUtil {
private static final long VERSION_MASK = 0x0000_0000_0000_f000L;
private static final long VERSION_8_BITS = 0x0000_0000_0000_8000L;
public static UUID generateRandomV8UUID() {
long mostSignificantBits = ThreadLocalRandom.current().nextLong();
// Clear any bits in the version field
mostSignificantBits &= ~VERSION_MASK;
// Set the version to 8
mostSignificantBits |= VERSION_8_BITS;
return new UUID(mostSignificantBits, ThreadLocalRandom.current().nextLong());
}
}

View File

@ -36,11 +36,13 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.push.MessageAvailabilityListener;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import reactor.core.scheduler.Scheduler;
@ -95,10 +97,12 @@ class MessagePersisterIntegrationTest {
final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb,
messagesCache,
mock(FoundationDbMessageStore.class),
mock(RedisMessageAvailabilityManager.class),
mock(ReportMessageManager.class),
messageDeletionExecutorService,
Clock.systemUTC());
Clock.systemUTC(),
mock(ExperimentEnrollmentManager.class));
websocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor();
asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor();

View File

@ -8,14 +8,17 @@ package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyByte;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.apple.foundationdb.tuple.Versionstamp;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
@ -28,19 +31,23 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.signal.libsignal.protocol.InvalidMessageException;
import org.signal.libsignal.protocol.InvalidVersionException;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.signal.libsignal.protocol.ServiceId;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.PniServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.tests.util.MultiRecipientMessageHelper;
import org.whispersystems.textsecuregcm.tests.util.TestRecipient;
import org.whispersystems.textsecuregcm.util.TestClock;
@ -48,27 +55,55 @@ import reactor.core.publisher.Mono;
class MessagesManagerTest {
private final MessagesDynamoDb messagesDynamoDb = mock(MessagesDynamoDb.class);
private final MessagesCache messagesCache = mock(MessagesCache.class);
private final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class);
private MessagesDynamoDb messagesDynamoDb;
private MessagesCache messagesCache;
private FoundationDbMessageStore foundationDbMessageStore;
private ReportMessageManager reportMessageManager;
private ExperimentEnrollmentManager experimentEnrollmentManager;
private static final TestClock CLOCK = TestClock.pinned(Instant.now());
private final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache,
mock(RedisMessageAvailabilityManager.class), reportMessageManager, Executors.newSingleThreadExecutor(), CLOCK);
private MessagesManager messagesManager;
@BeforeEach
void setUp() {
messagesDynamoDb = mock(MessagesDynamoDb.class);
messagesCache = mock(MessagesCache.class);
foundationDbMessageStore = mock(FoundationDbMessageStore.class);
reportMessageManager = mock(ReportMessageManager.class);
experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class);
when(messagesCache.insert(any(), any(), anyByte(), any())).thenReturn(CompletableFuture.completedFuture(true));
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, foundationDbMessageStore,
mock(RedisMessageAvailabilityManager.class), reportMessageManager, Executors.newSingleThreadExecutor(), CLOCK,
experimentEnrollmentManager);
}
@Test
void insert() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void insert(final boolean mirrorInsert) {
final UUID sourceAci = UUID.randomUUID();
final Envelope message = Envelope.newBuilder()
.setSourceServiceId(new AciServiceIdentifier(sourceAci).toCompactByteString())
.build();
if (mirrorInsert) {
when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_INSERTS_EXPERIMENT_NAME)))
.thenReturn(true);
when(foundationDbMessageStore.insert(any(), any()))
.thenAnswer(invocation -> {
final Map<Byte, Envelope> messagesByDeviceId = invocation.getArgument(1);
return CompletableFuture.completedFuture(messagesByDeviceId.keySet().stream()
.collect(Collectors.toMap(deviceId -> deviceId,
_ -> new FoundationDbMessageStore.InsertResult(Optional.of(Versionstamp.fromBytes(new byte[12])),
Optional.of(MessageGuidUtil.generateRandomV8UUID()),
true))));
});
}
final UUID destinationUuid = UUID.randomUUID();
messagesManager.insert(destinationUuid, Map.of(Device.PRIMARY_ID, message));
@ -82,14 +117,78 @@ class MessagesManagerTest {
messagesManager.insert(destinationUuid, Map.of(Device.PRIMARY_ID, syncMessage));
verifyNoMoreInteractions(reportMessageManager);
verify(foundationDbMessageStore, times(mirrorInsert ? 2 : 0)).insert(any(), any());
verify(messagesCache, times(2))
.insert(argThat(messageGuid -> messageGuid.version() == (mirrorInsert ? 8 : 4)),
eq(destinationUuid),
eq(Device.PRIMARY_ID),
any());
}
@Test
void insertMultiRecipientMessage() throws InvalidMessageException, InvalidVersionException {
final ServiceIdentifier singleDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final ServiceIdentifier singleDeviceAccountPniServiceIdentifier = new PniServiceIdentifier(UUID.randomUUID());
final ServiceIdentifier multiDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final ServiceIdentifier unresolvedAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID());
void insertFoundationDbException() {
final UUID sourceAci = UUID.randomUUID();
final Envelope message = Envelope.newBuilder()
.setSourceServiceId(new AciServiceIdentifier(sourceAci).toCompactByteString())
.build();
when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_INSERTS_EXPERIMENT_NAME)))
.thenReturn(true);
when(foundationDbMessageStore.insert(any(), any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
final UUID destinationUuid = UUID.randomUUID();
messagesManager.insert(destinationUuid, Map.of(Device.PRIMARY_ID, message));
verify(foundationDbMessageStore).insert(any(), any());
verify(messagesCache)
.insert(argThat(messageGuid -> messageGuid.version() == 4),
eq(destinationUuid),
eq(Device.PRIMARY_ID),
any());
}
private static UUID generateRandomV8UUID() {
final long versionMask = 0x0000_0000_0000_f000L;
final long v8Version = 0x0000_0000_0000_8000L;
long mostSignificantBits = ThreadLocalRandom.current().nextLong();
// Clear any bits in the version field
mostSignificantBits &= ~versionMask;
// Set the version to 8
mostSignificantBits |= v8Version;
return new UUID(mostSignificantBits, ThreadLocalRandom.current().nextLong());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void insertMultiRecipientMessage(final boolean mirrorInsert) throws InvalidMessageException, InvalidVersionException {
if (mirrorInsert) {
when(experimentEnrollmentManager.isEnrolled(any(UUID.class), eq(MessagesManager.MIRROR_INSERTS_EXPERIMENT_NAME)))
.thenReturn(true);
when(foundationDbMessageStore.insert(any(), any()))
.thenAnswer(invocation -> {
final Map<Byte, Envelope> messagesByDeviceId = invocation.getArgument(1);
return CompletableFuture.completedFuture(messagesByDeviceId.keySet().stream()
.collect(Collectors.toMap(deviceId -> deviceId,
_ -> new FoundationDbMessageStore.InsertResult(Optional.of(Versionstamp.fromBytes(new byte[12])),
Optional.of(generateRandomV8UUID()),
true))));
});
}
final AciServiceIdentifier singleDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final PniServiceIdentifier singleDeviceAccountPniServiceIdentifier = new PniServiceIdentifier(UUID.randomUUID());
final AciServiceIdentifier multiDeviceAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final AciServiceIdentifier unresolvedAccountAciServiceIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final Account singleDeviceAccount = mock(Account.class);
final Account multiDeviceAccount = mock(Account.class);
@ -186,6 +285,38 @@ class MessagesManagerTest {
eq(unresolvedAccountAciServiceIdentifier.uuid()),
anyByte(),
any());
if (mirrorInsert) {
final Envelope prototypeExpectedFoundationDbMessage = prototypeExpectedMessage.toBuilder().clearSharedMrmKey().build();
verify(foundationDbMessageStore).insert(singleDeviceAccountAciServiceIdentifier,
Map.of(Device.PRIMARY_ID, prototypeExpectedFoundationDbMessage.toBuilder()
.setDestinationServiceId(singleDeviceAccountAciServiceIdentifier.toCompactByteString())
.setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Aci(singleDeviceAccountAciServiceIdentifier.uuid())))))
.build()));
verify(foundationDbMessageStore).insert(singleDeviceAccountAciServiceIdentifier,
Map.of(Device.PRIMARY_ID, prototypeExpectedFoundationDbMessage.toBuilder()
.setDestinationServiceId(singleDeviceAccountPniServiceIdentifier.toCompactByteString())
.setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Pni(singleDeviceAccountPniServiceIdentifier.uuid())))))
.build()));
verify(foundationDbMessageStore).insert(multiDeviceAccountAciServiceIdentifier,
Map.of(
Device.PRIMARY_ID, prototypeExpectedFoundationDbMessage.toBuilder()
.setDestinationServiceId(multiDeviceAccountAciServiceIdentifier.toCompactByteString())
.setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Aci(multiDeviceAccountAciServiceIdentifier.uuid())))))
.build(),
(byte) (Device.PRIMARY_ID + 1), prototypeExpectedFoundationDbMessage.toBuilder()
.setDestinationServiceId(multiDeviceAccountAciServiceIdentifier.toCompactByteString())
.setContent(ByteString.copyFrom(multiRecipientMessage.messageForRecipient(multiRecipientMessage.getRecipients().get(new ServiceId.Aci(multiDeviceAccountAciServiceIdentifier.uuid())))))
.build())
);
verify(foundationDbMessageStore, never()).insert(eq(unresolvedAccountAciServiceIdentifier), any());
} else {
verifyNoInteractions(foundationDbMessageStore);
}
}
@ParameterizedTest

View File

@ -12,7 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.utility.DockerImageName;
class TestcontainersFoundationDbDatabaseLifecycleManager implements FoundationDbDatabaseLifecycleManager {
public class TestcontainersFoundationDbDatabaseLifecycleManager implements FoundationDbDatabaseLifecycleManager {
private FoundationDBContainer foundationDBContainer;
private Database database;

View File

@ -452,6 +452,94 @@ class FoundationDbMessageStoreTest {
Map.of(generateRandomAciForShard(0), Collections.emptyMap())));
}
@Test
void delete() {
final AciServiceIdentifier deletedMessageAci = new AciServiceIdentifier(UUID.randomUUID());
final byte deletedMessageDeviceId = Device.PRIMARY_ID;
final MessageProtos.Envelope deletedMessage = generateRandomMessage(false);
final AciServiceIdentifier retainedMessageAci = new AciServiceIdentifier(UUID.randomUUID());
final byte retainedMessageDeviceId = Device.PRIMARY_ID;
final MessageProtos.Envelope retainedMessage = generateRandomMessage(false);
final UUID deletedMessageGuid =
foundationDbMessageStore.insert(deletedMessageAci, Map.of(deletedMessageDeviceId, deletedMessage)).join()
.get(deletedMessageDeviceId).messageGuid().orElseThrow();
foundationDbMessageStore.insert(retainedMessageAci, Map.of(retainedMessageDeviceId, retainedMessage)).join();
assertArrayEquals(deletedMessage.toByteArray(),
getItemsInDeviceQueue(deletedMessageAci, deletedMessageDeviceId).getFirst().getValue());
assertArrayEquals(retainedMessage.toByteArray(),
getItemsInDeviceQueue(retainedMessageAci, retainedMessageDeviceId).getFirst().getValue());
foundationDbMessageStore.delete(deletedMessageAci, deletedMessageDeviceId, deletedMessageGuid);
assertTrue(getItemsInDeviceQueue(deletedMessageAci, deletedMessageDeviceId).isEmpty());
assertArrayEquals(retainedMessage.toByteArray(),
getItemsInDeviceQueue(retainedMessageAci, retainedMessageDeviceId).getFirst().getValue());
}
@Test
void clearAllForAccount() {
final AciServiceIdentifier deletedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final byte deletedPrimaryDeviceId = Device.PRIMARY_ID;
final byte deletedLinkedDeviceId = deletedPrimaryDeviceId + 1;
final AciServiceIdentifier retainedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final byte retainedPrimaryDeviceId = Device.PRIMARY_ID;
foundationDbMessageStore.insert(deletedAccountIdentifier, Map.of(
deletedPrimaryDeviceId, generateRandomMessage(false),
deletedLinkedDeviceId, generateRandomMessage(false)
)).join();
foundationDbMessageStore.insert(retainedAccountIdentifier, Map.of(
retainedPrimaryDeviceId, generateRandomMessage(false)
)).join();
assertEquals(1, getItemsInDeviceQueue(deletedAccountIdentifier, deletedPrimaryDeviceId).size());
assertEquals(1, getItemsInDeviceQueue(deletedAccountIdentifier, deletedLinkedDeviceId).size());
assertEquals(1, getItemsInDeviceQueue(retainedAccountIdentifier, retainedPrimaryDeviceId).size());
foundationDbMessageStore.clearAll(deletedAccountIdentifier);
assertEquals(0, getItemsInDeviceQueue(deletedAccountIdentifier, deletedPrimaryDeviceId).size());
assertEquals(0, getItemsInDeviceQueue(deletedAccountIdentifier, deletedLinkedDeviceId).size());
assertEquals(1, getItemsInDeviceQueue(retainedAccountIdentifier, retainedPrimaryDeviceId).size());
}
@Test
void clearAllForDevice() {
final AciServiceIdentifier targetedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final byte targetedAccountRetainedDeviceId = Device.PRIMARY_ID;
final byte targetedAccountDeletedDeviceId = targetedAccountRetainedDeviceId + 1;
final AciServiceIdentifier untargetedAccountIdentifier = new AciServiceIdentifier(UUID.randomUUID());
final byte untargetedAccountPrimaryDeviceId = Device.PRIMARY_ID;
foundationDbMessageStore.insert(targetedAccountIdentifier, Map.of(
targetedAccountRetainedDeviceId, generateRandomMessage(false),
targetedAccountDeletedDeviceId, generateRandomMessage(false)
)).join();
foundationDbMessageStore.insert(untargetedAccountIdentifier, Map.of(
untargetedAccountPrimaryDeviceId, generateRandomMessage(false)
)).join();
assertEquals(1, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountRetainedDeviceId).size());
assertEquals(1, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountDeletedDeviceId).size());
assertEquals(1, getItemsInDeviceQueue(untargetedAccountIdentifier, untargetedAccountPrimaryDeviceId).size());
foundationDbMessageStore.clearAll(targetedAccountIdentifier, targetedAccountDeletedDeviceId);
assertEquals(1, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountRetainedDeviceId).size());
assertEquals(0, getItemsInDeviceQueue(targetedAccountIdentifier, targetedAccountDeletedDeviceId).size());
assertEquals(1, getItemsInDeviceQueue(untargetedAccountIdentifier, untargetedAccountPrimaryDeviceId).size());
}
@ParameterizedTest
@MethodSource
void getMessages(final int numMessages, final int batchSize) {

View File

@ -68,6 +68,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.storage.foundationdb.FoundationDbMessageStore;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
@ -149,7 +150,7 @@ class WebSocketConnectionIntegrationTest {
void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount) {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()),
new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)),
new MessageMetrics(),
mock(PushNotificationManager.class),
mock(PushNotificationScheduler.class),
@ -224,7 +225,7 @@ class WebSocketConnectionIntegrationTest {
void testProcessStoredMessagesMultipleSegments() {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()),
new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)),
new MessageMetrics(),
mock(PushNotificationManager.class),
mock(PushNotificationScheduler.class),
@ -319,7 +320,7 @@ class WebSocketConnectionIntegrationTest {
void testProcessStoredMessagesClientClosed() {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()),
new MessagesManager(messagesDynamoDb, messagesCache, mock(FoundationDbMessageStore.class), redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC(), mock(ExperimentEnrollmentManager.class)),
new MessageMetrics(),
mock(PushNotificationManager.class),
mock(PushNotificationScheduler.class),

View File

@ -0,0 +1 @@
org.whispersystems.textsecuregcm.configuration.LocalFoundationDbDatabaseFactory

View File

@ -175,3 +175,5 @@ tlsKeyStore.password: unset
hlrLookup.apiKey: AAAAAAAAAAA
hlrLookup.apiSecret: AAAAAAAAAAA
foundationDbMessages.versionstampCipherKey.0: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=

View File

@ -555,3 +555,15 @@ server:
- type: h2c
port: 8080
useForwardedHeaders: true
foundationDbMessages:
maxWatchesPerClient: 10000
versionstampCipherKeys:
0: secret://foundationDbMessages.versionstampCipherKey.0
currentVersionstampCipherKey: 0
clusters:
"messages-0":
type: local
epochs:
0:
- messages-0