Migrate Profiles cache key to use hash tag
This commit is contained in:
parent
56f25f5a39
commit
a3c712f8d3
@ -15,9 +15,12 @@ import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import javax.annotation.Nullable;
|
||||
import io.lettuce.core.RedisFuture;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -35,6 +38,7 @@ public class ProfilesManager {
|
||||
private final Logger logger = LoggerFactory.getLogger(ProfilesManager.class);
|
||||
|
||||
private static final String CACHE_PREFIX = "profiles::";
|
||||
private static final String CACHE_PREFIX_V1 = "profiles_v1::";
|
||||
|
||||
private final Profiles profiles;
|
||||
private final FaultTolerantRedisClusterClient cacheCluster;
|
||||
@ -145,6 +149,7 @@ public class ProfilesManager {
|
||||
final String profileJson = mapper.writeValueAsString(profile);
|
||||
|
||||
cacheCluster.useCluster(connection -> connection.sync().hset(getCacheKey(uuid), profile.version(), profileJson));
|
||||
cacheCluster.useBinaryCluster(connection -> connection.sync().hset(getCacheKeyV1(uuid), profile.version().getBytes(), profileJson.getBytes()));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
@ -159,8 +164,11 @@ public class ProfilesManager {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
||||
return cacheCluster.withCluster(connection ->
|
||||
connection.async().hset(getCacheKey(uuid), profile.version(), profileJson))
|
||||
final RedisFuture<Boolean> oldSet = cacheCluster.withCluster(connection ->
|
||||
connection.async().hset(getCacheKey(uuid), profile.version(), profileJson));
|
||||
final RedisFuture<Boolean> newSet = cacheCluster.withBinaryCluster(connection ->
|
||||
connection.async().hset(getCacheKeyV1(uuid), profile.version().getBytes(), profileJson.getBytes()));
|
||||
return CompletableFuture.allOf(oldSet.toCompletableFuture(), newSet.toCompletableFuture())
|
||||
.thenRun(Util.NOOP)
|
||||
.toCompletableFuture();
|
||||
}
|
||||
@ -200,9 +208,15 @@ public class ProfilesManager {
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> redisDelete(UUID uuid) {
|
||||
|
||||
final Supplier<CompletionStage<Void>> deletesSupplier = () ->
|
||||
CompletableFuture.allOf(
|
||||
cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid))).toCompletableFuture(),
|
||||
cacheCluster.withBinaryCluster(connection -> connection.async().del(getCacheKeyV1(uuid)))
|
||||
.toCompletableFuture()
|
||||
);
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor,
|
||||
() -> cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid))))
|
||||
.executeCompletionStage(retryExecutor, deletesSupplier)
|
||||
.toCompletableFuture()
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
@ -211,4 +225,9 @@ public class ProfilesManager {
|
||||
static String getCacheKey(UUID uuid) {
|
||||
return CACHE_PREFIX + uuid.toString();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static byte[] getCacheKeyV1(UUID uuid) {
|
||||
return (CACHE_PREFIX_V1 + '{' + uuid.toString() + '}').getBytes();
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,7 +31,6 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.signal.libsignal.protocol.ServiceId;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
@ -50,6 +49,8 @@ public class ProfilesManagerTest {
|
||||
private Profiles profiles;
|
||||
private RedisAdvancedClusterCommands<String, String> commands;
|
||||
private RedisAdvancedClusterAsyncCommands<String, String> asyncCommands;
|
||||
private RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands;
|
||||
private RedisAdvancedClusterAsyncCommands<byte[], byte[]> binaryAsyncCommands;
|
||||
private S3AsyncClient s3Client;
|
||||
|
||||
private ProfilesManager profilesManager;
|
||||
@ -61,9 +62,13 @@ public class ProfilesManagerTest {
|
||||
//noinspection unchecked
|
||||
commands = mock(RedisAdvancedClusterCommands.class);
|
||||
asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class);
|
||||
binaryCommands = mock(RedisAdvancedClusterCommands.class);
|
||||
binaryAsyncCommands = mock(RedisAdvancedClusterAsyncCommands.class);
|
||||
final FaultTolerantRedisClusterClient cacheCluster = RedisClusterHelper.builder()
|
||||
.stringCommands(commands)
|
||||
.stringAsyncCommands(asyncCommands)
|
||||
.binaryCommands(binaryCommands)
|
||||
.binaryAsyncCommands(binaryAsyncCommands)
|
||||
.build();
|
||||
|
||||
profiles = mock(Profiles.class);
|
||||
@ -149,6 +154,8 @@ public class ProfilesManagerTest {
|
||||
|
||||
when(asyncCommands.hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"))).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(binaryAsyncCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()))).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(binaryAsyncCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class))).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(profiles.getAsync(eq(uuid), eq("someversion"))).thenReturn(CompletableFuture.completedFuture(Optional.of(profile)));
|
||||
|
||||
Optional<VersionedProfile> retrieved = profilesManager.getAsync(uuid, "someversion").join();
|
||||
@ -158,7 +165,10 @@ public class ProfilesManagerTest {
|
||||
|
||||
verify(asyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
|
||||
verify(asyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString());
|
||||
verify(binaryAsyncCommands, times(0)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()));
|
||||
verify(binaryAsyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class));
|
||||
verifyNoMoreInteractions(asyncCommands);
|
||||
verifyNoMoreInteractions(binaryAsyncCommands);
|
||||
|
||||
verify(profiles, times(1)).getAsync(eq(uuid), eq("someversion"));
|
||||
verifyNoMoreInteractions(profiles);
|
||||
@ -205,6 +215,11 @@ public class ProfilesManagerTest {
|
||||
.thenReturn(failUpdateCache
|
||||
? MockRedisFuture.failedFuture(new RedisException("Connection lost"))
|
||||
: MockRedisFuture.completedFuture(null));
|
||||
when(binaryAsyncCommands.hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()))).thenReturn(MockRedisFuture.failedFuture(new RedisException("Connection lost")));
|
||||
when(binaryAsyncCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class)))
|
||||
.thenReturn(failUpdateCache
|
||||
? MockRedisFuture.failedFuture(new RedisException("Connection lost"))
|
||||
: MockRedisFuture.completedFuture(null));
|
||||
when(profiles.getAsync(eq(uuid), eq("someversion"))).thenReturn(CompletableFuture.completedFuture(Optional.of(profile)));
|
||||
|
||||
Optional<VersionedProfile> retrieved = profilesManager.getAsync(uuid, "someversion").join();
|
||||
@ -214,6 +229,8 @@ public class ProfilesManagerTest {
|
||||
|
||||
verify(asyncCommands, times(1)).hget(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"));
|
||||
verify(asyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString());
|
||||
verify(binaryAsyncCommands, times(0)).hget(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()));
|
||||
verify(binaryAsyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class));
|
||||
verifyNoMoreInteractions(asyncCommands);
|
||||
|
||||
verify(profiles, times(1)).getAsync(eq(uuid), eq("someversion"));
|
||||
@ -244,12 +261,15 @@ public class ProfilesManagerTest {
|
||||
null, null, "somecommitment".getBytes());
|
||||
|
||||
when(asyncCommands.hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), anyString())).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(binaryAsyncCommands.hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any(byte[].class))).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(profiles.setAsync(eq(uuid), eq(profile))).thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
profilesManager.setAsync(uuid, profile).join();
|
||||
|
||||
verify(asyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKey(uuid)), eq("someversion"), any());
|
||||
verifyNoMoreInteractions(asyncCommands);
|
||||
verify(binaryAsyncCommands, times(1)).hset(eq(ProfilesManager.getCacheKeyV1(uuid)), eq("someversion".getBytes()), any());
|
||||
verifyNoMoreInteractions(binaryAsyncCommands);
|
||||
|
||||
verify(profiles, times(1)).setAsync(eq(uuid), eq(profile));
|
||||
verifyNoMoreInteractions(profiles);
|
||||
@ -264,6 +284,7 @@ public class ProfilesManagerTest {
|
||||
final String avatarTwo = "avatar2";
|
||||
when(profiles.deleteAll(uuid)).thenReturn(CompletableFuture.completedFuture(List.of(avatarOne, avatarTwo)));
|
||||
when(asyncCommands.del(ProfilesManager.getCacheKey(uuid))).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(binaryAsyncCommands.del(ProfilesManager.getCacheKeyV1(uuid))).thenReturn(MockRedisFuture.completedFuture(null));
|
||||
when(s3Client.deleteObject(any(DeleteObjectRequest.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(null))
|
||||
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("some error")));
|
||||
@ -272,6 +293,7 @@ public class ProfilesManagerTest {
|
||||
|
||||
verify(profiles).deleteAll(uuid);
|
||||
verify(asyncCommands).del(ProfilesManager.getCacheKey(uuid));
|
||||
verify(binaryAsyncCommands).del(ProfilesManager.getCacheKeyV1(uuid));
|
||||
if (includeAvatar) {
|
||||
verify(s3Client).deleteObject(DeleteObjectRequest.builder()
|
||||
.bucket(BUCKET)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user