Retire "total queue index" from message management scripts
This commit is contained in:
parent
38121bcbfa
commit
99bbbbe1ac
@ -53,7 +53,6 @@ import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import reactor.core.observability.micrometer.Micrometer;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -89,9 +88,6 @@ import reactor.core.scheduler.Schedulers;
|
||||
* <dt>{@code queueLockKey}</dt>
|
||||
* <dd>Used to indicate that a queue is being modified by the {@link MessagePersister} and that {@code get_items} should
|
||||
* return an empty list.</dd>
|
||||
* <dt>{@code queueTotalIndexKey}</dt>
|
||||
* <dd>A sorted set of all queues in a shard. A queue’s score is the timestamp of its oldest message, which is used by
|
||||
* the {@link MessagePersister} to prioritize queues to persist.</dd>
|
||||
* </dl>
|
||||
* <p>
|
||||
* At a high level, the process is:
|
||||
@ -732,14 +728,6 @@ public class MessagesCache {
|
||||
return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
static byte[] getQueueIndexKey(final UUID accountUuid, final byte deviceId) {
|
||||
return getQueueIndexKey(SlotHash.getSlot(accountUuid.toString() + "::" + deviceId));
|
||||
}
|
||||
|
||||
static byte[] getQueueIndexKey(final int slot) {
|
||||
return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
static byte[] getSharedMrmKey(final UUID mrmGuid) {
|
||||
return ("mrm::{" + mrmGuid.toString() + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@ -58,13 +58,11 @@ class MessagesCacheInsertScript {
|
||||
final List<byte[]> keys = List.of(
|
||||
MessagesCache.getMessageQueueKey(destinationUuid, destinationDevice), // queueKey
|
||||
MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice), // queueMetadataKey
|
||||
MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice), // queueTotalIndexKey
|
||||
RedisMessageAvailabilityManager.getClientEventChannel(destinationUuid, destinationDevice) // eventChannelKey
|
||||
);
|
||||
|
||||
final List<byte[]> args = new ArrayList<>(Arrays.asList(
|
||||
EnvelopeUtil.compress(envelope).toByteArray(), // message
|
||||
String.valueOf(envelope.getServerTimestamp()).getBytes(StandardCharsets.UTF_8), // currentTime
|
||||
envelope.getServerGuid().getBytes(StandardCharsets.UTF_8), // guid
|
||||
NEW_MESSAGE_EVENT_BYTES // eventPayload
|
||||
));
|
||||
|
||||
@ -37,8 +37,7 @@ class MessagesCacheRemoveByGuidScript {
|
||||
|
||||
final List<byte[]> keys = List.of(
|
||||
MessagesCache.getMessageQueueKey(destinationUuid, destinationDevice), // queueKey
|
||||
MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice), // queueMetadataKey
|
||||
MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice) // queueTotalIndexKey
|
||||
MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice) // queueMetadataKey
|
||||
);
|
||||
final List<byte[]> args = messageGuids.stream().map(guid -> guid.toString().getBytes(StandardCharsets.UTF_8))
|
||||
.toList();
|
||||
|
||||
@ -39,8 +39,7 @@ class MessagesCacheRemoveQueueScript {
|
||||
|
||||
final List<byte[]> keys = List.of(
|
||||
MessagesCache.getMessageQueueKey(destinationUuid, destinationDevice), // queueKey
|
||||
MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice), // queueMetadataKey
|
||||
MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice) // queueTotalIndexKey
|
||||
MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice) // queueMetadataKey
|
||||
);
|
||||
|
||||
final List<byte[]> args = new ArrayList<>();
|
||||
|
||||
@ -3,12 +3,10 @@
|
||||
|
||||
local queueKey = KEYS[1] -- sorted set of Envelopes for a device, by queue-local ID
|
||||
local queueMetadataKey = KEYS[2] -- hash of message GUID to queue-local IDs
|
||||
local queueTotalIndexKey = KEYS[3] -- sorted set of all queues in the shard, by timestamp of oldest message
|
||||
local eventChannelKey = KEYS[4] -- pub/sub channel for message availability events
|
||||
local eventChannelKey = KEYS[3] -- pub/sub channel for message availability events
|
||||
local message = ARGV[1] -- [bytes] the Envelope to insert
|
||||
local currentTime = ARGV[2] -- [number] the message timestamp, to sort the queue in the queueTotalIndex
|
||||
local guid = ARGV[3] -- [string] the message GUID
|
||||
local eventPayload = ARGV[4] -- [bytes] a protobuf payload for a "message available" pub/sub event
|
||||
local guid = ARGV[2] -- [string] the message GUID
|
||||
local eventPayload = ARGV[3] -- [bytes] a protobuf payload for a "message available" pub/sub event
|
||||
|
||||
if redis.call("HEXISTS", queueMetadataKey, guid) == 1 then
|
||||
return tonumber(redis.call("HGET", queueMetadataKey, guid))
|
||||
@ -22,6 +20,4 @@ redis.call("HSET", queueMetadataKey, guid, messageId)
|
||||
redis.call("EXPIRE", queueKey, 3974400) -- 46 days
|
||||
redis.call("EXPIRE", queueMetadataKey, 3974400) -- 46 days
|
||||
|
||||
redis.call("ZADD", queueTotalIndexKey, "NX", currentTime, queueKey)
|
||||
|
||||
return redis.call("SPUBLISH", eventChannelKey, eventPayload) > 0
|
||||
|
||||
@ -5,7 +5,6 @@
|
||||
|
||||
local queueKey = KEYS[1] -- sorted set of Envelopes for a device, by queue-local ID
|
||||
local queueMetadataKey = KEYS[2] -- hash of message GUID to queue-local IDs
|
||||
local queueTotalIndexKey = KEYS[3] -- sorted set of all queues in the shard, by timestamp of oldest message
|
||||
local messageGuids = ARGV -- [list[string]] message GUIDs
|
||||
|
||||
local removedMessages = {}
|
||||
@ -28,7 +27,6 @@ end
|
||||
if (redis.call("ZCARD", queueKey) == 0) then
|
||||
redis.call("DEL", queueKey)
|
||||
redis.call("DEL", queueMetadataKey)
|
||||
redis.call("ZREM", queueTotalIndexKey, queueKey)
|
||||
end
|
||||
|
||||
return removedMessages
|
||||
|
||||
@ -6,7 +6,6 @@
|
||||
|
||||
local queueKey = KEYS[1] -- sorted set of Envelopes for a device, by queue-local ID
|
||||
local queueMetadataKey = KEYS[2] -- hash of message GUID to queue-local IDs
|
||||
local queueTotalIndexKey = KEYS[3] -- sorted set of all queues in the shard, by timestamp of oldest message
|
||||
local limit = ARGV[1] -- the maximum number of messages to return
|
||||
local processedMessageGuids = { unpack(ARGV, 2) }
|
||||
|
||||
@ -23,7 +22,6 @@ local messages = redis.call("ZRANGE", queueKey, 0, limit-1)
|
||||
if #messages == 0 then
|
||||
redis.call("DEL", queueKey)
|
||||
redis.call("DEL", queueMetadataKey)
|
||||
redis.call("ZREM", queueTotalIndexKey, queueKey)
|
||||
end
|
||||
|
||||
return messages
|
||||
|
||||
Loading…
Reference in New Issue
Block a user