Move per-recipient error handling to the job queue
This commit is contained in:
parent
9c27a85845
commit
201f77818f
@ -627,7 +627,6 @@
|
||||
503BD28B2B44DA64009624FC /* OWSFakeProfileManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = 503BD28A2B44DA64009624FC /* OWSFakeProfileManager.swift */; };
|
||||
503BDDB6296F5BE100FED3B2 /* ContactReminderTableViewCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = 503BDDB5296F5BE100FED3B2 /* ContactReminderTableViewCell.swift */; };
|
||||
503C2F432977752B00217527 /* OWSURLSessionEndpoint.swift in Sources */ = {isa = PBXBuildFile; fileRef = 503C2F422977752B00217527 /* OWSURLSessionEndpoint.swift */; };
|
||||
503CEB642CD17C5F00F50148 /* MessageSenderRecipientErrors.swift in Sources */ = {isa = PBXBuildFile; fileRef = 503CEB632CD17C5F00F50148 /* MessageSenderRecipientErrors.swift */; };
|
||||
50423CA42BBF427900DCB8F5 /* StaleProfileFetcher.swift in Sources */ = {isa = PBXBuildFile; fileRef = 50423CA32BBF427900DCB8F5 /* StaleProfileFetcher.swift */; };
|
||||
504271B62BB4C54500E33C01 /* SystemContact.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504271B52BB4C54500E33C01 /* SystemContact.swift */; };
|
||||
5042EAA3287F96FB00C9B19F /* VisibleBadgeResolverTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5042EAA2287F96FB00C9B19F /* VisibleBadgeResolverTest.swift */; };
|
||||
@ -638,6 +637,7 @@
|
||||
5049FA2F28BEAABE00D6E099 /* ContactDiscoveryV2Operation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5049FA2D28BEAABE00D6E099 /* ContactDiscoveryV2Operation.swift */; };
|
||||
504F397C29D23B1700E849A6 /* ValidatedIncomingEnvelope.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504F397B29D23B1700E849A6 /* ValidatedIncomingEnvelope.swift */; };
|
||||
504F98B12EAFFAC600DF465B /* KyberPreKeyUseRecord.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504F98B02EAFFAC600DF465B /* KyberPreKeyUseRecord.swift */; };
|
||||
504F98B32EB0270A00DF465B /* SendMessageFailure.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504F98B22EB0270A00DF465B /* SendMessageFailure.swift */; };
|
||||
5050A8792B76E2E100E9BFA4 /* PreKeyId.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5050A8782B76E2E100E9BFA4 /* PreKeyId.swift */; };
|
||||
5050A87B2B76EEC500E9BFA4 /* PreKeyIdTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5050A87A2B76EEC500E9BFA4 /* PreKeyIdTest.swift */; };
|
||||
505166D62BB37DA700FF6B4A /* IncomingCallEventSyncMessageParams.swift in Sources */ = {isa = PBXBuildFile; fileRef = D979CC252AD3933B006AAC49 /* IncomingCallEventSyncMessageParams.swift */; };
|
||||
@ -4596,7 +4596,6 @@
|
||||
503BD28A2B44DA64009624FC /* OWSFakeProfileManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OWSFakeProfileManager.swift; sourceTree = "<group>"; };
|
||||
503BDDB5296F5BE100FED3B2 /* ContactReminderTableViewCell.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContactReminderTableViewCell.swift; sourceTree = "<group>"; };
|
||||
503C2F422977752B00217527 /* OWSURLSessionEndpoint.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OWSURLSessionEndpoint.swift; sourceTree = "<group>"; };
|
||||
503CEB632CD17C5F00F50148 /* MessageSenderRecipientErrors.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageSenderRecipientErrors.swift; sourceTree = "<group>"; };
|
||||
50423CA32BBF427900DCB8F5 /* StaleProfileFetcher.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StaleProfileFetcher.swift; sourceTree = "<group>"; };
|
||||
504271B52BB4C54500E33C01 /* SystemContact.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SystemContact.swift; sourceTree = "<group>"; };
|
||||
5042EAA2287F96FB00C9B19F /* VisibleBadgeResolverTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VisibleBadgeResolverTest.swift; sourceTree = "<group>"; };
|
||||
@ -4605,6 +4604,7 @@
|
||||
5049FA2D28BEAABE00D6E099 /* ContactDiscoveryV2Operation.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ContactDiscoveryV2Operation.swift; sourceTree = "<group>"; };
|
||||
504F397B29D23B1700E849A6 /* ValidatedIncomingEnvelope.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ValidatedIncomingEnvelope.swift; sourceTree = "<group>"; };
|
||||
504F98B02EAFFAC600DF465B /* KyberPreKeyUseRecord.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = KyberPreKeyUseRecord.swift; sourceTree = "<group>"; };
|
||||
504F98B22EB0270A00DF465B /* SendMessageFailure.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SendMessageFailure.swift; sourceTree = "<group>"; };
|
||||
5050A8782B76E2E100E9BFA4 /* PreKeyId.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PreKeyId.swift; sourceTree = "<group>"; };
|
||||
5050A87A2B76EEC500E9BFA4 /* PreKeyIdTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PreKeyIdTest.swift; sourceTree = "<group>"; };
|
||||
5052AF5D2ACB0E9700D7EE9F /* MergePair.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MergePair.swift; sourceTree = "<group>"; };
|
||||
@ -13722,7 +13722,6 @@
|
||||
F9C5C99B289453B100548EEE /* MessageSender+Errors.swift */,
|
||||
F9C5C954289453B100548EEE /* MessageSender+SenderKey.swift */,
|
||||
F9C5C8D5289453B100548EEE /* MessageSender.swift */,
|
||||
503CEB632CD17C5F00F50148 /* MessageSenderRecipientErrors.swift */,
|
||||
F9C5C976289453B100548EEE /* MessageSendLog.swift */,
|
||||
500BAD7E2C519F2D00B4CD7F /* MessageTimestampGenerator.swift */,
|
||||
500BAD7F2C519F2D00B4CD7F /* MessageTimestampGeneratorTest.swift */,
|
||||
@ -13767,6 +13766,7 @@
|
||||
66D13F092A731E590092D47B /* RecipientHidingManager+SignalServiceAddress.swift */,
|
||||
E1A090372A4B909B00F2BE8B /* RecipientHidingManager.swift */,
|
||||
50B0E9472AC73C3B005D46AB /* RecipientStateMerger.swift */,
|
||||
504F98B22EB0270A00DF465B /* SendMessageFailure.swift */,
|
||||
50A5AA9A2A7449D000CF2ECC /* ServerReceiptEnvelope.swift */,
|
||||
F9C5C947289453B100548EEE /* TypingIndicatorMessage.swift */,
|
||||
504F397B29D23B1700E849A6 /* ValidatedIncomingEnvelope.swift */,
|
||||
@ -18125,7 +18125,6 @@
|
||||
F9C5CBCA289453B300548EEE /* MessageSender.swift in Sources */,
|
||||
F9C5CDC8289453B400548EEE /* MessageSenderJobQueue.swift in Sources */,
|
||||
D9AE0AD929187F850063488B /* MessageSenderJobRecord.swift in Sources */,
|
||||
503CEB642CD17C5F00F50148 /* MessageSenderRecipientErrors.swift in Sources */,
|
||||
F9C5CC64289453B300548EEE /* MessageSendLog.swift in Sources */,
|
||||
F9C5CC19289453B300548EEE /* MessageSticker.swift in Sources */,
|
||||
66E793E52BC0D8A600929E5E /* MessageStickerManager.swift in Sources */,
|
||||
@ -18515,6 +18514,7 @@
|
||||
F9C5CD37289453B300548EEE /* SenderKeyStore.swift in Sources */,
|
||||
725465392BA01FAA00EABFD2 /* SendGiftBadgeJobQueue.swift in Sources */,
|
||||
D9AE0AD5291877600063488B /* SendGiftBadgeJobRecord.swift in Sources */,
|
||||
504F98B32EB0270A00DF465B /* SendMessageFailure.swift in Sources */,
|
||||
6619A1C52B2BA05C004B38FE /* SentMessageTranscript.swift in Sources */,
|
||||
6619A1C12B2A8587004B38FE /* SentMessageTranscriptReceiver+Shims.swift in Sources */,
|
||||
6619A1BA2B2A80B0004B38FE /* SentMessageTranscriptReceiver.swift in Sources */,
|
||||
|
||||
@ -486,18 +486,35 @@ public class MessageSenderJobQueue {
|
||||
let maxRetries = 110
|
||||
while true {
|
||||
assert(!Task.isCancelled, "Cancellation isn't supported.")
|
||||
do {
|
||||
operation.clearExternalRetryTriggers()
|
||||
try await SSKEnvironment.shared.messageSenderRef.sendMessage(operation.message)
|
||||
operation.clearExternalRetryTriggers()
|
||||
let result = await SSKEnvironment.shared.messageSenderRef.sendMessage(operation.message)
|
||||
let errors: [any Error]
|
||||
let arbitraryError: any Error
|
||||
switch result {
|
||||
case .success:
|
||||
return
|
||||
} catch where MessageSender.isRetryableError(error) && !error.isFatalError && attemptCount < maxRetries {
|
||||
attemptCount += 1
|
||||
if !operation.job.isInMemoryOnly {
|
||||
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
||||
operation.job.record.addFailure(tx: tx)
|
||||
}
|
||||
case .overallFailure(let error):
|
||||
errors = [error]
|
||||
arbitraryError = error
|
||||
case .recipientsFailure(let failure):
|
||||
errors = failure.recipientErrors.map(\.error)
|
||||
arbitraryError = failure.arbitraryError
|
||||
}
|
||||
var retryableError: (any Error)?
|
||||
var externalRetryTriggers: ExternalRetryTriggers = []
|
||||
var suggestedRetryDelay: TimeInterval = 0
|
||||
for error in errors {
|
||||
// Some errors should never be retried. Because group send is
|
||||
// all-or-nothing, this means we need to fail the entire operation even
|
||||
// when retries may work for other recipients.
|
||||
if error.isFatalError {
|
||||
throw error
|
||||
}
|
||||
// Keep track of the first retryable error we encounter -- we'd prefer to
|
||||
// throw a retryable error rather than one that's not retryable.
|
||||
if MessageSender.isRetryableError(error) {
|
||||
retryableError = retryableError ?? error
|
||||
}
|
||||
var externalRetryTriggers: ExternalRetryTriggers = []
|
||||
// If there's a network failure, this is an external error, so we want to
|
||||
// retry as soon as we reconnect.
|
||||
if error.isNetworkFailure {
|
||||
@ -511,31 +528,43 @@ public class MessageSenderJobQueue {
|
||||
if error.isTimeout {
|
||||
externalRetryTriggers.insert(.networkBecameReachable)
|
||||
}
|
||||
// Determine the minimum amount of backoff.
|
||||
let maxAverageBackoff: TimeInterval = 14.1 * .minute
|
||||
let exponentialRetryDelay: TimeInterval = OWSOperation.retryIntervalForExponentialBackoff(
|
||||
failureCount: attemptCount,
|
||||
maxAverageBackoff: maxAverageBackoff,
|
||||
)
|
||||
// If we have a Retry-After header, use it (within reasonable limits).
|
||||
let suggestedRetryDelay: TimeInterval? = error.httpRetryAfterDate.map {
|
||||
return min($0.timeIntervalSinceNow, maxAverageBackoff)
|
||||
// If there's a Retry-After header, pick the largest one. That's when we
|
||||
// expect we'll be able to complete the entire send successfully.
|
||||
if let retryAfterDelay = error.httpResponseHeaders?.retryAfterTimeInterval {
|
||||
suggestedRetryDelay = max(suggestedRetryDelay, retryAfterDelay)
|
||||
}
|
||||
// We pick the larger of the two values -- we don't want Retry-After
|
||||
// headers to be able to trigger tight retry loops on the client, so we
|
||||
// maintain a minimum of exponential backoff.
|
||||
var retryDelay = exponentialRetryDelay
|
||||
var httpBlurb = ""
|
||||
if let suggestedRetryDelay {
|
||||
retryDelay = max(retryDelay, suggestedRetryDelay)
|
||||
httpBlurb = " (retry-after: \(String(format: "%.1f", suggestedRetryDelay))s)"
|
||||
}
|
||||
Logger.warn("Resending \(operation.message.description) after \(String(format: "%.1f", retryDelay))s\(httpBlurb)")
|
||||
try? await withCooperativeTimeout(
|
||||
seconds: retryDelay,
|
||||
operation: { try await operation.waitForAnyExternalRetryTrigger(fromExternalRetryTriggers: externalRetryTriggers) }
|
||||
)
|
||||
}
|
||||
guard let retryableError else {
|
||||
throw arbitraryError
|
||||
}
|
||||
guard attemptCount < maxRetries else {
|
||||
throw retryableError
|
||||
}
|
||||
attemptCount += 1
|
||||
if !operation.job.isInMemoryOnly {
|
||||
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
||||
operation.job.record.addFailure(tx: tx)
|
||||
}
|
||||
}
|
||||
// Determine the minimum amount of backoff.
|
||||
let maxAverageBackoff: TimeInterval = 14.1 * .minute
|
||||
let exponentialRetryDelay: TimeInterval = OWSOperation.retryIntervalForExponentialBackoff(
|
||||
failureCount: attemptCount,
|
||||
maxAverageBackoff: maxAverageBackoff,
|
||||
)
|
||||
// We pick the larger of the two values -- we don't want Retry-After
|
||||
// headers to be able to trigger tight retry loops on the client, so we
|
||||
// maintain a minimum of exponential backoff.
|
||||
let retryDelay = max(exponentialRetryDelay, min(maxAverageBackoff, suggestedRetryDelay))
|
||||
var httpBlurb = ""
|
||||
if suggestedRetryDelay > 0 {
|
||||
httpBlurb = " (retry-after: \(String(format: "%.1f", suggestedRetryDelay))s)"
|
||||
}
|
||||
Logger.warn("Resending \(operation.message.description) after \(String(format: "%.1f", retryDelay))s\(httpBlurb)")
|
||||
try? await withCooperativeTimeout(
|
||||
seconds: retryDelay,
|
||||
operation: { try await operation.waitForAnyExternalRetryTrigger(fromExternalRetryTriggers: externalRetryTriggers) }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -356,17 +356,44 @@ public class MessageSender {
|
||||
|
||||
// MARK: - Constructing Message Sends
|
||||
|
||||
public func sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async throws {
|
||||
do {
|
||||
Logger.info("Sending \(preparedOutgoingMessage)")
|
||||
try await _sendMessage(preparedOutgoingMessage)
|
||||
} catch {
|
||||
Logger.warn("Couldn't send \(preparedOutgoingMessage); there may also be individual send failures, but the overall failure is: \(error)")
|
||||
throw error
|
||||
}
|
||||
enum SendResult {
|
||||
case success
|
||||
|
||||
/// Something happened before[^1] we branched based on ServiceIds, so the
|
||||
/// same Error applies to the entire attempt to send the message.
|
||||
///
|
||||
/// [^1]: If we try to send to a group and every group member is
|
||||
/// unregistered, this is treated as an overall failure. There is an
|
||||
/// argument that this shouldn't be an error at all or should be
|
||||
/// per-recipient "recipients don't exist" errors.
|
||||
case overallFailure(any Error)
|
||||
|
||||
/// We reached a point where we may have a different error for every
|
||||
/// recipient. It will often be the case that many recipients encounter the
|
||||
/// "same" error. (For example, we may use the multi-recipient endpoint and
|
||||
/// then copy the same Error object for every recipient, but we also may fan
|
||||
/// out to individual recipients, and they all may encounter their own
|
||||
/// equivalent network failure error.)
|
||||
case recipientsFailure(SendMessageFailure)
|
||||
}
|
||||
|
||||
private func _sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async throws {
|
||||
func sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async -> SendResult {
|
||||
let sendFailure: SendMessageFailure?
|
||||
do {
|
||||
Logger.info("Sending \(preparedOutgoingMessage)")
|
||||
sendFailure = try await _sendMessage(preparedOutgoingMessage)
|
||||
} catch {
|
||||
Logger.warn("Couldn't send \(preparedOutgoingMessage); the overall failure is: \(error)")
|
||||
return .overallFailure(error)
|
||||
}
|
||||
if let sendFailure {
|
||||
Logger.warn("Couldn't send \(preparedOutgoingMessage); up to 3 per-recipient failures: \(sendFailure.recipientErrors.prefix(3))")
|
||||
return .recipientsFailure(sendFailure)
|
||||
}
|
||||
return .success
|
||||
}
|
||||
|
||||
private func _sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async throws -> SendMessageFailure? {
|
||||
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
||||
preparedOutgoingMessage.updateAllUnsentRecipientsAsSending(tx: tx)
|
||||
}
|
||||
@ -383,7 +410,7 @@ public class MessageSender {
|
||||
try await taskGroup.waitForAll()
|
||||
}
|
||||
|
||||
try await preparedOutgoingMessage.send(self.sendPreparedMessage(_:))
|
||||
return try await preparedOutgoingMessage.send(self.sendPreparedMessage(_:))
|
||||
}
|
||||
|
||||
private func waitForPreKeyRotationIfNeeded() async throws {
|
||||
@ -565,7 +592,7 @@ public class MessageSender {
|
||||
}
|
||||
}
|
||||
|
||||
private func sendPreparedMessage(_ message: TSOutgoingMessage) async throws {
|
||||
private func sendPreparedMessage(_ message: TSOutgoingMessage) async throws -> SendMessageFailure? {
|
||||
if !areAttachmentsUploadedWithSneakyTransaction(for: message) {
|
||||
throw OWSUnretryableMessageSenderError()
|
||||
}
|
||||
@ -587,24 +614,31 @@ public class MessageSender {
|
||||
if DebugFlags.messageSendsFail.get() {
|
||||
throw OWSUnretryableMessageSenderError()
|
||||
}
|
||||
do {
|
||||
try await waitForPreKeyRotationIfNeeded()
|
||||
let senderCertificates = try await SSKEnvironment.shared.udManagerRef.fetchSenderCertificates(certificateExpirationPolicy: .permissive)
|
||||
try await sendPreparedMessage(
|
||||
try await waitForPreKeyRotationIfNeeded()
|
||||
let udManager = SSKEnvironment.shared.udManagerRef
|
||||
let senderCertificates = try await udManager.fetchSenderCertificates(certificateExpirationPolicy: .permissive)
|
||||
// Send the message.
|
||||
let sendResult = await Result(catching: {
|
||||
return try await sendPreparedMessage(
|
||||
message,
|
||||
recoveryState: OuterRecoveryState(),
|
||||
senderCertificates: senderCertificates
|
||||
)
|
||||
} catch {
|
||||
if message.wasSentToAnyRecipient {
|
||||
// Always ignore the sync error...
|
||||
try? await handleMessageSentLocally(message)
|
||||
}
|
||||
// ...so that we can throw the original error for the caller. (Note that we
|
||||
// throw this error even if the sync message is sent successfully.)
|
||||
throw error
|
||||
})
|
||||
// Send the sync message if it succeeded overall or for any recipient.
|
||||
let syncResult: Result<Void, any Error>?
|
||||
if sendResult.isSuccess || message.wasSentToAnyRecipient {
|
||||
syncResult = await Result(catching: { try await handleMessageSentLocally(message) })
|
||||
} else {
|
||||
syncResult = nil
|
||||
}
|
||||
try await handleMessageSentLocally(message)
|
||||
// If we encountered an error when sending, return that.
|
||||
if let sendFailure = try sendResult.get() {
|
||||
return sendFailure
|
||||
}
|
||||
// Otherwise, if only the sync message failed, return that.
|
||||
try syncResult?.get()
|
||||
return nil
|
||||
}
|
||||
|
||||
private enum SendMessageNextAction {
|
||||
@ -653,7 +687,7 @@ public class MessageSender {
|
||||
_ message: TSOutgoingMessage,
|
||||
recoveryState: OuterRecoveryState,
|
||||
senderCertificates: SenderCertificates
|
||||
) async throws {
|
||||
) async throws -> SendMessageFailure? {
|
||||
let nextAction = try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx -> SendMessageNextAction? in
|
||||
guard let thread = message.thread(tx: tx) else {
|
||||
throw MessageSenderError.threadMissing
|
||||
@ -805,7 +839,7 @@ public class MessageSender {
|
||||
|
||||
switch nextAction {
|
||||
case .none:
|
||||
return
|
||||
return nil
|
||||
case .lookUpPhoneNumbersAndTryAgain(let phoneNumbers):
|
||||
try await lookUpPhoneNumbers(phoneNumbers)
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canLookUpPhoneNumbers = false })
|
||||
@ -836,26 +870,34 @@ public class MessageSender {
|
||||
endorsements: state.endorsements,
|
||||
localIdentifiers: state.localIdentifiers
|
||||
)
|
||||
let recipientErrors = MessageSenderRecipientErrors(recipientErrors: perRecipientErrors)
|
||||
if recipientErrors.containsAny(of: .invalidAuthHeader) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canUseMultiRecipientSealedSender = false })
|
||||
break
|
||||
let sendMessageFailure: SendMessageFailure?
|
||||
if perRecipientErrors.isEmpty {
|
||||
sendMessageFailure = nil
|
||||
} else {
|
||||
sendMessageFailure = try await handleSendFailure(
|
||||
message: message,
|
||||
thread: state.thread,
|
||||
perRecipientErrors: perRecipientErrors,
|
||||
)
|
||||
}
|
||||
if recoveryState.canHandleMultiRecipientMismatchedDevices, recipientErrors.containsAny(of: .deviceUpdate) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientMismatchedDevices = false })
|
||||
break
|
||||
if let sendMessageFailure {
|
||||
if sendMessageFailure.containsAny(of: .invalidAuthHeader) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canUseMultiRecipientSealedSender = false })
|
||||
break
|
||||
}
|
||||
if recoveryState.canHandleMultiRecipientMismatchedDevices, sendMessageFailure.containsAny(of: .deviceUpdate) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientMismatchedDevices = false })
|
||||
break
|
||||
}
|
||||
if recoveryState.canHandleMultiRecipientStaleDevices, sendMessageFailure.containsAny(of: .staleDevices) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientStaleDevices = false })
|
||||
break
|
||||
}
|
||||
}
|
||||
if recoveryState.canHandleMultiRecipientStaleDevices, recipientErrors.containsAny(of: .staleDevices) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientStaleDevices = false })
|
||||
break
|
||||
}
|
||||
if !perRecipientErrors.isEmpty {
|
||||
try await handleSendFailure(message: message, thread: state.thread, perRecipientErrors: perRecipientErrors)
|
||||
}
|
||||
return
|
||||
return sendMessageFailure
|
||||
}
|
||||
|
||||
try await sendPreparedMessage(
|
||||
return try await sendPreparedMessage(
|
||||
message,
|
||||
recoveryState: retryRecoveryState,
|
||||
senderCertificates: senderCertificates
|
||||
@ -966,7 +1008,7 @@ public class MessageSender {
|
||||
message: TSOutgoingMessage,
|
||||
thread: TSThread,
|
||||
perRecipientErrors allErrors: [(serviceId: ServiceId, error: any Error)]
|
||||
) async throws {
|
||||
) async throws -> SendMessageFailure? {
|
||||
var skippedRecipients = [ServiceId]()
|
||||
var filteredErrors = [(serviceId: ServiceId, error: any Error)]()
|
||||
|
||||
@ -993,29 +1035,14 @@ public class MessageSender {
|
||||
|
||||
// If we only received errors that we should ignore, consider this send a
|
||||
// success, unless the message could not be sent to any recipient.
|
||||
guard let anyError = filteredErrors.first?.error else {
|
||||
guard let sendMessageFailure = SendMessageFailure(recipientErrors: filteredErrors) else {
|
||||
if message.sentRecipientAddresses().count == 0 {
|
||||
throw MessageSenderErrorNoValidRecipients()
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Some errors should never be retried, in order to avoid hitting rate
|
||||
// limits, for example. Unfortunately, since group send retry is
|
||||
// all-or-nothing, we need to fail immediately even if some of the other
|
||||
// recipients had retryable errors.
|
||||
if let fatalError = filteredErrors.map({ $0.error }).first(where: { $0.isFatalError }) {
|
||||
throw fatalError
|
||||
}
|
||||
|
||||
// If any of the send errors are retryable, we want to retry. Therefore,
|
||||
// prefer to propagate a retryable error.
|
||||
if let retryableError = filteredErrors.map({ $0.error }).first(where: { Self.isRetryableError($0) }) {
|
||||
throw retryableError
|
||||
}
|
||||
|
||||
// Otherwise, if we have any error at all, propagate it.
|
||||
throw anyError
|
||||
return sendMessageFailure
|
||||
}
|
||||
|
||||
static func isRetryableError(_ error: any Error) -> Bool {
|
||||
|
||||
@ -1,17 +0,0 @@
|
||||
//
|
||||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import LibSignalClient
|
||||
|
||||
struct MessageSenderRecipientErrors {
|
||||
var recipientErrors: [(serviceId: ServiceId, error: any Error)]
|
||||
|
||||
func containsAny(of senderKeyErrors: MessageSender.SenderKeyError...) -> Bool {
|
||||
return recipientErrors.contains(where: { _, recipientError in
|
||||
return senderKeyErrors.contains(where: { $0 == (recipientError as? MessageSender.SenderKeyError) })
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -242,8 +242,8 @@ public class PreparedOutgoingMessage {
|
||||
|
||||
// MARK: - Sending
|
||||
|
||||
public func send(_ sender: (TSOutgoingMessage) async throws -> Void) async throws {
|
||||
try await sender(messageForSending)
|
||||
public func send<T>(_ sender: (TSOutgoingMessage) async throws -> T) async throws -> T {
|
||||
return try await sender(messageForSending)
|
||||
}
|
||||
|
||||
public func attachmentUploadOperations(tx: DBReadTransaction) -> [() async throws -> Void] {
|
||||
|
||||
28
SignalServiceKit/Messages/SendMessageFailure.swift
Normal file
28
SignalServiceKit/Messages/SendMessageFailure.swift
Normal file
@ -0,0 +1,28 @@
|
||||
//
|
||||
// Copyright 2025 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import LibSignalClient
|
||||
|
||||
struct SendMessageFailure {
|
||||
let recipientErrors: [(serviceId: ServiceId, error: any Error)]
|
||||
|
||||
init?(recipientErrors: [(ServiceId, any Error)]) {
|
||||
if recipientErrors.isEmpty {
|
||||
return nil
|
||||
}
|
||||
self.recipientErrors = recipientErrors
|
||||
}
|
||||
|
||||
var arbitraryError: any Error {
|
||||
return self.recipientErrors.first!.error
|
||||
}
|
||||
|
||||
func containsAny(of senderKeyError: MessageSender.SenderKeyError) -> Bool {
|
||||
return recipientErrors.contains(where: { _, recipientError in
|
||||
return senderKeyError == (recipientError as? MessageSender.SenderKeyError)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -16,12 +16,19 @@ class FakeMessageSender: MessageSender {
|
||||
super.init(accountChecker: accountChecker, groupSendEndorsementStore: GroupSendEndorsementStoreImpl())
|
||||
}
|
||||
|
||||
override func sendMessage(_ preparedMessage: PreparedOutgoingMessage) async throws {
|
||||
try await preparedMessage.send { message in
|
||||
sentMessages.append(message)
|
||||
sendMessageWasCalledBlock?(message)
|
||||
override func sendMessage(_ preparedMessage: PreparedOutgoingMessage) async -> MessageSender.SendResult {
|
||||
do {
|
||||
try await preparedMessage.send { message in
|
||||
sentMessages.append(message)
|
||||
sendMessageWasCalledBlock?(message)
|
||||
}
|
||||
} catch {
|
||||
return .overallFailure(error)
|
||||
}
|
||||
if let stubbedFailingError = stubbedFailingErrors.removeFirst() { throw stubbedFailingError }
|
||||
if let stubbedFailingError = stubbedFailingErrors.removeFirst() {
|
||||
return .overallFailure(stubbedFailingError)
|
||||
}
|
||||
return .success
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user