Cancel ongoing Attachment Backfills from BackgroundMessageFetcher
This commit is contained in:
parent
889437cc6f
commit
33fc0ca0ab
@ -32,6 +32,8 @@ public class NotificationActionHandler {
|
||||
try await showThread(userInfo: userInfo)
|
||||
case .showMyStories:
|
||||
await showMyStories(appReadiness: appReadiness)
|
||||
case .showMessage:
|
||||
showMessage(userInfo: userInfo)
|
||||
case .showCallLobby:
|
||||
showCallLobby(userInfo: userInfo)
|
||||
case .submitDebugLogs:
|
||||
@ -204,6 +206,19 @@ public class NotificationActionHandler {
|
||||
SignalApp.shared.showMyStories(animated: UIApplication.shared.applicationState == .active)
|
||||
}
|
||||
|
||||
@MainActor
|
||||
private class func showMessage(userInfo: AppNotificationUserInfo) {
|
||||
guard let threadId = userInfo.threadId else {
|
||||
owsFailDebug("Missing threadId for showMessage action.")
|
||||
return
|
||||
}
|
||||
SignalApp.shared.presentConversationForThread(
|
||||
threadUniqueId: threadId,
|
||||
focusMessageId: userInfo.messageId,
|
||||
animated: UIApplication.shared.applicationState == .active,
|
||||
)
|
||||
}
|
||||
|
||||
@MainActor
|
||||
private class func showThread(uniqueId: String) {
|
||||
// If this happens when the app is not visible we skip the animation so the thread
|
||||
|
||||
@ -367,6 +367,15 @@
|
||||
/* Title for the attachment approval media quality sheet */
|
||||
"ATTACHMENT_APPROVAL_MEDIA_QUALITY_TITLE" = "Media Quality";
|
||||
|
||||
/* Notification body shown when media sync to a linked device is complete. */
|
||||
"ATTACHMENT_BACKFILL_FINISHED_NOTIFICATION" = "Finished syncing with Signal on your linked device.";
|
||||
|
||||
/* Notification body shown when media sync to a linked device was interrupted. */
|
||||
"ATTACHMENT_BACKFILL_INTERRUPTED_NOTIFICATION" = "Open Signal to continue syncing with your linked device";
|
||||
|
||||
/* Notification body shown while syncing media to a linked device. */
|
||||
"ATTACHMENT_BACKFILL_SYNCING_NOTIFICATION" = "Syncing with Signal on your linked device";
|
||||
|
||||
/* Generic filename for an attachment with no known name */
|
||||
"ATTACHMENT_DEFAULT_FILENAME" = "Attachment";
|
||||
|
||||
|
||||
@ -27,6 +27,7 @@ public class AttachmentBackfillManager {
|
||||
private let db: DB
|
||||
private let interactionStore: InteractionStore
|
||||
private let logger: PrefixedLogger
|
||||
private let notificationPresenter: NotificationPresenter
|
||||
private let recipientDatabaseTable: RecipientDatabaseTable
|
||||
private let syncMessageSender: AttachmentBackfillSyncMessageSender
|
||||
private let taskQueue: SerialTaskQueue
|
||||
@ -37,6 +38,7 @@ public class AttachmentBackfillManager {
|
||||
attachmentUploadManager: AttachmentUploadManager,
|
||||
db: DB,
|
||||
interactionStore: InteractionStore,
|
||||
notificationPresenter: NotificationPresenter,
|
||||
recipientDatabaseTable: RecipientDatabaseTable,
|
||||
syncMessageSender: AttachmentBackfillSyncMessageSender,
|
||||
threadStore: ThreadStore,
|
||||
@ -46,6 +48,7 @@ public class AttachmentBackfillManager {
|
||||
self.db = db
|
||||
self.interactionStore = interactionStore
|
||||
self.logger = PrefixedLogger(prefix: "[Backfill]")
|
||||
self.notificationPresenter = notificationPresenter
|
||||
self.recipientDatabaseTable = recipientDatabaseTable
|
||||
self.syncMessageSender = syncMessageSender
|
||||
self.taskQueue = SerialTaskQueue()
|
||||
@ -127,6 +130,34 @@ public class AttachmentBackfillManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Await the processing of any currently-enqueued inbound requests,
|
||||
/// cooperatively cancelling and waiting for teardown of said processing if
|
||||
/// cancelled.
|
||||
func awaitProcessingEnqueuedInboundRequests() async throws(CancellationError) {
|
||||
let flushTask = taskQueue.enqueue {
|
||||
// No-op: wait for the queue to flush.
|
||||
}
|
||||
let cancelledQueueFlushTask = AtomicValue<Task<Void, Error>?>(nil, lock: .init())
|
||||
|
||||
await withTaskCancellationHandler(
|
||||
operation: {
|
||||
try? await flushTask.value
|
||||
},
|
||||
onCancel: {
|
||||
cancelledQueueFlushTask.set(taskQueue.enqueueCancellingPrevious {
|
||||
// No-op: wait for the queue to flush.
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
// If we were cancelled while waiting, as a best effort wait for the
|
||||
// cancelled tasks in the queue to complete.
|
||||
if let task = cancelledQueueFlushTask.get() {
|
||||
try? await task.value
|
||||
throw CancellationError()
|
||||
}
|
||||
}
|
||||
|
||||
/// Enqueues and kicks off an `AttachmentBackfillInboundRequestRecord` for
|
||||
/// the given inbound backfill request sync message.
|
||||
func enqueueInboundRequest(
|
||||
@ -192,12 +223,16 @@ public class AttachmentBackfillManager {
|
||||
requestRecordId: AttachmentBackfillInboundRequestRecord.IDType,
|
||||
localIdentifiers: LocalIdentifiers,
|
||||
) -> Task<Void, Error> {
|
||||
return taskQueue.enqueue { [self] () async -> Void in
|
||||
return taskQueue.enqueue { [self] in
|
||||
let requestRecord: AttachmentBackfillInboundRequestRecord?
|
||||
let backfillTarget: AttachmentBackfillTarget?
|
||||
let threadUniqueId: String?
|
||||
let messageUniqueId: String?
|
||||
(
|
||||
requestRecord,
|
||||
backfillTarget,
|
||||
threadUniqueId,
|
||||
messageUniqueId,
|
||||
) = db.read { tx in
|
||||
guard
|
||||
let record = failIfThrows(block: {
|
||||
@ -216,21 +251,47 @@ public class AttachmentBackfillManager {
|
||||
tx: tx,
|
||||
)
|
||||
else {
|
||||
return (nil, nil)
|
||||
return (nil, nil, nil, nil)
|
||||
}
|
||||
|
||||
return (record, backfillTarget)
|
||||
return (record, backfillTarget, message.uniqueThreadId, message.uniqueId)
|
||||
}
|
||||
|
||||
guard let requestRecord, let backfillTarget else {
|
||||
guard
|
||||
let requestRecord,
|
||||
let backfillTarget,
|
||||
let threadUniqueId,
|
||||
let messageUniqueId
|
||||
else {
|
||||
logger.warn("Missing request record or backfill target: no response will be sent.")
|
||||
return
|
||||
}
|
||||
|
||||
notificationPresenter.notifyUserOfAttachmentBackfill(
|
||||
threadUniqueId: threadUniqueId,
|
||||
messageUniqueId: messageUniqueId,
|
||||
body: OWSLocalizedString(
|
||||
"ATTACHMENT_BACKFILL_SYNCING_NOTIFICATION",
|
||||
comment: "Notification body shown while syncing media to a linked device.",
|
||||
),
|
||||
)
|
||||
|
||||
let backfillAttemptResults = await attemptBackfill(
|
||||
interactionId: requestRecord.interactionId,
|
||||
)
|
||||
|
||||
if Task.isCancelled {
|
||||
notificationPresenter.notifyUserOfAttachmentBackfill(
|
||||
threadUniqueId: threadUniqueId,
|
||||
messageUniqueId: messageUniqueId,
|
||||
body: OWSLocalizedString(
|
||||
"ATTACHMENT_BACKFILL_INTERRUPTED_NOTIFICATION",
|
||||
comment: "Notification body shown when media sync to a linked device was interrupted.",
|
||||
),
|
||||
)
|
||||
throw CancellationError()
|
||||
}
|
||||
|
||||
await db.awaitableWrite { tx in
|
||||
self.sendBackfillAttemptResponse(
|
||||
backfillTarget: backfillTarget,
|
||||
@ -249,6 +310,15 @@ public class AttachmentBackfillManager {
|
||||
db.touch(interaction: backfillTargetMessage, shouldReindex: false, tx: tx)
|
||||
}
|
||||
}
|
||||
|
||||
notificationPresenter.notifyUserOfAttachmentBackfill(
|
||||
threadUniqueId: threadUniqueId,
|
||||
messageUniqueId: messageUniqueId,
|
||||
body: OWSLocalizedString(
|
||||
"ATTACHMENT_BACKFILL_FINISHED_NOTIFICATION",
|
||||
comment: "Notification body shown when media sync to a linked device is complete.",
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -578,6 +578,7 @@ extension AppSetup.GlobalsContinuation {
|
||||
attachmentUploadManager: attachmentUploadManager,
|
||||
db: db,
|
||||
interactionStore: interactionStore,
|
||||
notificationPresenter: notificationPresenter,
|
||||
recipientDatabaseTable: recipientDatabaseTable,
|
||||
syncMessageSender: messageSenderJobQueue,
|
||||
threadStore: threadStore,
|
||||
@ -1632,6 +1633,7 @@ extension AppSetup.GlobalsContinuation {
|
||||
let messageFetcherJob = MessageFetcherJob()
|
||||
|
||||
let backgroundMessageFetcherFactory = BackgroundMessageFetcherFactory(
|
||||
attachmentBackfillManager: attachmentBackfillManager,
|
||||
chatConnectionManager: chatConnectionManager,
|
||||
groupMessageProcessorManager: groupMessageProcessorManager,
|
||||
messageProcessor: messageProcessor,
|
||||
|
||||
@ -6,6 +6,7 @@
|
||||
import Foundation
|
||||
|
||||
public struct BackgroundMessageFetcherFactory {
|
||||
private let attachmentBackfillManager: AttachmentBackfillManager
|
||||
private let chatConnectionManager: any ChatConnectionManager
|
||||
private let groupMessageProcessorManager: GroupMessageProcessorManager
|
||||
private let messageProcessor: MessageProcessor
|
||||
@ -14,6 +15,7 @@ public struct BackgroundMessageFetcherFactory {
|
||||
private let storageServiceManager: any StorageServiceManager
|
||||
|
||||
public init(
|
||||
attachmentBackfillManager: AttachmentBackfillManager,
|
||||
chatConnectionManager: any ChatConnectionManager,
|
||||
groupMessageProcessorManager: GroupMessageProcessorManager,
|
||||
messageProcessor: MessageProcessor,
|
||||
@ -21,6 +23,7 @@ public struct BackgroundMessageFetcherFactory {
|
||||
receiptSender: ReceiptSender,
|
||||
storageServiceManager: any StorageServiceManager,
|
||||
) {
|
||||
self.attachmentBackfillManager = attachmentBackfillManager
|
||||
self.chatConnectionManager = chatConnectionManager
|
||||
self.groupMessageProcessorManager = groupMessageProcessorManager
|
||||
self.messageProcessor = messageProcessor
|
||||
@ -31,6 +34,7 @@ public struct BackgroundMessageFetcherFactory {
|
||||
|
||||
public func buildFetcher() -> BackgroundMessageFetcher {
|
||||
return BackgroundMessageFetcher(
|
||||
attachmentBackfillManager: self.attachmentBackfillManager,
|
||||
chatConnectionManager: self.chatConnectionManager,
|
||||
groupMessageProcessorManager: self.groupMessageProcessorManager,
|
||||
messageProcessor: self.messageProcessor,
|
||||
@ -42,6 +46,7 @@ public struct BackgroundMessageFetcherFactory {
|
||||
}
|
||||
|
||||
public actor BackgroundMessageFetcher {
|
||||
private let attachmentBackfillManager: AttachmentBackfillManager
|
||||
private let chatConnectionManager: any ChatConnectionManager
|
||||
private let groupMessageProcessorManager: GroupMessageProcessorManager
|
||||
private let messageProcessor: MessageProcessor
|
||||
@ -50,6 +55,7 @@ public actor BackgroundMessageFetcher {
|
||||
private let storageServiceManager: any StorageServiceManager
|
||||
|
||||
fileprivate init(
|
||||
attachmentBackfillManager: AttachmentBackfillManager,
|
||||
chatConnectionManager: any ChatConnectionManager,
|
||||
groupMessageProcessorManager: GroupMessageProcessorManager,
|
||||
messageProcessor: MessageProcessor,
|
||||
@ -57,6 +63,7 @@ public actor BackgroundMessageFetcher {
|
||||
receiptSender: ReceiptSender,
|
||||
storageServiceManager: any StorageServiceManager,
|
||||
) {
|
||||
self.attachmentBackfillManager = attachmentBackfillManager
|
||||
self.chatConnectionManager = chatConnectionManager
|
||||
self.groupMessageProcessorManager = groupMessageProcessorManager
|
||||
self.messageProcessor = messageProcessor
|
||||
@ -121,11 +128,14 @@ public actor BackgroundMessageFetcher {
|
||||
async let pendingOps: Void = MessageReceiver.waitForPendingTasks()
|
||||
// Wait until Storage Service has settled.
|
||||
async let pendingStorageService: Void = self.storageServiceManager.waitForSteadyState()
|
||||
// Wait until attachment backfills are done.
|
||||
async let pendingAttachmentBackfills: Void = self.attachmentBackfillManager.awaitProcessingEnqueuedInboundRequests()
|
||||
|
||||
try await pendingReceipts
|
||||
try await pendingMessages
|
||||
try await pendingOps
|
||||
try await pendingStorageService
|
||||
try await pendingAttachmentBackfills
|
||||
}
|
||||
|
||||
// Finally, wait for any notifications to finish posting
|
||||
|
||||
@ -120,6 +120,14 @@ public class NoopNotificationPresenterImpl: NotificationPresenter {
|
||||
Logger.warn("")
|
||||
}
|
||||
|
||||
public func notifyUserOfAttachmentBackfill(
|
||||
threadUniqueId: String,
|
||||
messageUniqueId: String,
|
||||
body: String,
|
||||
) {
|
||||
Logger.warn("")
|
||||
}
|
||||
|
||||
public func notifyUserOfMediaTierQuotaConsumed() {
|
||||
Logger.warn("")
|
||||
}
|
||||
|
||||
@ -58,6 +58,12 @@ public protocol NotificationPresenter {
|
||||
|
||||
func scheduleNotifyForBackupsEnabled(backupsTimestamp: Date)
|
||||
|
||||
func notifyUserOfAttachmentBackfill(
|
||||
threadUniqueId: String,
|
||||
messageUniqueId: String,
|
||||
body: String,
|
||||
)
|
||||
|
||||
func notifyUserOfMediaTierQuotaConsumed()
|
||||
|
||||
func notifyUserOfBackupsMediaError()
|
||||
|
||||
@ -46,6 +46,7 @@ public enum AppNotificationDefaultAction: String {
|
||||
case showChatList
|
||||
case showLinkedDevices
|
||||
case showBackupsSettings
|
||||
case showMessage
|
||||
}
|
||||
|
||||
public struct AppNotificationUserInfo {
|
||||
@ -167,6 +168,7 @@ public enum AppNotificationCategory: String, CaseIterable {
|
||||
case listMediaIntegrityCheckFailure = "Signal.AppNotificationCategory.listMediaIntegrityCheckFailure"
|
||||
case pollEndNotification = "Signal.AppNotificationCategory.pollEndNotification"
|
||||
case pollVoteNotification = "Signal.AppNotificationCategory.pollVoteNotification"
|
||||
case attachmentBackfill = "Signal.AppNotificationCategory.attachmentBackfill"
|
||||
|
||||
var shouldClearOnAppActivate: Bool {
|
||||
switch self {
|
||||
@ -186,7 +188,8 @@ public enum AppNotificationCategory: String, CaseIterable {
|
||||
.transferRelaunch,
|
||||
.deregistration,
|
||||
.pollEndNotification,
|
||||
.pollVoteNotification:
|
||||
.pollVoteNotification,
|
||||
.attachmentBackfill:
|
||||
return true
|
||||
case
|
||||
.newDeviceLinked,
|
||||
@ -241,6 +244,8 @@ public enum AppNotificationCategory: String, CaseIterable {
|
||||
return []
|
||||
case .pollVoteNotification:
|
||||
return []
|
||||
case .attachmentBackfill:
|
||||
return []
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1210,6 +1215,28 @@ public class NotificationPresenterImpl: NotificationPresenter {
|
||||
}
|
||||
}
|
||||
|
||||
public func notifyUserOfAttachmentBackfill(
|
||||
threadUniqueId: String,
|
||||
messageUniqueId: String,
|
||||
body: String,
|
||||
) {
|
||||
var userInfo = AppNotificationUserInfo()
|
||||
userInfo.threadId = threadUniqueId
|
||||
userInfo.messageId = messageUniqueId
|
||||
userInfo.defaultAction = .showMessage
|
||||
enqueueNotificationAction {
|
||||
await self.notifyViaPresenter(
|
||||
category: .attachmentBackfill,
|
||||
title: nil,
|
||||
body: body,
|
||||
threadIdentifier: nil,
|
||||
userInfo: userInfo,
|
||||
soundQuery: .none,
|
||||
replacingIdentifier: "attachmentBackfill-\(messageUniqueId)",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
public func notifyUserOfMediaTierQuotaConsumed() {
|
||||
var userInfo = AppNotificationUserInfo()
|
||||
userInfo.defaultAction = .showBackupsSettings
|
||||
|
||||
@ -225,7 +225,8 @@ public class UserNotificationPresenter {
|
||||
.deregistration,
|
||||
.newDeviceLinked,
|
||||
.backupsEnabled,
|
||||
.backupsMediaTierQuotaConsumed:
|
||||
.backupsMediaTierQuotaConsumed,
|
||||
.attachmentBackfill:
|
||||
// Always show these notifications
|
||||
return true
|
||||
|
||||
|
||||
@ -16,6 +16,7 @@ class AttachmentBackfillManagerTest: SSKBaseTest {
|
||||
private var interactionStore: InteractionStoreImpl!
|
||||
private var mockSyncMessageSender: MockSyncMessageSender!
|
||||
private var mockUploadManager: MockAttachmentUploadManager!
|
||||
private var mockNotificationPresenter: NoopNotificationPresenterImpl!
|
||||
private var recipientDatabaseTable: RecipientDatabaseTable!
|
||||
private var threadStore: ThreadStoreImpl!
|
||||
|
||||
@ -34,6 +35,7 @@ class AttachmentBackfillManagerTest: SSKBaseTest {
|
||||
interactionStore = InteractionStoreImpl()
|
||||
mockSyncMessageSender = MockSyncMessageSender()
|
||||
mockUploadManager = MockAttachmentUploadManager()
|
||||
mockNotificationPresenter = NoopNotificationPresenterImpl()
|
||||
recipientDatabaseTable = RecipientDatabaseTable()
|
||||
threadStore = ThreadStoreImpl()
|
||||
|
||||
@ -42,6 +44,7 @@ class AttachmentBackfillManagerTest: SSKBaseTest {
|
||||
attachmentUploadManager: mockUploadManager,
|
||||
db: db,
|
||||
interactionStore: interactionStore,
|
||||
notificationPresenter: mockNotificationPresenter,
|
||||
recipientDatabaseTable: recipientDatabaseTable,
|
||||
syncMessageSender: mockSyncMessageSender,
|
||||
threadStore: threadStore,
|
||||
@ -233,6 +236,56 @@ class AttachmentBackfillManagerTest: SSKBaseTest {
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - awaitProcessingEnqueuedInboundRequests
|
||||
|
||||
func testAwaitEnqueued_cooperativelyCancels() async throws {
|
||||
let (requestRecord, message) = insertMessageAndRequestRecord(thread: otherAciThread)
|
||||
let messageRowId = message.sqliteRowId!
|
||||
let threadRowId = otherAciThread.sqliteRowId!
|
||||
|
||||
_ = insertAttachmentWithReference(messageRowId: messageRowId, threadRowId: threadRowId, orderInMessage: 0)
|
||||
|
||||
// Use a continuation to keep track, in the test, of when the upload
|
||||
// attempt starts.
|
||||
let (uploadStream, uploadContinuation) = AsyncStream.makeStream(of: Void.self)
|
||||
|
||||
mockUploadManager.uploadBlock = { _ in
|
||||
// Signal that the upload has started, then suspend indefinitely.
|
||||
// The suspension will be interrupted when the upload is cancelled.
|
||||
uploadContinuation.yield()
|
||||
try await Task.sleep(nanoseconds: .max)
|
||||
}
|
||||
|
||||
// Kick off processing.
|
||||
let processingTask = manager.processInboundRequest(
|
||||
requestRecordId: requestRecord.id,
|
||||
localIdentifiers: localIdentifiers,
|
||||
)
|
||||
|
||||
// Wait for the upload to start.
|
||||
var uploadStreamIterator = uploadStream.makeAsyncIterator()
|
||||
await uploadStreamIterator.next()
|
||||
|
||||
// Now, await and immediately cancel processing. This should cause us to
|
||||
// cancel all in-flight processing.
|
||||
let awaitTask = Task {
|
||||
try await manager.awaitProcessingEnqueuedInboundRequests()
|
||||
}
|
||||
awaitTask.cancel()
|
||||
|
||||
for task in [processingTask, awaitTask] {
|
||||
do {
|
||||
try await task.value
|
||||
XCTFail("Should have thrown!")
|
||||
} catch is CancellationError {
|
||||
// Excellent.
|
||||
}
|
||||
}
|
||||
|
||||
// The processing was cancelled, so no response should have been sent.
|
||||
XCTAssertTrue(mockSyncMessageSender.sentResponses.isEmpty)
|
||||
}
|
||||
|
||||
// MARK: -
|
||||
|
||||
private func buildBackfillRequestProto(
|
||||
|
||||
@ -474,6 +474,7 @@ private enum CrashyMocks {
|
||||
func cancelNotifications(reactionId: String) { failTest(Self.self) }
|
||||
func cancelNotificationsForMissedCalls(threadUniqueId: String) { failTest(Self.self) }
|
||||
func cancelNotifications(for storyMessage: StoryMessage) { failTest(Self.self) }
|
||||
func notifyUserOfAttachmentBackfill(threadUniqueId: String, messageUniqueId: String, body: String) { failTest(Self.self) }
|
||||
func notifyUserOfMediaTierQuotaConsumed() { failTest(Self.self) }
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user