Adopt sendMultiRecipientMessage API
This commit is contained in:
parent
6ee12915ff
commit
fee9e5bd0e
@ -359,6 +359,10 @@ public class RemoteConfig {
|
||||
))
|
||||
}
|
||||
|
||||
public var shouldUseDynamicSendMessageTimeout: Bool {
|
||||
return !isEnabled(.dynamicSendMessageTimeoutKillSwitch)
|
||||
}
|
||||
|
||||
// MARK: - RingRTC
|
||||
|
||||
public var ringrtcNwPathMonitorTrial: Bool {
|
||||
@ -595,6 +599,7 @@ private enum IsEnabledFlag: String, FlagType {
|
||||
case cardGiftDonationKillSwitch = "ios.cardGiftDonationKillSwitch"
|
||||
case cardMonthlyDonationKillSwitch = "ios.cardMonthlyDonationKillSwitch"
|
||||
case cardOneTimeDonationKillSwitch = "ios.cardOneTimeDonationKillSwitch"
|
||||
case dynamicSendMessageTimeoutKillSwitch = "ios.dynamicSendMessageTimeoutKillSwitch"
|
||||
case enableAutoAPNSRotation = "ios.enableAutoAPNSRotation"
|
||||
case enableGifSearch = "global.gifSearch"
|
||||
case groupTerminateReceiveKillSwitch = "ios.groupTerminateReceiveKillSwitch"
|
||||
@ -622,6 +627,7 @@ private enum IsEnabledFlag: String, FlagType {
|
||||
case .cardGiftDonationKillSwitch: false
|
||||
case .cardMonthlyDonationKillSwitch: false
|
||||
case .cardOneTimeDonationKillSwitch: false
|
||||
case .dynamicSendMessageTimeoutKillSwitch: true
|
||||
case .enableAutoAPNSRotation: false
|
||||
case .enableGifSearch: false
|
||||
case .groupTerminateReceiveKillSwitch: true
|
||||
|
||||
@ -4,6 +4,7 @@
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import LibSignalClient
|
||||
|
||||
/// Durably enqueues outgoing messages.
|
||||
///
|
||||
@ -546,6 +547,9 @@ public class MessageSenderJobQueue {
|
||||
if let retryAfterDelay = error.httpResponseHeaders?.retryAfterTimeInterval {
|
||||
suggestedRetryDelay = max(suggestedRetryDelay, retryAfterDelay)
|
||||
}
|
||||
if case SignalError.rateLimitedError(retryAfter: let retryAfter, message: _) = error {
|
||||
suggestedRetryDelay = max(suggestedRetryDelay, retryAfter)
|
||||
}
|
||||
// If there's a Retry-After from the AccountChecker, we want to wait for
|
||||
// the sum of the Retry-Afters. (This avoids pathological O(n^2) behavior.)
|
||||
if let rateLimitError = error as? AccountChecker.RateLimitError {
|
||||
|
||||
@ -21,8 +21,7 @@ extension MessageSender {
|
||||
|
||||
enum SenderKeyError: Error, IsRetryableProvider, UserErrorDescriptionProvider {
|
||||
case invalidAuthHeader
|
||||
case deviceUpdate
|
||||
case staleDevices
|
||||
case mismatchedDevices
|
||||
|
||||
var isRetryableProvider: Bool { true }
|
||||
|
||||
@ -72,7 +71,7 @@ extension MessageSender {
|
||||
|
||||
let threadRecipients = thread.recipientAddresses(with: tx).compactMap(\.serviceId)
|
||||
|
||||
let authBuilder: (_ readyRecipients: [ServiceId]) -> TSRequest.SealedSenderAuth
|
||||
let authBuilder: (_ readyRecipients: [ServiceId]) -> MultiRecipientSendAuth
|
||||
if message.isStorySend {
|
||||
authBuilder = { _ in return .story }
|
||||
// Importantly, endorsements may be nonnil in this case, and the individual
|
||||
@ -92,7 +91,7 @@ extension MessageSender {
|
||||
// individual endorsement, so we can safely force-unwrap here.
|
||||
combined = combined.byRemoving(endorsements.individual[serviceId]!)
|
||||
}
|
||||
return .endorsement(GroupSendFullTokenBuilder(
|
||||
return .groupSend(GroupSendFullTokenBuilder(
|
||||
secretParams: endorsements.secretParams,
|
||||
expiration: endorsements.expiration,
|
||||
endorsement: combined,
|
||||
@ -221,7 +220,7 @@ extension MessageSender {
|
||||
in thread: TSThread,
|
||||
message: any SendableMessage,
|
||||
serializedMessage: SerializedMessage,
|
||||
authBuilder: (_ readyRecipients: [ServiceId]) -> TSRequest.SealedSenderAuth,
|
||||
authBuilder: (_ readyRecipients: [ServiceId]) -> MultiRecipientSendAuth,
|
||||
senderCertificate: SenderCertificate,
|
||||
localIdentifiers: LocalIdentifiers,
|
||||
) async -> [(ServiceId, any Error)] {
|
||||
@ -277,7 +276,7 @@ extension MessageSender {
|
||||
to recipients: [Recipient],
|
||||
message: any SendableMessage,
|
||||
payloadId: Int64?,
|
||||
authBuilder: () -> TSRequest.SealedSenderAuth,
|
||||
authBuilder: () -> MultiRecipientSendAuth,
|
||||
localIdentifiers: LocalIdentifiers,
|
||||
) async -> [(ServiceId, any Error)] {
|
||||
let sendResult: SenderKeySendResult
|
||||
@ -477,109 +476,79 @@ extension MessageSender {
|
||||
to recipients: [Recipient],
|
||||
message: any SendableMessage,
|
||||
ciphertextResult: Result<Data, any Error>,
|
||||
authBuilder: () -> TSRequest.SealedSenderAuth,
|
||||
authBuilder: () -> MultiRecipientSendAuth,
|
||||
) async throws -> SenderKeySendResult {
|
||||
Logger.info("Sending sender key message with timestamp \(message.timestamp) to \(recipients.map(\.serviceId).sorted())")
|
||||
let ciphertext = try ciphertextResult.get()
|
||||
let auth = authBuilder()
|
||||
let result = try await Retry.performRepeatedly(
|
||||
block: {
|
||||
return try await self._sendSenderKeyRequest(
|
||||
encryptedMessageBody: ciphertext,
|
||||
timestamp: message.timestamp,
|
||||
isOnline: message.isOnline,
|
||||
isUrgent: message.isUrgent,
|
||||
recipients: recipients,
|
||||
auth: auth,
|
||||
)
|
||||
},
|
||||
onError: { error, attemptCount in
|
||||
if attemptCount <= 1, (error as? OWSHTTPError)?.httpStatusCode == 428 {
|
||||
// Retry immediately if we submitted a push challenge.
|
||||
} else {
|
||||
throw error
|
||||
}
|
||||
},
|
||||
let result = try await self._sendSenderKeyRequest(
|
||||
payload: ciphertext,
|
||||
timestamp: message.timestamp,
|
||||
isOnline: message.isOnline,
|
||||
isUrgent: message.isUrgent,
|
||||
recipients: recipients,
|
||||
auth: auth,
|
||||
)
|
||||
Logger.info("Sent sender key message with timestamp \(message.timestamp) to \(result.successServiceIds.sorted()) (unregistered: \(result.unregisteredServiceIds.sorted()))")
|
||||
return result
|
||||
}
|
||||
|
||||
private func _sendSenderKeyRequest(
|
||||
encryptedMessageBody: Data,
|
||||
payload: Data,
|
||||
timestamp: UInt64,
|
||||
isOnline: Bool,
|
||||
isUrgent: Bool,
|
||||
recipients: [Recipient],
|
||||
auth: TSRequest.SealedSenderAuth,
|
||||
auth: MultiRecipientSendAuth,
|
||||
) async throws -> SenderKeySendResult {
|
||||
let chatConnectionManager = DependenciesBridge.shared.chatConnectionManager
|
||||
let remoteConfigProvider = SSKEnvironment.shared.remoteConfigManagerRef
|
||||
do {
|
||||
let httpResponse = try await self.performSenderKeySend(
|
||||
ciphertext: encryptedMessageBody,
|
||||
timestamp: timestamp,
|
||||
isOnline: isOnline,
|
||||
isUrgent: isUrgent,
|
||||
auth: auth,
|
||||
)
|
||||
|
||||
guard httpResponse.responseStatusCode == 200 else { throw
|
||||
OWSAssertionError("Unhandled error")
|
||||
let remoteConfig = remoteConfigProvider.currentConfig()
|
||||
let timeout = OWSRequestFactory.sendMessageTimeout(estimatedRequestSize: payload.count + 200)
|
||||
let response = try await chatConnectionManager.withUnauthService(
|
||||
.messages,
|
||||
timeout: remoteConfig.shouldUseDynamicSendMessageTimeout ? timeout : .infinity,
|
||||
) {
|
||||
return try await $0.sendMultiRecipientMessage(
|
||||
payload,
|
||||
timestamp: timestamp,
|
||||
auth: auth,
|
||||
onlineOnly: isOnline,
|
||||
urgent: isUrgent,
|
||||
)
|
||||
}
|
||||
|
||||
let response = try Self.decodeSuccessResponse(data: httpResponse.responseBodyData ?? Data())
|
||||
let unregisteredServiceIds = Set(response.unregisteredServiceIds.map { $0.wrappedValue })
|
||||
let unregisteredServiceIds = Set(response.unregisteredIds)
|
||||
let successful = recipients.filter { !unregisteredServiceIds.contains($0.serviceId) }
|
||||
let unregistered = recipients.filter { unregisteredServiceIds.contains($0.serviceId) }
|
||||
return SenderKeySendResult(success: successful, unregistered: unregistered)
|
||||
} catch {
|
||||
if let httpError = error as? OWSHTTPError {
|
||||
let statusCode = httpError.httpStatusCode ?? 0
|
||||
let responseData = httpError.httpResponseData
|
||||
switch statusCode {
|
||||
case 401:
|
||||
Logger.warn("Invalid composite authorization header for sender key send request. Falling back to fanout")
|
||||
throw SenderKeyError.invalidAuthHeader
|
||||
case 409:
|
||||
} catch SignalError.requestUnauthorized(_) {
|
||||
Logger.warn("Invalid composite authorization header for sender key send request. Falling back to fanout")
|
||||
throw SenderKeyError.invalidAuthHeader
|
||||
} catch SignalError.mismatchedDevices(entries: let entries, message: _) {
|
||||
let databaseStorage = SSKEnvironment.shared.databaseStorageRef
|
||||
await databaseStorage.awaitableWrite { tx in
|
||||
for entry in entries {
|
||||
// Incorrect device set. We should add/remove devices and try again.
|
||||
let responseBody = try Self.decode409Response(data: responseData ?? Data())
|
||||
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
||||
for account in responseBody {
|
||||
handleMismatchedDevices(
|
||||
serviceId: account.serviceId,
|
||||
missingDevices: account.devices.missingDevices,
|
||||
extraDevices: account.devices.extraDevices,
|
||||
tx: tx,
|
||||
)
|
||||
}
|
||||
if !entry.missingDevices.isEmpty || !entry.extraDevices.isEmpty {
|
||||
handleMismatchedDevices(
|
||||
serviceId: entry.account,
|
||||
missingDevices: entry.missingDevices.compactMap(DeviceId.init(validating:)),
|
||||
extraDevices: entry.extraDevices.compactMap(DeviceId.init(validating:)),
|
||||
tx: tx,
|
||||
)
|
||||
}
|
||||
throw SenderKeyError.deviceUpdate
|
||||
case 410:
|
||||
// Server reports stale devices. We should reset our session and try again.
|
||||
let responseBody = try Self.decode410Response(data: responseData ?? Data())
|
||||
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
||||
for account in responseBody {
|
||||
handleStaleDevices(serviceId: account.serviceId, staleDevices: account.devices.staleDevices, tx: tx)
|
||||
}
|
||||
if !entry.staleDevices.isEmpty {
|
||||
handleStaleDevices(
|
||||
serviceId: entry.account,
|
||||
staleDevices: entry.staleDevices.compactMap(DeviceId.init(validating:)),
|
||||
tx: tx,
|
||||
)
|
||||
}
|
||||
throw SenderKeyError.staleDevices
|
||||
case 428:
|
||||
guard let body = responseData, let expiry = error.httpRetryAfterDate else {
|
||||
throw OWSAssertionError("Invalid spam response body")
|
||||
}
|
||||
try await withCheckedThrowingContinuation { continuation in
|
||||
SSKEnvironment.shared.spamChallengeResolverRef.handleServerChallengeBody(body, retryAfter: expiry) { didSucceed in
|
||||
if didSucceed {
|
||||
continuation.resume()
|
||||
} else {
|
||||
continuation.resume(throwing: SpamChallengeRequiredError())
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
throw error
|
||||
throw SenderKeyError.mismatchedDevices
|
||||
}
|
||||
}
|
||||
|
||||
@ -634,67 +603,7 @@ extension MessageSender {
|
||||
return ciphertext
|
||||
}
|
||||
|
||||
private func performSenderKeySend(
|
||||
ciphertext: Data,
|
||||
timestamp: UInt64,
|
||||
isOnline: Bool,
|
||||
isUrgent: Bool,
|
||||
auth: TSRequest.SealedSenderAuth,
|
||||
) async throws -> HTTPResponse {
|
||||
let request = OWSRequestFactory.submitMultiRecipientMessageRequest(
|
||||
ciphertext: ciphertext,
|
||||
timestamp: timestamp,
|
||||
isOnline: isOnline,
|
||||
isUrgent: isUrgent,
|
||||
auth: auth,
|
||||
)
|
||||
return try await SSKEnvironment.shared.networkManagerRef.asyncRequest(request)
|
||||
}
|
||||
|
||||
private static func isValidRegistrationId(_ registrationId: UInt32) -> Bool {
|
||||
return (registrationId & RegistrationIdGenerator.Constants.maximumRegistrationId) == registrationId
|
||||
}
|
||||
}
|
||||
|
||||
private extension MessageSender {
|
||||
|
||||
struct SuccessPayload: Decodable {
|
||||
let unregisteredServiceIds: [ServiceIdString]
|
||||
|
||||
enum CodingKeys: String, CodingKey {
|
||||
case unregisteredServiceIds = "uuids404"
|
||||
}
|
||||
}
|
||||
|
||||
struct AccountMismatchedDevices: Decodable {
|
||||
@ServiceIdString var serviceId: ServiceId
|
||||
let devices: MismatchedDevices
|
||||
|
||||
enum CodingKeys: String, CodingKey {
|
||||
case serviceId = "uuid"
|
||||
case devices
|
||||
}
|
||||
}
|
||||
|
||||
struct AccountStaleDevices: Decodable {
|
||||
@ServiceIdString var serviceId: ServiceId
|
||||
let devices: StaleDevices
|
||||
|
||||
enum CodingKeys: String, CodingKey {
|
||||
case serviceId = "uuid"
|
||||
case devices
|
||||
}
|
||||
}
|
||||
|
||||
static func decodeSuccessResponse(data: Data) throws -> SuccessPayload {
|
||||
return try JSONDecoder().decode(SuccessPayload.self, from: data)
|
||||
}
|
||||
|
||||
static func decode409Response(data: Data) throws -> [AccountMismatchedDevices] {
|
||||
return try JSONDecoder().decode([AccountMismatchedDevices].self, from: data)
|
||||
}
|
||||
|
||||
static func decode410Response(data: Data) throws -> [AccountStaleDevices] {
|
||||
return try JSONDecoder().decode([AccountStaleDevices].self, from: data)
|
||||
}
|
||||
}
|
||||
|
||||
@ -647,7 +647,6 @@ public class MessageSender {
|
||||
var canRefreshExpiringGroupSendEndorsements = true
|
||||
var canUseMultiRecipientSealedSender = true
|
||||
var canHandleMultiRecipientMismatchedDevices = true
|
||||
var canHandleMultiRecipientStaleDevices = true
|
||||
|
||||
func mutated(_ block: (inout Self) -> Void) -> Self {
|
||||
var mutableSelf = self
|
||||
@ -841,14 +840,10 @@ public class MessageSender {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canUseMultiRecipientSealedSender = false })
|
||||
break
|
||||
}
|
||||
if recoveryState.canHandleMultiRecipientMismatchedDevices, sendMessageFailure.containsAny(of: .deviceUpdate) {
|
||||
if recoveryState.canHandleMultiRecipientMismatchedDevices, sendMessageFailure.containsAny(of: .mismatchedDevices) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientMismatchedDevices = false })
|
||||
break
|
||||
}
|
||||
if recoveryState.canHandleMultiRecipientStaleDevices, sendMessageFailure.containsAny(of: .staleDevices) {
|
||||
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientStaleDevices = false })
|
||||
break
|
||||
}
|
||||
}
|
||||
return SendMessageResult(isNoteToSelf: false, sendMessageFailure: sendMessageFailure)
|
||||
}
|
||||
@ -1042,7 +1037,19 @@ public class MessageSender {
|
||||
}
|
||||
|
||||
static func isRetryableError(_ error: any Error) -> Bool {
|
||||
return (error.isRetryable && error.httpStatusCode != 508) || error.httpStatusCode == 429 || error is AccountChecker.RateLimitError
|
||||
if error.isRetryable, error.httpStatusCode != 508 {
|
||||
return true
|
||||
}
|
||||
if error.httpStatusCode == 429 {
|
||||
return true
|
||||
}
|
||||
if case SignalError.rateLimitedError(retryAfter: _, message: _) = error {
|
||||
return true
|
||||
}
|
||||
if error is AccountChecker.RateLimitError {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private func normalizeRecipientStatesIfNeeded(
|
||||
|
||||
@ -11,7 +11,6 @@ public enum OWSRequestFactory {
|
||||
static let textSecureAccountsAPI = "v1/accounts"
|
||||
static let textSecureAttributesAPI = "v1/accounts/attributes/"
|
||||
static let textSecureMessagesAPI = "v1/messages/"
|
||||
static let textSecureMultiRecipientMessageAPI = "v1/messages/multi_recipient"
|
||||
static let textSecureKeysAPI = "v2/keys"
|
||||
static let textSecureSignedKeysAPI = "v2/keys/signed"
|
||||
static let textSecureDirectoryAPI = "v1/directory"
|
||||
@ -133,34 +132,7 @@ public enum OWSRequestFactory {
|
||||
return request
|
||||
}
|
||||
|
||||
static func submitMultiRecipientMessageRequest(
|
||||
ciphertext: Data,
|
||||
timestamp: UInt64,
|
||||
isOnline: Bool,
|
||||
isUrgent: Bool,
|
||||
auth: TSRequest.SealedSenderAuth,
|
||||
) -> TSRequest {
|
||||
owsAssertDebug(timestamp > 0)
|
||||
|
||||
// We build the URL by hand instead of passing the query parameters into the query parameters
|
||||
// AFNetworking won't handle both query parameters and an httpBody (which we need here)
|
||||
var components = URLComponents(string: self.textSecureMultiRecipientMessageAPI)!
|
||||
components.queryItems = [
|
||||
URLQueryItem(name: "ts", value: "\(timestamp)"),
|
||||
URLQueryItem(name: "online", value: isOnline ? "true" : "false"),
|
||||
URLQueryItem(name: "urgent", value: isUrgent ? "true" : "false"),
|
||||
URLQueryItem(name: "story", value: auth.isStory ? "true" : "false"),
|
||||
]
|
||||
|
||||
var request = TSRequest(url: components.url!, method: "PUT", parameters: nil)
|
||||
request.timeoutInterval = sendMessageTimeout(estimatedRequestSize: ciphertext.count + 200)
|
||||
request.headers["Content-Type"] = "application/vnd.signal-messenger.mrm"
|
||||
request.auth = .sealedSender(auth)
|
||||
request.body = .data(ciphertext)
|
||||
return request
|
||||
}
|
||||
|
||||
private static func sendMessageTimeout(estimatedRequestSize: Int) -> TimeInterval {
|
||||
static func sendMessageTimeout(estimatedRequestSize: Int) -> TimeInterval {
|
||||
let bandwidthEstimate: Double = 40_000 // kbit/s
|
||||
let transferEstimate = Double(estimatedRequestSize) / (bandwidthEstimate / 8)
|
||||
let latencyEstimate: Double = Self.textSecureHTTPTimeOut
|
||||
|
||||
Loading…
Reference in New Issue
Block a user