707 lines
26 KiB
Swift
707 lines
26 KiB
Swift
//
|
|
// Copyright 2026 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
import LibSignalClient
|
|
|
|
public class AttachmentBackfillManager {
|
|
|
|
/// Wrapper around `MessageSenderJobQueue`, for tests.
|
|
protocol AttachmentBackfillSyncMessageSender {
|
|
func add(
|
|
attachmentBackfillRequestSyncMessage: AttachmentBackfillRequestSyncMessage,
|
|
tx: DBWriteTransaction,
|
|
)
|
|
|
|
func add(
|
|
attachmentBackfillResponseSyncMessage: AttachmentBackfillResponseSyncMessage,
|
|
tx: DBWriteTransaction,
|
|
)
|
|
}
|
|
|
|
// MARK: -
|
|
|
|
private let attachmentStore: AttachmentStore
|
|
private let attachmentUploadManager: AttachmentUploadManager
|
|
private let db: DB
|
|
private let interactionStore: InteractionStore
|
|
private let logger: PrefixedLogger
|
|
private let notificationPresenter: NotificationPresenter
|
|
private let recipientDatabaseTable: RecipientDatabaseTable
|
|
private let syncMessageSender: AttachmentBackfillSyncMessageSender
|
|
private let taskQueue: SerialTaskQueue
|
|
private let threadStore: ThreadStore
|
|
|
|
init(
|
|
attachmentStore: AttachmentStore,
|
|
attachmentUploadManager: AttachmentUploadManager,
|
|
db: DB,
|
|
interactionStore: InteractionStore,
|
|
notificationPresenter: NotificationPresenter,
|
|
recipientDatabaseTable: RecipientDatabaseTable,
|
|
syncMessageSender: AttachmentBackfillSyncMessageSender,
|
|
threadStore: ThreadStore,
|
|
) {
|
|
self.attachmentStore = attachmentStore
|
|
self.attachmentUploadManager = attachmentUploadManager
|
|
self.db = db
|
|
self.interactionStore = interactionStore
|
|
self.logger = PrefixedLogger(prefix: "[Backfill]")
|
|
self.notificationPresenter = notificationPresenter
|
|
self.recipientDatabaseTable = recipientDatabaseTable
|
|
self.syncMessageSender = syncMessageSender
|
|
self.taskQueue = SerialTaskQueue()
|
|
self.threadStore = threadStore
|
|
}
|
|
|
|
// MARK: - Outbound Requests
|
|
|
|
public func sendOutboundRequest(
|
|
message: TSMessage,
|
|
localIdentifiers: LocalIdentifiers,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
guard
|
|
let backfillTarget = assembleBackfillTarget(
|
|
message: message,
|
|
localIdentifiers: localIdentifiers,
|
|
tx: tx,
|
|
)
|
|
else {
|
|
logger.warn("Failed to assemble backfill target for outbound request.")
|
|
return
|
|
}
|
|
|
|
guard let localThread = threadStore.getOrCreateLocalThread(tx: tx) else {
|
|
owsFailDebug("Failed to get local thread.", logger: logger)
|
|
return
|
|
}
|
|
|
|
let requestProtoBuilder = SSKProtoSyncMessageAttachmentBackfillRequest.builder()
|
|
requestProtoBuilder.setTargetMessage(backfillTarget.addressableMessage.asProto)
|
|
requestProtoBuilder.setTargetConversation(backfillTarget.conversationIdentifier.asProto)
|
|
|
|
do {
|
|
let syncMessage = try AttachmentBackfillRequestSyncMessage(
|
|
requestProto: requestProtoBuilder.buildInfallibly(),
|
|
localThread: localThread,
|
|
tx: tx,
|
|
)
|
|
|
|
syncMessageSender.add(
|
|
attachmentBackfillRequestSyncMessage: syncMessage,
|
|
tx: tx,
|
|
)
|
|
} catch {
|
|
owsFailDebug("Failed to build backfill request sync message! \(error)")
|
|
return
|
|
}
|
|
}
|
|
|
|
// MARK: - Inbound Requests
|
|
|
|
/// Returns whether we have an enqueued `AttachmentBackfillInboundRequestRecord`
|
|
/// for the given interaction ID.
|
|
public func hasEnqueuedInboundRequest(interactionRowId: Int64, tx: DBReadTransaction) -> Bool {
|
|
return AttachmentBackfillInboundRequestRecord.fetchRecord(
|
|
interactionId: interactionRowId,
|
|
tx: tx,
|
|
) != nil
|
|
}
|
|
|
|
/// Serially processes already-enqueued `AttachmentBackfillInboundRequestRecord`s,
|
|
/// for example from interrupted previous launches.
|
|
/// - SeeAlso `processInboundRequest(requestRecord:localIdentifiers:)`
|
|
public func processEnqueuedInboundRequests(registeredState: RegisteredState) {
|
|
guard registeredState.isPrimary else {
|
|
return
|
|
}
|
|
|
|
let enqueuedRequestRecords: [AttachmentBackfillInboundRequestRecord] = db.read { tx in
|
|
AttachmentBackfillInboundRequestRecord.fetchAllAscending(tx: tx)
|
|
}
|
|
|
|
for enqueuedRequestRecord in enqueuedRequestRecords {
|
|
_ = processInboundRequest(
|
|
requestRecordId: enqueuedRequestRecord.id,
|
|
localIdentifiers: registeredState.localIdentifiers,
|
|
)
|
|
}
|
|
}
|
|
|
|
/// Await the processing of any currently-enqueued inbound requests,
|
|
/// cooperatively cancelling and waiting for teardown of said processing if
|
|
/// cancelled.
|
|
func awaitProcessingEnqueuedInboundRequests() async throws(CancellationError) {
|
|
let flushTask = taskQueue.enqueue {
|
|
// No-op: wait for the queue to flush.
|
|
}
|
|
let cancelledQueueFlushTask = AtomicValue<Task<Void, Error>?>(nil, lock: .init())
|
|
|
|
await withTaskCancellationHandler(
|
|
operation: {
|
|
try? await flushTask.value
|
|
},
|
|
onCancel: {
|
|
cancelledQueueFlushTask.set(taskQueue.enqueueCancellingPrevious {
|
|
// No-op: wait for the queue to flush.
|
|
})
|
|
},
|
|
)
|
|
|
|
// If we were cancelled while waiting, as a best effort wait for the
|
|
// cancelled tasks in the queue to complete.
|
|
if let task = cancelledQueueFlushTask.get() {
|
|
try? await task.value
|
|
throw CancellationError()
|
|
}
|
|
}
|
|
|
|
/// Enqueues and kicks off an `AttachmentBackfillInboundRequestRecord` for
|
|
/// the given inbound backfill request sync message.
|
|
func enqueueInboundRequest(
|
|
attachmentBackfillRequestProto: SSKProtoSyncMessageAttachmentBackfillRequest,
|
|
registeredState: RegisteredState,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
guard registeredState.isPrimary else {
|
|
logger.warn("Dropping backfill request: not registered primary.")
|
|
return
|
|
}
|
|
|
|
guard
|
|
let backfillTarget: AttachmentBackfillTarget = parseBackfillTarget(
|
|
attachmentBackfillRequestProto: attachmentBackfillRequestProto,
|
|
tx: tx,
|
|
)
|
|
else {
|
|
logger.warn("Missing or invalid backfill target!")
|
|
return
|
|
}
|
|
|
|
guard
|
|
let backfillTargetMessage: TSMessage = locateBackfillTargetMessage(
|
|
backfillTarget,
|
|
tx: tx,
|
|
)
|
|
else {
|
|
logger.info("Missing message for backfill target.")
|
|
sendTargetNotFoundResponse(
|
|
backfillTarget: backfillTarget,
|
|
localIdentifiers: registeredState.localIdentifiers,
|
|
tx: tx,
|
|
)
|
|
return
|
|
}
|
|
|
|
let backfillRecord = AttachmentBackfillInboundRequestRecord.fetchOrInsertRecord(
|
|
interactionId: backfillTargetMessage.sqliteRowId!,
|
|
tx: tx,
|
|
)
|
|
|
|
let logger = logger.suffixed(inboundRequestRecordId: backfillRecord.id)
|
|
logger.info("Enqueued inbound request.")
|
|
|
|
// Touch the target message so we reload it in the ConversationView, to
|
|
// display that it's being backfilled.
|
|
db.touch(interaction: backfillTargetMessage, shouldReindex: false, tx: tx)
|
|
|
|
tx.addSyncCompletion { [self] in
|
|
_ = processInboundRequest(
|
|
requestRecordId: backfillRecord.id,
|
|
localIdentifiers: registeredState.localIdentifiers,
|
|
)
|
|
}
|
|
}
|
|
|
|
/// Process the given `AttachmentBackfillInboundRequestRecord`, by uploading
|
|
/// its body attachments and enqueuing a response sync message.
|
|
func processInboundRequest(
|
|
requestRecordId: AttachmentBackfillInboundRequestRecord.IDType,
|
|
localIdentifiers: LocalIdentifiers,
|
|
) -> Task<Void, Error> {
|
|
return taskQueue.enqueue { [self] in
|
|
let logger = logger.suffixed(inboundRequestRecordId: requestRecordId)
|
|
|
|
let requestRecord: AttachmentBackfillInboundRequestRecord?
|
|
let backfillTarget: AttachmentBackfillTarget?
|
|
let threadUniqueId: String?
|
|
let messageUniqueId: String?
|
|
(
|
|
requestRecord,
|
|
backfillTarget,
|
|
threadUniqueId,
|
|
messageUniqueId,
|
|
) = await db.awaitableWrite { tx in
|
|
guard
|
|
let record = failIfThrows(block: {
|
|
try AttachmentBackfillInboundRequestRecord.fetchOne(
|
|
tx.database,
|
|
key: requestRecordId,
|
|
)
|
|
})
|
|
else {
|
|
logger.warn("Missing request record: no response will be sent.")
|
|
return (nil, nil, nil, nil)
|
|
}
|
|
|
|
guard
|
|
let message = interactionStore.fetchInteraction(
|
|
rowId: record.interactionId,
|
|
tx: tx,
|
|
) as? TSMessage,
|
|
let backfillTarget = assembleBackfillTarget(
|
|
message: message,
|
|
localIdentifiers: localIdentifiers,
|
|
tx: tx,
|
|
)
|
|
else {
|
|
logger.warn("Missing backfill target: no response will be sent.")
|
|
failIfThrows {
|
|
try record.delete(tx.database)
|
|
}
|
|
return (nil, nil, nil, nil)
|
|
}
|
|
|
|
return (record, backfillTarget, message.uniqueThreadId, message.uniqueId)
|
|
}
|
|
|
|
guard
|
|
let requestRecord,
|
|
let backfillTarget,
|
|
let threadUniqueId,
|
|
let messageUniqueId
|
|
else {
|
|
return
|
|
}
|
|
|
|
notificationPresenter.notifyUserOfAttachmentBackfill(
|
|
threadUniqueId: threadUniqueId,
|
|
messageUniqueId: messageUniqueId,
|
|
body: OWSLocalizedString(
|
|
"ATTACHMENT_BACKFILL_SYNCING_NOTIFICATION",
|
|
comment: "Notification body shown while syncing media to a linked device.",
|
|
),
|
|
)
|
|
|
|
let backfillAttemptResults = await attemptBackfill(
|
|
interactionId: requestRecord.interactionId,
|
|
)
|
|
|
|
if Task.isCancelled {
|
|
logger.warn("Backfill attempt cancelled.")
|
|
|
|
notificationPresenter.notifyUserOfAttachmentBackfill(
|
|
threadUniqueId: threadUniqueId,
|
|
messageUniqueId: messageUniqueId,
|
|
body: OWSLocalizedString(
|
|
"ATTACHMENT_BACKFILL_INTERRUPTED_NOTIFICATION",
|
|
comment: "Notification body shown when media sync to a linked device was interrupted.",
|
|
),
|
|
)
|
|
throw CancellationError()
|
|
}
|
|
|
|
await db.awaitableWrite { tx in
|
|
let backfillAttemptDescription = backfillAttemptResults
|
|
.map { result in
|
|
switch result {
|
|
case .success: "success"
|
|
case .failure(let error) where error.isRetryable: "retry"
|
|
case .failure: "failure"
|
|
}
|
|
}
|
|
.joined(separator: ";")
|
|
logger.info("Sending backfill response: \(backfillAttemptDescription)")
|
|
|
|
self.sendBackfillAttemptResponse(
|
|
backfillTarget: backfillTarget,
|
|
backfillAttemptResults: backfillAttemptResults,
|
|
localIdentifiers: localIdentifiers,
|
|
tx: tx,
|
|
)
|
|
|
|
failIfThrows {
|
|
try requestRecord.delete(tx.database)
|
|
}
|
|
|
|
// Touch the target message so we reload it in the ConversationView,
|
|
// to display that we're done with the backfill.
|
|
if let backfillTargetMessage = locateBackfillTargetMessage(backfillTarget, tx: tx) {
|
|
db.touch(interaction: backfillTargetMessage, shouldReindex: false, tx: tx)
|
|
}
|
|
}
|
|
|
|
notificationPresenter.notifyUserOfAttachmentBackfill(
|
|
threadUniqueId: threadUniqueId,
|
|
messageUniqueId: messageUniqueId,
|
|
body: OWSLocalizedString(
|
|
"ATTACHMENT_BACKFILL_FINISHED_NOTIFICATION",
|
|
comment: "Notification body shown when media sync to a linked device is complete.",
|
|
),
|
|
)
|
|
}
|
|
}
|
|
|
|
private func attemptBackfill(
|
|
interactionId: Int64,
|
|
) async -> [Result<SSKProtoAttachmentPointer, Error>] {
|
|
let backfillableReferencedAttachments: [ReferencedAttachment] = db.read { tx in
|
|
let stickerReferencedAttachments = attachmentStore.fetchReferencedAttachments(
|
|
for: .messageSticker(messageRowId: interactionId),
|
|
tx: tx,
|
|
)
|
|
|
|
if !stickerReferencedAttachments.isEmpty {
|
|
return stickerReferencedAttachments
|
|
}
|
|
|
|
return attachmentStore.fetchReferencedAttachments(
|
|
for: .messageBodyAttachment(messageRowId: interactionId),
|
|
tx: tx,
|
|
)
|
|
}
|
|
|
|
if backfillableReferencedAttachments.isEmpty {
|
|
logger.warn("No attachments for backfill target.")
|
|
return []
|
|
}
|
|
|
|
return await withTaskGroup(
|
|
of: (Int, Result<SSKProtoAttachmentPointer, Error>).self,
|
|
returning: [Result<SSKProtoAttachmentPointer, Error>].self,
|
|
) { taskGroup in
|
|
for (index, referencedAttachment) in backfillableReferencedAttachments.enumerated() {
|
|
taskGroup.addTask { [self] in
|
|
let result = await uploadAttachmentForBackfill(referencedAttachment: referencedAttachment)
|
|
return (index, result)
|
|
}
|
|
}
|
|
|
|
var indexedResults = [(Int, Result<SSKProtoAttachmentPointer, Error>)]()
|
|
for await indexedResult in taskGroup {
|
|
indexedResults.append(indexedResult)
|
|
}
|
|
|
|
return indexedResults
|
|
.sorted(by: { $0.0 < $1.0 })
|
|
.map(\.1)
|
|
}
|
|
}
|
|
|
|
private struct BackedUpAttachmentMissingLocalFileError: Error {}
|
|
|
|
private func uploadAttachmentForBackfill(
|
|
referencedAttachment: ReferencedAttachment,
|
|
) async -> Result<SSKProtoAttachmentPointer, Error> {
|
|
let logger = logger.suffixed(with: "[\(referencedAttachment.attachment.id)]")
|
|
|
|
if
|
|
referencedAttachment.attachment.asStream() == nil,
|
|
referencedAttachment.attachment.mediaTierInfo != nil
|
|
{
|
|
// We don't have the file locally, but we do have media-tier CDN
|
|
// info (implying we may be able to retrieve the file from our
|
|
// Backup). AttachmentBackfill doesn't support sending media-tier
|
|
// CDN pointers so we can't actually do a backfill, but we don't
|
|
// want to return a terminal error since that'll prevent our linked
|
|
// device from ever trying again in the future.
|
|
logger.warn("Missing local file, but media-tier info present.")
|
|
return .failure(BackedUpAttachmentMissingLocalFileError())
|
|
}
|
|
|
|
do {
|
|
try await Retry.performWithBackoff(maxAttempts: 4) { [attachmentUploadManager] in
|
|
// This method will short circuit and return without doing an
|
|
// actual upload if the attachment has recent, reusable transit-
|
|
// tier info.
|
|
try await attachmentUploadManager.uploadTransitTierAttachment(
|
|
attachmentId: referencedAttachment.attachment.id,
|
|
progress: nil,
|
|
)
|
|
}
|
|
|
|
guard
|
|
// Refetch the attachment to get updated transit tier info.
|
|
let refetchedAttachment = db.read(block: { tx in
|
|
return attachmentStore.fetch(
|
|
id: referencedAttachment.attachment.id,
|
|
tx: tx,
|
|
)
|
|
}),
|
|
let attachmentProto = ReferencedAttachment(
|
|
reference: referencedAttachment.reference,
|
|
attachment: refetchedAttachment,
|
|
).asProtoForSending()
|
|
else {
|
|
return .failure(OWSAssertionError(
|
|
"Failed to get proto for sending after successful upload!",
|
|
logger: logger,
|
|
))
|
|
}
|
|
|
|
return .success(attachmentProto)
|
|
} catch {
|
|
if error.isRetryable {
|
|
logger.warn("Ran out of retries uploading for backfill.")
|
|
} else {
|
|
logger.error("Failed to upload for backfill! \(error)")
|
|
}
|
|
|
|
return .failure(error)
|
|
}
|
|
}
|
|
|
|
// MARK: - Outbound Responses
|
|
|
|
private func sendTargetNotFoundResponse(
|
|
backfillTarget: AttachmentBackfillTarget,
|
|
localIdentifiers: LocalIdentifiers,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
sendBackfillResponse(
|
|
backfillTarget: backfillTarget,
|
|
localIdentifiers: localIdentifiers,
|
|
customizeResponseBlock: { responseBuilder in
|
|
responseBuilder.setError(.messageNotFound)
|
|
},
|
|
tx: tx,
|
|
)
|
|
}
|
|
|
|
private func sendBackfillAttemptResponse(
|
|
backfillTarget: AttachmentBackfillTarget,
|
|
backfillAttemptResults: [Result<SSKProtoAttachmentPointer, Error>],
|
|
localIdentifiers: LocalIdentifiers,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
sendBackfillResponse(
|
|
backfillTarget: backfillTarget,
|
|
localIdentifiers: localIdentifiers,
|
|
customizeResponseBlock: { responseBuilder in
|
|
let attachmentDatas = backfillAttemptResults.map { attemptResult -> SSKProtoSyncMessageAttachmentBackfillResponseAttachmentData in
|
|
let attachmentDataBuilder = SSKProtoSyncMessageAttachmentBackfillResponseAttachmentData.builder()
|
|
switch attemptResult {
|
|
case .success(let attachmentProto):
|
|
attachmentDataBuilder.setAttachment(attachmentProto)
|
|
case .failure(is BackedUpAttachmentMissingLocalFileError):
|
|
attachmentDataBuilder.setStatus(.pending)
|
|
case .failure(let error) where error.isRetryable:
|
|
attachmentDataBuilder.setStatus(.pending)
|
|
case .failure:
|
|
attachmentDataBuilder.setStatus(.terminalError)
|
|
}
|
|
return attachmentDataBuilder.buildInfallibly()
|
|
}
|
|
|
|
let attachmentDataListBuilder = SSKProtoSyncMessageAttachmentBackfillResponseAttachmentDataList.builder()
|
|
attachmentDataListBuilder.setAttachments(attachmentDatas)
|
|
responseBuilder.setAttachments(attachmentDataListBuilder.buildInfallibly())
|
|
},
|
|
tx: tx,
|
|
)
|
|
}
|
|
|
|
private func sendBackfillResponse(
|
|
backfillTarget: AttachmentBackfillTarget,
|
|
localIdentifiers: LocalIdentifiers,
|
|
customizeResponseBlock: (SSKProtoSyncMessageAttachmentBackfillResponseBuilder) -> Void,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
guard let localThread = threadStore.getOrCreateLocalThread(tx: tx) else {
|
|
owsFailDebug("Failed to get local thread.", logger: logger)
|
|
return
|
|
}
|
|
|
|
let responseProtoBuilder = SSKProtoSyncMessageAttachmentBackfillResponse.builder()
|
|
responseProtoBuilder.setTargetMessage(backfillTarget.addressableMessage.asProto)
|
|
responseProtoBuilder.setTargetConversation(backfillTarget.conversationIdentifier.asProto)
|
|
customizeResponseBlock(responseProtoBuilder)
|
|
|
|
do {
|
|
let syncMessage = try AttachmentBackfillResponseSyncMessage(
|
|
responseProto: responseProtoBuilder.buildInfallibly(),
|
|
localThread: localThread,
|
|
tx: tx,
|
|
)
|
|
|
|
syncMessageSender.add(
|
|
attachmentBackfillResponseSyncMessage: syncMessage,
|
|
tx: tx,
|
|
)
|
|
} catch {
|
|
owsFailDebug("Failed to build backfill response sync message! \(error)")
|
|
return
|
|
}
|
|
}
|
|
|
|
// MARK: - Inbound Responses
|
|
|
|
func enqueueInboundResponse(
|
|
attachmentBackfillResponseProto: SSKProtoSyncMessageAttachmentBackfillResponse,
|
|
registeredState: RegisteredState,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
// TODO: Enqueue downloads using the attachment pointers in the response.
|
|
logger.warn("AttachmentBackfillResponse not yet supported.")
|
|
}
|
|
|
|
// MARK: - Backfill Targets
|
|
|
|
private struct AttachmentBackfillTarget {
|
|
let addressableMessage: AddressableMessage
|
|
let conversationIdentifier: ConversationIdentifier
|
|
}
|
|
|
|
private func parseBackfillTarget(
|
|
attachmentBackfillRequestProto: SSKProtoSyncMessageAttachmentBackfillRequest,
|
|
tx: DBReadTransaction,
|
|
) -> AttachmentBackfillTarget? {
|
|
guard
|
|
let addressableMessageProto = attachmentBackfillRequestProto.targetMessage,
|
|
let addressableMessage = AddressableMessage(proto: addressableMessageProto)
|
|
else {
|
|
return nil
|
|
}
|
|
|
|
guard
|
|
let conversationIdentifierProto = attachmentBackfillRequestProto.targetConversation,
|
|
let conversationIdentifier = ConversationIdentifier(proto: conversationIdentifierProto)
|
|
else {
|
|
return nil
|
|
}
|
|
|
|
return AttachmentBackfillTarget(
|
|
addressableMessage: addressableMessage,
|
|
conversationIdentifier: conversationIdentifier,
|
|
)
|
|
}
|
|
|
|
private func locateBackfillTargetMessage(
|
|
_ backfillTarget: AttachmentBackfillTarget,
|
|
tx: DBReadTransaction,
|
|
) -> TSMessage? {
|
|
let thread: TSThread
|
|
switch backfillTarget.conversationIdentifier {
|
|
case .serviceId(let serviceId):
|
|
guard
|
|
let recipient = recipientDatabaseTable.fetchRecipient(serviceId: serviceId, transaction: tx),
|
|
let contactThread = threadStore.fetchContactThread(recipient: recipient, tx: tx)
|
|
else {
|
|
return nil
|
|
}
|
|
|
|
thread = contactThread
|
|
case .e164(let e164):
|
|
guard
|
|
let recipient = recipientDatabaseTable.fetchRecipient(phoneNumber: e164.stringValue, transaction: tx),
|
|
let contactThread = threadStore.fetchContactThread(recipient: recipient, tx: tx)
|
|
else {
|
|
return nil
|
|
}
|
|
|
|
thread = contactThread
|
|
case .groupIdentifier(let groupIdentifier):
|
|
guard let groupThread = threadStore.fetchGroupThread(groupId: groupIdentifier, tx: tx) else {
|
|
return nil
|
|
}
|
|
|
|
thread = groupThread
|
|
}
|
|
|
|
let authorAddress: SignalServiceAddress
|
|
switch backfillTarget.addressableMessage.author {
|
|
case .aci(let aci):
|
|
authorAddress = SignalServiceAddress(aci)
|
|
case .e164(let e164):
|
|
authorAddress = SignalServiceAddress(e164)
|
|
}
|
|
|
|
return interactionStore.findMessage(
|
|
withTimestamp: backfillTarget.addressableMessage.sentTimestamp,
|
|
threadId: thread.uniqueId,
|
|
author: authorAddress,
|
|
tx: tx,
|
|
)
|
|
}
|
|
|
|
private func assembleBackfillTarget(
|
|
message: TSMessage,
|
|
localIdentifiers: LocalIdentifiers,
|
|
tx: DBReadTransaction,
|
|
) -> AttachmentBackfillTarget? {
|
|
guard
|
|
let addressableMessage = AddressableMessage(
|
|
message: message,
|
|
localIdentifiers: localIdentifiers,
|
|
)
|
|
else {
|
|
owsFailDebug("Target message missing author info!", logger: logger)
|
|
return nil
|
|
}
|
|
|
|
guard let thread = threadStore.fetchThread(uniqueId: message.uniqueThreadId, tx: tx) else {
|
|
owsFailDebug("Missing thread for message!", logger: logger)
|
|
return nil
|
|
}
|
|
|
|
let conversationIdentifier: ConversationIdentifier
|
|
if let contactThread = thread as? TSContactThread {
|
|
if let serviceId = ServiceId.parseFrom(serviceIdBinary: nil, serviceIdString: contactThread.contactUUID) {
|
|
conversationIdentifier = .serviceId(serviceId)
|
|
} else if let e164 = E164(contactThread.contactPhoneNumber) {
|
|
conversationIdentifier = .e164(e164)
|
|
} else {
|
|
logger.warn("Contact thread missing identifiers!")
|
|
return nil
|
|
}
|
|
} else if
|
|
let groupThread = thread as? TSGroupThread,
|
|
let groupIdentifier = try? groupThread.groupIdentifier
|
|
{
|
|
conversationIdentifier = .groupIdentifier(groupIdentifier)
|
|
} else {
|
|
owsFailDebug("Target thread is neither contact nor group!", logger: logger)
|
|
return nil
|
|
}
|
|
|
|
return AttachmentBackfillTarget(
|
|
addressableMessage: addressableMessage,
|
|
conversationIdentifier: conversationIdentifier,
|
|
)
|
|
}
|
|
}
|
|
|
|
// MARK: - AttachmentBackfillManager.AttachmentBackfillSyncMessageSender
|
|
|
|
extension MessageSenderJobQueue: AttachmentBackfillManager.AttachmentBackfillSyncMessageSender {
|
|
func add(
|
|
attachmentBackfillRequestSyncMessage: AttachmentBackfillRequestSyncMessage,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
add(
|
|
message: .preprepared(transientMessageWithoutAttachments: attachmentBackfillRequestSyncMessage),
|
|
transaction: tx,
|
|
)
|
|
}
|
|
|
|
func add(
|
|
attachmentBackfillResponseSyncMessage: AttachmentBackfillResponseSyncMessage,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
add(
|
|
message: .preprepared(transientMessageWithoutAttachments: attachmentBackfillResponseSyncMessage),
|
|
transaction: tx,
|
|
)
|
|
}
|
|
}
|
|
|
|
// MARK: -
|
|
|
|
private extension PrefixedLogger {
|
|
func suffixed(inboundRequestRecordId: AttachmentBackfillInboundRequestRecord.IDType) -> PrefixedLogger {
|
|
return suffixed(with: "[ID:\(inboundRequestRecordId)]")
|
|
}
|
|
}
|