From 2abf55e39594664888557e2380a02216b1b961c0 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:41:33 -0400 Subject: [PATCH] Don't trigger state transitions inside retryable transactions --- .../FoundationDbMessagePublisher.java | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessagePublisher.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessagePublisher.java index de2009000..18c8f561e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessagePublisher.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/foundationdb/FoundationDbMessagePublisher.java @@ -85,7 +85,7 @@ class FoundationDbMessagePublisher { /// Tracks the current state of the publisher state machine. Initial state presumes that messages are available in the queue. private State state = State.MESSAGES_AVAILABLE; - /// Reference to the sink we publishes messages to. + /// Reference to the sink we publish messages to. private volatile FluxSink emitter; /// Future that completes when the watch for {@link #messagesAvailableWatchKey} triggers. private CompletableFuture watchFuture; @@ -192,6 +192,7 @@ class FoundationDbMessagePublisher { setState(State.MESSAGES_AVAILABLE, event); transitionStateOnEvent(Event.INTERNAL_TRIGGER); } + case MESSAGE_AVAILABLE_WATCH_TRIGGERED -> setState(State.MESSAGE_AVAILABLE_SIGNAL_BUFFERED, event); case FETCH_OR_PUBLISH_ERROR_OCCURRED -> setState(State.ERROR, event); default -> knownTransition = false; } @@ -247,27 +248,33 @@ class FoundationDbMessagePublisher { /// /// @return a future of a list of [FoundationDbMessageStreamEntry.Message] with a max size of [#maxMessagesPerScan] private CompletableFuture> getMessagesBatch() { - final Consumer doBeforePageFetch = beforePageFetch.get(); + final Consumer doBeforePageFetch = beforePageFetch.get(); + return database.runAsync(transaction -> { doBeforePageFetch.accept(transaction); + return getItemsInRange(transaction, beginKeyCursor, endKeyExclusive, maxMessagesPerScan) .thenApply(lastKeyReadAndItems -> { - // Set our beginning key to just past the last key read so that we're ready for our next fetch - lastKeyReadAndItems.first() - .ifPresent(lastKeyRead -> beginKeyCursor = KeySelector.firstGreaterThan(lastKeyRead)); - - final List items = lastKeyReadAndItems.second(); - if (items.size() < maxMessagesPerScan) { - transitionStateOnEvent(Event.FETCHED_ALL_AVAILABLE_MESSAGES); - if (!terminateOnQueueEmpty) { - setWatch(transaction); - } + if (lastKeyReadAndItems.second().size() < maxMessagesPerScan && !terminateOnQueueEmpty) { + setWatch(transaction); } - return items; - }); - } - ); + return lastKeyReadAndItems; + }); + }) + // Defer any state mutations until after the transaction has been committed. The transaction block can + // fail/retry, and we don't want to trigger spurious state transitions when that happens. + .thenApply(lastKeyReadAndItems -> { + // Set our beginning key to just past the last key read so that we're ready for our next fetch + lastKeyReadAndItems.first() + .ifPresent(lastKeyRead -> beginKeyCursor = KeySelector.firstGreaterThan(lastKeyRead)); + + if (lastKeyReadAndItems.second().size() < maxMessagesPerScan) { + transitionStateOnEvent(Event.FETCHED_ALL_AVAILABLE_MESSAGES); + } + + return lastKeyReadAndItems.second(); + }); } /// Fetch messages in the range between `begin` and `end` limited to a batch size of `maxMessagesPerSccan`