Don't trigger state transitions inside retryable transactions
This commit is contained in:
parent
808bb16103
commit
2abf55e395
@ -85,7 +85,7 @@ class FoundationDbMessagePublisher {
|
||||
|
||||
/// Tracks the current state of the publisher state machine. Initial state presumes that messages are available in the queue.
|
||||
private State state = State.MESSAGES_AVAILABLE;
|
||||
/// Reference to the sink we publishes messages to.
|
||||
/// Reference to the sink we publish messages to.
|
||||
private volatile FluxSink<FoundationDbMessageStreamEntry.Message> emitter;
|
||||
/// Future that completes when the watch for {@link #messagesAvailableWatchKey} triggers.
|
||||
private CompletableFuture<Void> watchFuture;
|
||||
@ -192,6 +192,7 @@ class FoundationDbMessagePublisher {
|
||||
setState(State.MESSAGES_AVAILABLE, event);
|
||||
transitionStateOnEvent(Event.INTERNAL_TRIGGER);
|
||||
}
|
||||
case MESSAGE_AVAILABLE_WATCH_TRIGGERED -> setState(State.MESSAGE_AVAILABLE_SIGNAL_BUFFERED, event);
|
||||
case FETCH_OR_PUBLISH_ERROR_OCCURRED -> setState(State.ERROR, event);
|
||||
default -> knownTransition = false;
|
||||
}
|
||||
@ -247,27 +248,33 @@ class FoundationDbMessagePublisher {
|
||||
///
|
||||
/// @return a future of a list of [FoundationDbMessageStreamEntry.Message] with a max size of [#maxMessagesPerScan]
|
||||
private CompletableFuture<List<FoundationDbMessageStreamEntry.Message>> getMessagesBatch() {
|
||||
final Consumer<Transaction> doBeforePageFetch = beforePageFetch.get();
|
||||
final Consumer<Transaction> doBeforePageFetch = beforePageFetch.get();
|
||||
|
||||
return database.runAsync(transaction -> {
|
||||
doBeforePageFetch.accept(transaction);
|
||||
|
||||
return getItemsInRange(transaction, beginKeyCursor, endKeyExclusive, maxMessagesPerScan)
|
||||
.thenApply(lastKeyReadAndItems -> {
|
||||
// Set our beginning key to just past the last key read so that we're ready for our next fetch
|
||||
lastKeyReadAndItems.first()
|
||||
.ifPresent(lastKeyRead -> beginKeyCursor = KeySelector.firstGreaterThan(lastKeyRead));
|
||||
|
||||
final List<FoundationDbMessageStreamEntry.Message> items = lastKeyReadAndItems.second();
|
||||
if (items.size() < maxMessagesPerScan) {
|
||||
transitionStateOnEvent(Event.FETCHED_ALL_AVAILABLE_MESSAGES);
|
||||
if (!terminateOnQueueEmpty) {
|
||||
setWatch(transaction);
|
||||
}
|
||||
if (lastKeyReadAndItems.second().size() < maxMessagesPerScan && !terminateOnQueueEmpty) {
|
||||
setWatch(transaction);
|
||||
}
|
||||
return items;
|
||||
});
|
||||
|
||||
}
|
||||
);
|
||||
return lastKeyReadAndItems;
|
||||
});
|
||||
})
|
||||
// Defer any state mutations until after the transaction has been committed. The transaction block can
|
||||
// fail/retry, and we don't want to trigger spurious state transitions when that happens.
|
||||
.thenApply(lastKeyReadAndItems -> {
|
||||
// Set our beginning key to just past the last key read so that we're ready for our next fetch
|
||||
lastKeyReadAndItems.first()
|
||||
.ifPresent(lastKeyRead -> beginKeyCursor = KeySelector.firstGreaterThan(lastKeyRead));
|
||||
|
||||
if (lastKeyReadAndItems.second().size() < maxMessagesPerScan) {
|
||||
transitionStateOnEvent(Event.FETCHED_ALL_AVAILABLE_MESSAGES);
|
||||
}
|
||||
|
||||
return lastKeyReadAndItems.second();
|
||||
});
|
||||
}
|
||||
|
||||
/// Fetch messages in the range between `begin` and `end` limited to a batch size of `maxMessagesPerSccan`
|
||||
|
||||
Loading…
Reference in New Issue
Block a user