diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/VersionstampClock.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/VersionstampClock.java new file mode 100644 index 000000000..ee31366e3 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/VersionstampClock.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage.foundationdb; + +import com.apple.foundationdb.Database; +import com.apple.foundationdb.KeyValue; +import com.apple.foundationdb.MutationType; +import com.apple.foundationdb.subspace.Subspace; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.foundationdb.tuple.Versionstamp; +import com.google.common.annotations.VisibleForTesting; +import java.time.Clock; +import java.time.Instant; +import java.util.Iterator; +import java.util.Optional; + + /// A "versionstamp clock" records versionstamp/time pairs in a FoundationDB database. The purpose of the "clock" is to + /// allow callers to get the versionstamp of a FoundationDB database at a specific real-world time to facilitate + /// time-related range queries (e.g. "clear all keys with a versionstamp less than or equal to the versionstamp from + /// seven days ago, since those keys are now expired"). + /// + /// Some external component should call [#recordVersionstampAndTime()] at regular, application-appropriate + /// intervals to maintain a running mapping of real-world timestamps to FoundationDB versionstamps. +public class VersionstampClock { + + @VisibleForTesting + static final Subspace SUBSPACE = new Subspace(Tuple.from("V")); + + private final Database database; + private final Clock clock; + + public VersionstampClock(final Database database) { + this(database, Clock.systemUTC()); + } + + @VisibleForTesting + public VersionstampClock(final Database database, final Clock clock) { + this.database = database; + this.clock = clock; + } + + /// Make a recording in the database of the current time and associated versionstamp. + /// + /// @return the versionstamp for the newly-recorded entry + public Versionstamp recordVersionstampAndTime() { + final Instant currentTime = clock.instant(); + + return database.run(transaction -> { + transaction.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, + getTimestampKey(currentTime), + Tuple.from(Versionstamp.incomplete()).packWithVersionstamp()); + + return transaction.getVersionstamp(); + }) + .thenApply(Versionstamp::complete) + .join(); + } + + /// Returns the highest versionstamp recorded at or before the given instant. + /// + /// @param timestamp the time for which to retrieve a versionstamp + /// + /// @return the highest versionstamp recorded at or before the given instant, or empty if no versionstamps have been recorded before the given instant + public Optional getVersionstamp(final Instant timestamp) { + return database.read(transaction -> { + + final byte[] rangeStart = SUBSPACE.getKey(); + final byte[] rangeEnd = getTimestampKey(timestamp.plusMillis(1)); + + final Iterator keyValueIterator = transaction.getRange(rangeStart, rangeEnd, 1, true).iterator(); + + if (keyValueIterator.hasNext()) { + return Optional.of(Tuple.fromBytes(keyValueIterator.next().getValue()).getVersionstamp(0)); + } + + return Optional.empty(); + }); + } + + /// Remove any entries from the versionstamp-clock namespace that are strictly older than the given timestamp + /// + /// @param oldestRetainedEntryTimestamp the earliest time for which we still want to be able to obtain a versionstamp + public void clearExpiredEntries(final Instant oldestRetainedEntryTimestamp) { + database.run(transaction -> { + transaction.clear(SUBSPACE.getKey(), getTimestampKey(oldestRetainedEntryTimestamp)); + return null; + }); + } + + private byte[] getTimestampKey(final Instant timestamp) { + return SUBSPACE.pack(timestamp.toEpochMilli()); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/package-info.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/package-info.java new file mode 100644 index 000000000..4f22104e6 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/package-info.java @@ -0,0 +1,40 @@ +/// These classes manage message storage in a set of FoundationDB databases. +/// +/// # Schema +/// +/// ## Sharding +/// The messages database is expected to be larger than a single FoundationDB cluster +/// can handle, and is therefore sharded by recipient ACI. The shard for a +/// given recipient is identified by a [consistent +/// hash][com.google.common.hash.Hashing#consistentHash()] applied to the low +/// 64 bits of the ACI. +/// +/// ## Messages +/// * Root subspace `M` +/// * Managed by [FoundationDbMessageStore] +/// * Sharded by ACI: database `i` of `n` will only contain account subspaces for +/// UUIDs where `consistentHash(u.getLeastSignificantBits(), n) == i` +/// * Contains mailboxes and associated bookkeeping information for each user +/// * Schema: +/// ``` +/// + "M" (messages subspace) +/// | + {aci (UUID)} (account subspace) +/// | | * "l" => messages-available watch key, versionstamp of last message inserted when any recipient +/// | | device was present +/// | | + {device ID (byte)} (device subspace) +/// | | | * "p" => presence: server-id || last-seen-time +/// | | | + "Q" (device message queue subspace) +/// | | | | * versionstamp => serialized envelope +/// ``` +/// +/// ## Versionstamp Clock +/// * Root subspace `V` +/// * Managed by [VersionstampClock] +/// * Exists in every messages database; (necessarily) independent for each +/// * Global metadata tracking a mapping from time to versionstamp for the expired-message cleaner +/// * Schema: +/// ``` +/// + "V" (versionstamp-clock subspace) +/// | + unix time => versionstamp +/// ``` +package org.whispersystems.textsecuregcm.storage.foundationdb; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/VersionstampClockTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/VersionstampClockTest.java new file mode 100644 index 000000000..20338d3d2 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/foundationdb/VersionstampClockTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage.foundationdb; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.storage.FoundationDbClusterExtension; +import org.whispersystems.textsecuregcm.util.TestClock; + +import com.apple.foundationdb.tuple.Versionstamp; + +class VersionstampClockTest { + @RegisterExtension + static FoundationDbClusterExtension FOUNDATION_DB_EXTENSION = new FoundationDbClusterExtension(1); + + private final TestClock clock = TestClock.now(); + private final VersionstampClock versionstampClock = new VersionstampClock( + FOUNDATION_DB_EXTENSION.getDatabases()[0], clock); + + @BeforeEach + void setup() { + FOUNDATION_DB_EXTENSION.getDatabases()[0].run(transaction -> { + transaction.clear(VersionstampClock.SUBSPACE.range()); + return null; + }); + } + + @Test + void noEntries() { + assertThat(versionstampClock.getVersionstamp(clock.instant())).isEmpty(); + } + + @Test + void recordVersionstamp() { + final Instant start = clock.instant(); + final Instant firstInsert = start.plus(Duration.ofSeconds(1)); + final Instant betweenInserts = start.plus(Duration.ofMinutes(30)); + final Instant secondInsert = start.plus(Duration.ofHours(1)); + final Instant tomorrow = start.plus(Duration.ofDays(1)); + + clock.pin(firstInsert); + final Versionstamp firstStamp = versionstampClock.recordVersionstampAndTime(); + + clock.pin(secondInsert); + final Versionstamp secondStamp = versionstampClock.recordVersionstampAndTime(); + + assertThat(versionstampClock.getVersionstamp(start)).isEmpty(); + assertThat(versionstampClock.getVersionstamp(firstInsert)).isPresent().hasValue(firstStamp); + assertThat(versionstampClock.getVersionstamp(betweenInserts)).isPresent().hasValue(firstStamp); + assertThat(versionstampClock.getVersionstamp(secondInsert)).isPresent().hasValue(secondStamp); + assertThat(versionstampClock.getVersionstamp(tomorrow)).isPresent().hasValue(secondStamp); + } + + @Test + void clearExpiredEntries() { + final Instant start = clock.instant(); + final Instant veryOld = start.minus(Duration.ofDays(45)); + final Instant old = start.minus(Duration.ofDays(21)); + + clock.pin(veryOld); + final Versionstamp veryOldStamp = versionstampClock.recordVersionstampAndTime(); + + clock.pin(old); + final Versionstamp oldStamp = versionstampClock.recordVersionstampAndTime(); + + clock.pin(start); + final Versionstamp nowStamp = versionstampClock.recordVersionstampAndTime(); + + assertThat(versionstampClock.getVersionstamp(veryOld)).isPresent().hasValue(veryOldStamp); + + versionstampClock.clearExpiredEntries(old); + + assertThat(versionstampClock.getVersionstamp(veryOld)).isEmpty(); + assertThat(versionstampClock.getVersionstamp(old)).isPresent().hasValue(oldStamp); + assertThat(versionstampClock.getVersionstamp(start)).isPresent().hasValue(nowStamp); + } +}