foundationdb "versionstamp clock"
This commit is contained in:
parent
45f96cd702
commit
8deb5a803a
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage.foundationdb;
|
||||
|
||||
import com.apple.foundationdb.Database;
|
||||
import com.apple.foundationdb.KeyValue;
|
||||
import com.apple.foundationdb.MutationType;
|
||||
import com.apple.foundationdb.subspace.Subspace;
|
||||
import com.apple.foundationdb.tuple.Tuple;
|
||||
import com.apple.foundationdb.tuple.Versionstamp;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
|
||||
/// A "versionstamp clock" records versionstamp/time pairs in a FoundationDB database. The purpose of the "clock" is to
|
||||
/// allow callers to get the versionstamp of a FoundationDB database at a specific real-world time to facilitate
|
||||
/// time-related range queries (e.g. "clear all keys with a versionstamp less than or equal to the versionstamp from
|
||||
/// seven days ago, since those keys are now expired").
|
||||
///
|
||||
/// Some external component should call [#recordVersionstampAndTime()] at regular, application-appropriate
|
||||
/// intervals to maintain a running mapping of real-world timestamps to FoundationDB versionstamps.
|
||||
public class VersionstampClock {
|
||||
|
||||
@VisibleForTesting
|
||||
static final Subspace SUBSPACE = new Subspace(Tuple.from("V"));
|
||||
|
||||
private final Database database;
|
||||
private final Clock clock;
|
||||
|
||||
public VersionstampClock(final Database database) {
|
||||
this(database, Clock.systemUTC());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public VersionstampClock(final Database database, final Clock clock) {
|
||||
this.database = database;
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
/// Make a recording in the database of the current time and associated versionstamp.
|
||||
///
|
||||
/// @return the versionstamp for the newly-recorded entry
|
||||
public Versionstamp recordVersionstampAndTime() {
|
||||
final Instant currentTime = clock.instant();
|
||||
|
||||
return database.run(transaction -> {
|
||||
transaction.mutate(MutationType.SET_VERSIONSTAMPED_VALUE,
|
||||
getTimestampKey(currentTime),
|
||||
Tuple.from(Versionstamp.incomplete()).packWithVersionstamp());
|
||||
|
||||
return transaction.getVersionstamp();
|
||||
})
|
||||
.thenApply(Versionstamp::complete)
|
||||
.join();
|
||||
}
|
||||
|
||||
/// Returns the highest versionstamp recorded at or before the given instant.
|
||||
///
|
||||
/// @param timestamp the time for which to retrieve a versionstamp
|
||||
///
|
||||
/// @return the highest versionstamp recorded at or before the given instant, or empty if no versionstamps have been recorded before the given instant
|
||||
public Optional<Versionstamp> getVersionstamp(final Instant timestamp) {
|
||||
return database.read(transaction -> {
|
||||
|
||||
final byte[] rangeStart = SUBSPACE.getKey();
|
||||
final byte[] rangeEnd = getTimestampKey(timestamp.plusMillis(1));
|
||||
|
||||
final Iterator<KeyValue> keyValueIterator = transaction.getRange(rangeStart, rangeEnd, 1, true).iterator();
|
||||
|
||||
if (keyValueIterator.hasNext()) {
|
||||
return Optional.of(Tuple.fromBytes(keyValueIterator.next().getValue()).getVersionstamp(0));
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
});
|
||||
}
|
||||
|
||||
/// Remove any entries from the versionstamp-clock namespace that are strictly older than the given timestamp
|
||||
///
|
||||
/// @param oldestRetainedEntryTimestamp the earliest time for which we still want to be able to obtain a versionstamp
|
||||
public void clearExpiredEntries(final Instant oldestRetainedEntryTimestamp) {
|
||||
database.run(transaction -> {
|
||||
transaction.clear(SUBSPACE.getKey(), getTimestampKey(oldestRetainedEntryTimestamp));
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private byte[] getTimestampKey(final Instant timestamp) {
|
||||
return SUBSPACE.pack(timestamp.toEpochMilli());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
/// These classes manage message storage in a set of FoundationDB databases.
|
||||
///
|
||||
/// # Schema
|
||||
///
|
||||
/// ## Sharding
|
||||
/// The messages database is expected to be larger than a single FoundationDB cluster
|
||||
/// can handle, and is therefore sharded by recipient ACI. The shard for a
|
||||
/// given recipient is identified by a [consistent
|
||||
/// hash][com.google.common.hash.Hashing#consistentHash()] applied to the low
|
||||
/// 64 bits of the ACI.
|
||||
///
|
||||
/// ## Messages
|
||||
/// * Root subspace `M`
|
||||
/// * Managed by [FoundationDbMessageStore]
|
||||
/// * Sharded by ACI: database `i` of `n` will only contain account subspaces for
|
||||
/// UUIDs where `consistentHash(u.getLeastSignificantBits(), n) == i`
|
||||
/// * Contains mailboxes and associated bookkeeping information for each user
|
||||
/// * Schema:
|
||||
/// ```
|
||||
/// + "M" (messages subspace)
|
||||
/// | + {aci (UUID)} (account subspace)
|
||||
/// | | * "l" => messages-available watch key, versionstamp of last message inserted when any recipient
|
||||
/// | | device was present
|
||||
/// | | + {device ID (byte)} (device subspace)
|
||||
/// | | | * "p" => presence: server-id || last-seen-time
|
||||
/// | | | + "Q" (device message queue subspace)
|
||||
/// | | | | * versionstamp => serialized envelope
|
||||
/// ```
|
||||
///
|
||||
/// ## Versionstamp Clock
|
||||
/// * Root subspace `V`
|
||||
/// * Managed by [VersionstampClock]
|
||||
/// * Exists in every messages database; (necessarily) independent for each
|
||||
/// * Global metadata tracking a mapping from time to versionstamp for the expired-message cleaner
|
||||
/// * Schema:
|
||||
/// ```
|
||||
/// + "V" (versionstamp-clock subspace)
|
||||
/// | + unix time => versionstamp
|
||||
/// ```
|
||||
package org.whispersystems.textsecuregcm.storage.foundationdb;
|
||||
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2026 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage.foundationdb;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.whispersystems.textsecuregcm.storage.FoundationDbClusterExtension;
|
||||
import org.whispersystems.textsecuregcm.util.TestClock;
|
||||
|
||||
import com.apple.foundationdb.tuple.Versionstamp;
|
||||
|
||||
class VersionstampClockTest {
|
||||
@RegisterExtension
|
||||
static FoundationDbClusterExtension FOUNDATION_DB_EXTENSION = new FoundationDbClusterExtension(1);
|
||||
|
||||
private final TestClock clock = TestClock.now();
|
||||
private final VersionstampClock versionstampClock = new VersionstampClock(
|
||||
FOUNDATION_DB_EXTENSION.getDatabases()[0], clock);
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
FOUNDATION_DB_EXTENSION.getDatabases()[0].run(transaction -> {
|
||||
transaction.clear(VersionstampClock.SUBSPACE.range());
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void noEntries() {
|
||||
assertThat(versionstampClock.getVersionstamp(clock.instant())).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void recordVersionstamp() {
|
||||
final Instant start = clock.instant();
|
||||
final Instant firstInsert = start.plus(Duration.ofSeconds(1));
|
||||
final Instant betweenInserts = start.plus(Duration.ofMinutes(30));
|
||||
final Instant secondInsert = start.plus(Duration.ofHours(1));
|
||||
final Instant tomorrow = start.plus(Duration.ofDays(1));
|
||||
|
||||
clock.pin(firstInsert);
|
||||
final Versionstamp firstStamp = versionstampClock.recordVersionstampAndTime();
|
||||
|
||||
clock.pin(secondInsert);
|
||||
final Versionstamp secondStamp = versionstampClock.recordVersionstampAndTime();
|
||||
|
||||
assertThat(versionstampClock.getVersionstamp(start)).isEmpty();
|
||||
assertThat(versionstampClock.getVersionstamp(firstInsert)).isPresent().hasValue(firstStamp);
|
||||
assertThat(versionstampClock.getVersionstamp(betweenInserts)).isPresent().hasValue(firstStamp);
|
||||
assertThat(versionstampClock.getVersionstamp(secondInsert)).isPresent().hasValue(secondStamp);
|
||||
assertThat(versionstampClock.getVersionstamp(tomorrow)).isPresent().hasValue(secondStamp);
|
||||
}
|
||||
|
||||
@Test
|
||||
void clearExpiredEntries() {
|
||||
final Instant start = clock.instant();
|
||||
final Instant veryOld = start.minus(Duration.ofDays(45));
|
||||
final Instant old = start.minus(Duration.ofDays(21));
|
||||
|
||||
clock.pin(veryOld);
|
||||
final Versionstamp veryOldStamp = versionstampClock.recordVersionstampAndTime();
|
||||
|
||||
clock.pin(old);
|
||||
final Versionstamp oldStamp = versionstampClock.recordVersionstampAndTime();
|
||||
|
||||
clock.pin(start);
|
||||
final Versionstamp nowStamp = versionstampClock.recordVersionstampAndTime();
|
||||
|
||||
assertThat(versionstampClock.getVersionstamp(veryOld)).isPresent().hasValue(veryOldStamp);
|
||||
|
||||
versionstampClock.clearExpiredEntries(old);
|
||||
|
||||
assertThat(versionstampClock.getVersionstamp(veryOld)).isEmpty();
|
||||
assertThat(versionstampClock.getVersionstamp(old)).isPresent().hasValue(oldStamp);
|
||||
assertThat(versionstampClock.getVersionstamp(start)).isPresent().hasValue(nowStamp);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user