Remove obsolete Profiles cache key

This commit is contained in:
Chris Eager 2026-04-13 15:53:33 -05:00 committed by Chris Eager
parent a3c712f8d3
commit 53a35a827e
2 changed files with 40 additions and 80 deletions

View File

@ -11,17 +11,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.lettuce.core.RedisException;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import io.lettuce.core.RedisFuture;
import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@ -37,7 +34,6 @@ public class ProfilesManager {
private final Logger logger = LoggerFactory.getLogger(ProfilesManager.class);
private static final String CACHE_PREFIX = "profiles::";
private static final String CACHE_PREFIX_V1 = "profiles_v1::";
private final Profiles profiles;
@ -146,36 +142,32 @@ public class ProfilesManager {
private void redisSet(UUID uuid, VersionedProfile profile) {
try {
final String profileJson = mapper.writeValueAsString(profile);
final byte[] profileJson = mapper.writeValueAsBytes(profile);
cacheCluster.useCluster(connection -> connection.sync().hset(getCacheKey(uuid), profile.version(), profileJson));
cacheCluster.useBinaryCluster(connection -> connection.sync().hset(getCacheKeyV1(uuid), profile.version().getBytes(), profileJson.getBytes()));
cacheCluster.useBinaryCluster(connection -> connection.sync().hset(getCacheKeyV1(uuid), profile.version().getBytes(), profileJson));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}
private CompletableFuture<Void> redisSetAsync(UUID uuid, VersionedProfile profile) {
final String profileJson;
final byte[] profileJson;
try {
profileJson = mapper.writeValueAsString(profile);
profileJson = mapper.writeValueAsBytes(profile);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
final RedisFuture<Boolean> oldSet = cacheCluster.withCluster(connection ->
connection.async().hset(getCacheKey(uuid), profile.version(), profileJson));
final RedisFuture<Boolean> newSet = cacheCluster.withBinaryCluster(connection ->
connection.async().hset(getCacheKeyV1(uuid), profile.version().getBytes(), profileJson.getBytes()));
return CompletableFuture.allOf(oldSet.toCompletableFuture(), newSet.toCompletableFuture())
return cacheCluster.withBinaryCluster(connection ->
connection.async().hset(getCacheKeyV1(uuid), profile.version().getBytes(), profileJson)).toCompletableFuture()
.thenRun(Util.NOOP)
.toCompletableFuture();
}
private Optional<VersionedProfile> redisGet(UUID uuid, String version) {
try {
@Nullable final String json = cacheCluster.withCluster(connection -> connection.sync().hget(getCacheKey(uuid), version));
@Nullable final byte[] json = cacheCluster.withBinaryCluster(connection -> connection.sync().hget(getCacheKeyV1(uuid), version.getBytes()));
return parseProfileJson(json);
} catch (RedisException e) {
@ -185,8 +177,8 @@ public class ProfilesManager {
}
private CompletableFuture<Optional<VersionedProfile>> redisGetAsync(UUID uuid, String version) {
return cacheCluster.withCluster(connection ->
connection.async().hget(getCacheKey(uuid), version))
return cacheCluster.withBinaryCluster(connection ->
connection.async().hget(getCacheKeyV1(uuid), version.getBytes()))
.thenApply(this::parseProfileJson)
.exceptionally(throwable -> {
logger.warn("Failed to read versioned profile from Redis", throwable);
@ -195,7 +187,7 @@ public class ProfilesManager {
.toCompletableFuture();
}
private Optional<VersionedProfile> parseProfileJson(@Nullable final String maybeJson) {
private Optional<VersionedProfile> parseProfileJson(@Nullable final byte[] maybeJson) {
try {
if (maybeJson != null) {
return Optional.of(mapper.readValue(maybeJson, VersionedProfile.class));
@ -209,23 +201,12 @@ public class ProfilesManager {
private CompletableFuture<Void> redisDelete(UUID uuid) {
final Supplier<CompletionStage<Void>> deletesSupplier = () ->
CompletableFuture.allOf(
cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid))).toCompletableFuture(),
cacheCluster.withBinaryCluster(connection -> connection.async().del(getCacheKeyV1(uuid)))
.toCompletableFuture()
);
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, deletesSupplier)
.executeCompletionStage(retryExecutor, () -> cacheCluster.withBinaryCluster(connection -> connection.async().del(getCacheKeyV1(uuid))))
.toCompletableFuture()
.thenRun(Util.NOOP);
}
@VisibleForTesting
static String getCacheKey(UUID uuid) {
return CACHE_PREFIX + uuid.toString();
}
@VisibleForTesting
static byte[] getCacheKeyV1(UUID uuid) {
return (CACHE_PREFIX_V1 + '{' + uuid.toString() + '}').getBytes();

View File

@ -9,8 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -47,8 +47,6 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
public class ProfilesManagerTest {
private Profiles profiles;
private RedisAdvancedClusterCommands<String, String> commands;
private RedisAdvancedClusterAsyncCommands<String, String> asyncCommands;
private RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands;
private RedisAdvancedClusterAsyncCommands<byte[], byte[]> binaryAsyncCommands;
private S3AsyncClient s3Client;
@ -60,13 +58,10 @@ public class ProfilesManagerTest {
@BeforeEach
void setUp() {
//noinspection unchecked
commands = mock(RedisAdvancedClusterCommands.class);
asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class);
binaryCommands = mock(RedisAdvancedClusterCommands.class);
//noinspection unchecked
binaryAsyncCommands = mock(RedisAdvancedClusterAsyncCommands.class);
final FaultTolerantRedisClusterClient cacheCluster = RedisClusterHelper.builder()
.stringCommands(commands)
.stringAsyncCommands(asyncCommands)
.binaryCommands(binaryCommands)
.binaryAsyncCommands(binaryAsyncCommands)
.build();
@ -82,10 +77,11 @@ public class ProfilesManagerTest {
final UUID uuid = UUID.randomUUID();
final byte[] name = TestRandomUtil.nextBytes(81);
final byte[] commitment = new ProfileKey(new byte[32]).getCommitment(new ServiceId.Aci(uuid)).serialize();
when(commands.hget(eq(ProfilesManager.getCacheKey( uuid)), eq("someversion"))).thenReturn(String.format(
"{\"version\": \"someversion\", \"name\": \"%s\", \"avatar\": \"someavatar\", \"commitment\":\"%s\"}",
ProfileTestHelper.encodeToBase64(name),
ProfileTestHelper.encodeToBase64(commitment)));
when(binaryCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes())))
.thenReturn(String.format(
"{\"version\": \"someversion\", \"name\": \"%s\", \"avatar\": \"someavatar\", \"commitment\":\"%s\"}",
ProfileTestHelper.encodeToBase64(name),
ProfileTestHelper.encodeToBase64(commitment)).getBytes());
Optional<VersionedProfile> profile = profilesManager.get(uuid, "someversion");
@ -94,8 +90,8 @@ public class ProfilesManagerTest {
assertEquals("someavatar", profile.get().avatar());
assertArrayEquals(profile.get().commitment(), commitment);
verify(commands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
verifyNoMoreInteractions(commands);
verify(binaryCommands, times(1)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()));
verifyNoMoreInteractions(binaryCommands);
verifyNoMoreInteractions(profiles);
}
@ -105,10 +101,10 @@ public class ProfilesManagerTest {
final byte[] name = TestRandomUtil.nextBytes(81);
final byte[] commitment = new ProfileKey(new byte[32]).getCommitment(new ServiceId.Aci(uuid)).serialize();
when(asyncCommands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(
MockRedisFuture.completedFuture(String.format("{\"version\": \"someversion\", \"name\": \"%s\", \"avatar\": \"someavatar\", \"commitment\":\"%s\"}",
when(binaryAsyncCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes())))
.thenReturn(MockRedisFuture.completedFuture(String.format("{\"version\": \"someversion\", \"name\": \"%s\", \"avatar\": \"someavatar\", \"commitment\":\"%s\"}",
ProfileTestHelper.encodeToBase64(name),
ProfileTestHelper.encodeToBase64(commitment))));
ProfileTestHelper.encodeToBase64(commitment)).getBytes()));
Optional<VersionedProfile> profile = profilesManager.getAsync(uuid, "someversion").join();
@ -117,8 +113,8 @@ public class ProfilesManagerTest {
assertEquals("someavatar", profile.get().avatar());
assertArrayEquals(profile.get().commitment(), commitment);
verify(asyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
verifyNoMoreInteractions(asyncCommands);
verify(binaryAsyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()));
verifyNoMoreInteractions(binaryAsyncCommands);
verifyNoMoreInteractions(profiles);
}
@ -129,7 +125,7 @@ public class ProfilesManagerTest {
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(commands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(null);
when(binaryCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()))).thenReturn(null);
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
@ -137,9 +133,9 @@ public class ProfilesManagerTest {
assertTrue(retrieved.isPresent());
assertSame(retrieved.get(), profile);
verify(commands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
verify(commands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString());
verifyNoMoreInteractions(commands);
verify(binaryCommands, times(1)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()));
verify(binaryCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()), any(byte[].class));
verifyNoMoreInteractions(binaryCommands);
verify(profiles, times(1)).get(eq(uuid), eq("someversion"));
verifyNoMoreInteractions(profiles);
@ -152,8 +148,6 @@ public class ProfilesManagerTest {
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(asyncCommands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(MockRedisFuture.completedFuture(null));
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())).thenReturn(MockRedisFuture.completedFuture(null));
when(binaryAsyncCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()))).thenReturn(MockRedisFuture.completedFuture(null));
when(binaryAsyncCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class))).thenReturn(MockRedisFuture.completedFuture(null));
when(profiles.getAsync(eq(uuid), eq("someversion"))).thenReturn(CompletableFuture.completedFuture(Optional.of(profile)));
@ -163,11 +157,8 @@ public class ProfilesManagerTest {
assertTrue(retrieved.isPresent());
assertSame(retrieved.get(), profile);
verify(asyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
verify(asyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString());
verify(binaryAsyncCommands, times(0)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()));
verify(binaryAsyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()));
verify(binaryAsyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class));
verifyNoMoreInteractions(asyncCommands);
verifyNoMoreInteractions(binaryAsyncCommands);
verify(profiles, times(1)).getAsync(eq(uuid), eq("someversion"));
@ -182,9 +173,9 @@ public class ProfilesManagerTest {
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(commands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenThrow(new RedisException("Connection lost"));
when(binaryCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()))).thenThrow(new RedisException("Connection lost"));
if (failUpdateCache) {
when(commands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString()))
when(binaryCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()), any(byte[].class)))
.thenThrow(new RedisException("Connection lost"));
}
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
@ -194,9 +185,9 @@ public class ProfilesManagerTest {
assertTrue(retrieved.isPresent());
assertSame(retrieved.get(), profile);
verify(commands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
verify(commands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString());
verifyNoMoreInteractions(commands);
verify(binaryCommands, times(1)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()));
verify(binaryCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()), any(byte[].class));
verifyNoMoreInteractions(binaryCommands);
verify(profiles, times(1)).get(eq(uuid), eq("someversion"));
verifyNoMoreInteractions(profiles);
@ -210,11 +201,6 @@ public class ProfilesManagerTest {
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(asyncCommands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost")));
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString()))
.thenReturn(failUpdateCache
? MockRedisFuture.failedFuture(new RedisException("Connection lost"))
: MockRedisFuture.completedFuture(null));
when(binaryAsyncCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()))).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost")));
when(binaryAsyncCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class)))
.thenReturn(failUpdateCache
@ -227,11 +213,9 @@ public class ProfilesManagerTest {
assertTrue(retrieved.isPresent());
assertSame(retrieved.get(), profile);
verify(asyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
verify(asyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString());
verify(binaryAsyncCommands, times(0)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()));
verify(binaryAsyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()));
verify(binaryAsyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class));
verifyNoMoreInteractions(asyncCommands);
verifyNoMoreInteractions(binaryAsyncCommands);
verify(profiles, times(1)).getAsync(eq(uuid), eq("someversion"));
verifyNoMoreInteractions(profiles);
@ -246,8 +230,8 @@ public class ProfilesManagerTest {
profilesManager.set(uuid, profile);
verify(commands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), any());
verifyNoMoreInteractions(commands);
verify(binaryCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), aryEq("someversion".getBytes()), any(byte[].class));
verifyNoMoreInteractions(binaryCommands);
verify(profiles, times(1)).set(eq(uuid), eq(profile));
verifyNoMoreInteractions(profiles);
@ -260,14 +244,11 @@ public class ProfilesManagerTest {
final VersionedProfile profile = new VersionedProfile("someversion", name, "someavatar", null, null,
null, null, "somecommitment".getBytes());
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())).thenReturn(MockRedisFuture.completedFuture(null));
when(binaryAsyncCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class))).thenReturn(MockRedisFuture.completedFuture(null));
when(profiles.setAsync(eq(uuid), eq(profile))).thenReturn(CompletableFuture.completedFuture(null));
profilesManager.setAsync(uuid, profile).join();
verify(asyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), any());
verifyNoMoreInteractions(asyncCommands);
verify(binaryAsyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any());
verifyNoMoreInteractions(binaryAsyncCommands);
@ -283,7 +264,6 @@ public class ProfilesManagerTest {
final String avatarOne = "avatar1";
final String avatarTwo = "avatar2";
when(profiles.deleteAll(uuid)).thenReturn(CompletableFuture.completedFuture(List.of(avatarOne, avatarTwo)));
when(asyncCommands.del(ProfilesManager.getCacheKey(uuid))).thenReturn(MockRedisFuture.completedFuture(null));
when(binaryAsyncCommands.del(ProfilesManager.getCacheKeyV1(uuid))).thenReturn(MockRedisFuture.completedFuture(null));
when(s3Client.deleteObject(any(DeleteObjectRequest.class)))
.thenReturn(CompletableFuture.completedFuture(null))
@ -292,7 +272,6 @@ public class ProfilesManagerTest {
profilesManager.deleteAll(uuid, includeAvatar).join();
verify(profiles).deleteAll(uuid);
verify(asyncCommands).del(ProfilesManager.getCacheKey(uuid));
verify(binaryAsyncCommands).del(ProfilesManager.getCacheKeyV1(uuid));
if (includeAvatar) {
verify(s3Client).deleteObject(DeleteObjectRequest.builder()