// // Copyright 2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only // import Foundation public import LibSignalClient private class LocalUserLeaveGroupJobRunnerFactory: JobRunnerFactory { func buildRunner() -> LocalUserLeaveGroupJobRunner { return buildRunner(isDeletingAccount: false, future: nil) } func buildRunner(isDeletingAccount: Bool, future: Future<[Promise]>?) -> LocalUserLeaveGroupJobRunner { return LocalUserLeaveGroupJobRunner(isDeletingAccount: isDeletingAccount, future: future) } } private class LocalUserLeaveGroupJobRunner: JobRunner { private enum Constants { static let maxRetries: UInt = 110 } private let isDeletingAccount: Bool private let future: Future<[Promise]>? init(isDeletingAccount: Bool, future: Future<[Promise]>?) { self.isDeletingAccount = isDeletingAccount self.future = future } func runJobAttempt(_ jobRecord: LocalUserLeaveGroupJobRecord) async -> JobAttemptResult<[Promise]> { return await JobAttemptResult.executeBlockWithDefaultErrorHandler( jobRecord: jobRecord, retryLimit: Constants.maxRetries, db: DependenciesBridge.shared.db, block: { try await _runJobAttempt(jobRecord) }, ) } func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult<[Promise]>) async { switch result.ranSuccessfullyOrError { case .success(let result): future?.resolve(result) case .failure(let error): future?.reject(error) } } private func _runJobAttempt(_ jobRecord: LocalUserLeaveGroupJobRecord) async throws -> [Promise] { if jobRecord.waitForMessageProcessing { try await GroupManager.waitForMessageFetchingAndProcessingWithTimeout() } let groupThread = SSKEnvironment.shared.databaseStorageRef.read { tx in return TSGroupThread.fetchGroupThreadViaCache(uniqueId: jobRecord.threadId, transaction: tx) } guard let groupThread, let groupModel = groupThread.groupModel as? TSGroupModelV2 else { throw OWSAssertionError("Missing V2 group thread for operation") } let replacementAdminAci: Aci? = try jobRecord.replacementAdminAciString.map { aciString in guard let aci = Aci.parseFrom(aciString: aciString) else { throw OWSAssertionError("Couldn't parse replacementAdminAci") } return aci } do { try await GroupManager.refreshGroupSendEndorsementsIfNeeded(threadId: groupThread.sqliteRowId!, groupModel: groupModel) } catch where !error.isNetworkFailureOrTimeout { Logger.warn("Tried and failed to refresh credentials; continuing anyways because credentials aren't required; error: \(error)") } let sendPromises = try await GroupManager.updateGroupV2( groupModel: groupModel, description: #fileID, isDeletingAccount: isDeletingAccount, ) { groupChangeSet in groupChangeSet.setShouldLeaveGroupDeclineInvite() // Sometimes when we leave a group we take care to assign a new admin. if let replacementAdminAci { groupChangeSet.changeRoleForMember(replacementAdminAci, role: .administrator) } } await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in jobRecord.anyRemove(transaction: tx) } return sendPromises } } public class LocalUserLeaveGroupJobQueue { private let jobQueueRunner: JobQueueRunner< JobRecordFinderImpl, LocalUserLeaveGroupJobRunnerFactory, > private var jobSerializer = CompletionSerializer() private let jobRunnerFactory: LocalUserLeaveGroupJobRunnerFactory public init(db: any DB, reachabilityManager: SSKReachabilityManager) { self.jobRunnerFactory = LocalUserLeaveGroupJobRunnerFactory() self.jobQueueRunner = JobQueueRunner( canExecuteJobsConcurrently: false, db: db, jobFinder: JobRecordFinderImpl(db: db), jobRunnerFactory: self.jobRunnerFactory, ) self.jobQueueRunner.listenForReachabilityChanges(reachabilityManager: reachabilityManager) } func start(appContext: AppContext) { jobQueueRunner.start(shouldRestartExistingJobs: appContext.isMainApp) } // MARK: - Promises /// - Returns: A Promise for leaving the group whose value is a list of /// Promises for sending the group update message(s) about leaving the /// group. (See `updateGroupV2` for details.) public func addJob( groupThread: TSGroupThread, replacementAdminAci: Aci?, waitForMessageProcessing: Bool, isDeletingAccount: Bool, tx: DBWriteTransaction, ) -> Promise<[Promise]> { guard groupThread.isGroupV2Thread else { owsFail("[GV1] Mutations on V1 groups should be impossible!") } return Promise { future in addJob( threadId: groupThread.uniqueId, replacementAdminAci: replacementAdminAci, waitForMessageProcessing: waitForMessageProcessing, isDeletingAccount: isDeletingAccount, future: future, tx: tx, ) } } private func addJob( threadId: String, replacementAdminAci: Aci?, waitForMessageProcessing: Bool, isDeletingAccount: Bool, future: Future<[Promise]>, tx: DBWriteTransaction, ) { let jobRecord = LocalUserLeaveGroupJobRecord( threadId: threadId, replacementAdminAci: replacementAdminAci, waitForMessageProcessing: waitForMessageProcessing, ) jobRecord.anyInsert(transaction: tx) jobSerializer.addOrderedSyncCompletion(tx: tx) { self.jobQueueRunner.addPersistedJob( jobRecord, runner: self.jobRunnerFactory.buildRunner(isDeletingAccount: isDeletingAccount, future: future), ) } } }