diff --git a/SignalServiceKit/Messages/MessageFetcherJob.swift b/SignalServiceKit/Messages/MessageFetcherJob.swift index 0a6c8e46ce..c91832ae8f 100644 --- a/SignalServiceKit/Messages/MessageFetcherJob.swift +++ b/SignalServiceKit/Messages/MessageFetcherJob.swift @@ -26,7 +26,6 @@ public class MessageFetcherJob { owsPrecondition(CurrentAppContext().shouldProcessIncomingMessages) owsPrecondition(CurrentAppContext().isNSE) owsPrecondition(self.appReadiness.isAppReady) - owsPrecondition(!self.shouldUseWebSocket) owsAssertDebug(DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered) await self.startGroupMessageProcessorsIfNeeded() @@ -37,25 +36,17 @@ public class MessageFetcherJob { await SSKEnvironment.shared.groupMessageProcessorManagerRef.startAllProcessors() } - private var shouldUseWebSocket: Bool { - return OWSChatConnection.canAppUseSocketsToMakeRequests - } - public var hasCompletedInitialFetch: Bool { - if shouldUseWebSocket { - return ( - DependenciesBridge.shared.chatConnectionManager.identifiedConnectionState == .open && - DependenciesBridge.shared.chatConnectionManager.hasEmptiedInitialQueue - ) - } else { - return self.didFinishFetchingViaREST.get() + get async { + let chatConnectionManager = DependenciesBridge.shared.chatConnectionManager + return await chatConnectionManager.hasEmptiedInitialQueue || self.didFinishFetchingViaREST.get() } } func preconditionForFetchingComplete() -> some Precondition { return NotificationPrecondition( - notificationName: shouldUseWebSocket ? OWSChatConnection.chatConnectionStateDidChange : Self.didChangeStateNotificationName, - isSatisfied: { self.hasCompletedInitialFetch } + notificationNames: [OWSChatConnection.chatConnectionStateDidChange, Self.didChangeStateNotificationName], + isSatisfied: { await self.hasCompletedInitialFetch } ) } diff --git a/SignalServiceKit/Messages/OWSMessageDecrypter.swift b/SignalServiceKit/Messages/OWSMessageDecrypter.swift index 6f9782516d..19077162d2 100644 --- a/SignalServiceKit/Messages/OWSMessageDecrypter.swift +++ b/SignalServiceKit/Messages/OWSMessageDecrypter.swift @@ -7,7 +7,8 @@ import LibSignalClient public class OWSMessageDecrypter { - private var senderIdsResetDuringCurrentBatch = NSMutableSet() + private let senderIdsResetDuringCurrentBatch = AtomicValue>(Set(), lock: .init()) + private var placeholderCleanupTimer: Timer? { didSet { oldValue?.invalidate() } } @@ -31,15 +32,16 @@ public class OWSMessageDecrypter { @objc func messageProcessorDidDrainQueue() { - // We don't want to send additional resets until we - // have received the "empty" response from the WebSocket - // or finished at least one REST fetch. - guard SSKEnvironment.shared.messageFetcherJobRef.hasCompletedInitialFetch else { return } + Task { + // We don't want to send additional resets until we have received the + // "empty" response from the WebSocket or finished at least one REST fetch. + guard await SSKEnvironment.shared.messageFetcherJobRef.hasCompletedInitialFetch else { return } - // We clear all recently reset sender ids any time the - // decryption queue has drained, so that any new messages - // that fail to decrypt will reset the session again. - senderIdsResetDuringCurrentBatch.removeAllObjects() + // We clear all recently reset sender ids any time the decryption queue has + // drained, so that any new messages that fail to decrypt will reset the + // session again. + senderIdsResetDuringCurrentBatch.update { $0.removeAll() } + } } private func trySendNullMessage( @@ -354,9 +356,7 @@ public class OWSMessageDecrypter { // resets. When the message decrypt queue is drained, the list of recently // reset IDs is cleared. let senderId = "\(sourceAci).\(sourceDeviceId)" - if !senderIdsResetDuringCurrentBatch.contains(senderId) { - senderIdsResetDuringCurrentBatch.add(senderId) - + if senderIdsResetDuringCurrentBatch.update(block: { $0.insert(senderId).inserted }) { // We don't reset sessions for messages sent to our PNI because those are // receive-only & we don't send retries FROM our PNI back to the sender. diff --git a/SignalServiceKit/Network/ChatConnectionManager.swift b/SignalServiceKit/Network/ChatConnectionManager.swift index 6aa761916f..dd142a0964 100644 --- a/SignalServiceKit/Network/ChatConnectionManager.swift +++ b/SignalServiceKit/Network/ChatConnectionManager.swift @@ -17,7 +17,7 @@ public protocol ChatConnectionManager { /// all connection tokens are released). func waitUntilIdentifiedConnectionShouldBeClosed() async throws var identifiedConnectionState: OWSChatConnectionState { get } - var hasEmptiedInitialQueue: Bool { get } + var hasEmptiedInitialQueue: Bool { get async } func shouldWaitForSocketToMakeRequest(connectionType: OWSChatConnectionType) -> Bool func requestConnections(shouldReconnectIfConnectedElsewhere: Bool) -> [OWSChatConnection.ConnectionToken] @@ -95,7 +95,9 @@ public class ChatConnectionManagerImpl: ChatConnectionManager { } public var hasEmptiedInitialQueue: Bool { - connectionIdentified.hasEmptiedInitialQueue + get async { + return await connectionIdentified.hasEmptiedInitialQueue + } } } diff --git a/SignalServiceKit/Network/OWSChatConnection.swift b/SignalServiceKit/Network/OWSChatConnection.swift index 6e37499213..444f32f928 100644 --- a/SignalServiceKit/Network/OWSChatConnection.swift +++ b/SignalServiceKit/Network/OWSChatConnection.swift @@ -63,9 +63,10 @@ public class OWSChatConnection { return .closed } - // This var must be thread-safe. public var hasEmptiedInitialQueue: Bool { - false + get async { + return false + } } fileprivate var logPrefix: String { @@ -884,9 +885,15 @@ internal class OWSUnauthConnectionUsingLibSignal: OWSChatConnectionUsingLibSigna } internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal, ChatConnectionListener { - private let _hasEmptiedInitialQueue = AtomicBool(false, lock: .sharedGlobal) + private var _hasEmptiedInitialQueue = false override var hasEmptiedInitialQueue: Bool { - _hasEmptiedInitialQueue.get() + get async { + return await withCheckedContinuation { continuation in + serialQueue.async { + continuation.resume(returning: self._hasEmptiedInitialQueue) + } + } + } } private var _keepaliveSenderTask: Task? @@ -931,13 +938,8 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal< } keepaliveSenderTask = makeKeepaliveTask(service) case .closed: - // While _hasEmptiedInitialQueue is atomic, that's not sufficient to guarantee the - // *order* of writes. We do that by making sure we only set it on the serial queue, - // and then make sure libsignal's serialized callbacks result in scheduling on the - // serial queue. keepaliveSenderTask = nil - _hasEmptiedInitialQueue.set(false) - Logger.debug("Reset _hasEmptiedInitialQueue") + _hasEmptiedInitialQueue = false } } } @@ -1036,8 +1038,8 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal< // We have since disconnected from the chat service instance that reported the empty queue. return } - let alreadyEmptied = self._hasEmptiedInitialQueue.swap(true) - Logger.debug("Initial queue emptied") + let alreadyEmptied = self._hasEmptiedInitialQueue + self._hasEmptiedInitialQueue = true if !alreadyEmptied { // This notification is used to wake up anything waiting for hasEmptiedInitialQueue. diff --git a/SignalServiceKit/Preconditions/NotificationPrecondition.swift b/SignalServiceKit/Preconditions/NotificationPrecondition.swift index 87e02261fa..281ee12ce9 100644 --- a/SignalServiceKit/Preconditions/NotificationPrecondition.swift +++ b/SignalServiceKit/Preconditions/NotificationPrecondition.swift @@ -8,25 +8,35 @@ import Foundation /// Waits until `isSatisfied()` returns true. Checks the initial result and /// then re-checks the result each time notificationName fires. public struct NotificationPrecondition: Precondition, Sendable { - private let notificationName: Notification.Name - private let isSatisfied: @Sendable () -> Bool + private let notificationNames: [Notification.Name] + private let isSatisfied: @Sendable () async -> Bool - public init(notificationName: Notification.Name, isSatisfied: @escaping @Sendable () -> Bool) { - self.notificationName = notificationName + public init(notificationName: Notification.Name, isSatisfied: @escaping @Sendable () async -> Bool) { + self.init(notificationNames: [notificationName], isSatisfied: isSatisfied) + } + + public init(notificationNames: [Notification.Name], isSatisfied: @escaping @Sendable () async -> Bool) { + self.notificationNames = notificationNames self.isSatisfied = isSatisfied } public func waitUntilSatisfied() async -> WaitResult { let result = CancellableContinuation() - let observer = NotificationCenter.default.addObserver(forName: notificationName, object: nil, queue: nil, using: { _ in - if self.isSatisfied() { - return result.resume(with: .success(())) - } - }) - defer { - NotificationCenter.default.removeObserver(observer) + let observers = self.notificationNames.map { + return NotificationCenter.default.addObserver(forName: $0, object: nil, queue: nil, using: { _ in + Task { + if await self.isSatisfied() { + result.resume(with: .success(())) + } + } + }) } - if isSatisfied() { + defer { + for observer in observers { + NotificationCenter.default.removeObserver(observer) + } + } + if await isSatisfied() { return .satisfiedImmediately } do {