169 lines
6.1 KiB
Swift
169 lines
6.1 KiB
Swift
//
|
|
// Copyright 2022 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
import Foundation
|
|
public import LibSignalClient
|
|
|
|
private class LocalUserLeaveGroupJobRunnerFactory: JobRunnerFactory {
|
|
func buildRunner() -> LocalUserLeaveGroupJobRunner {
|
|
return buildRunner(isDeletingAccount: false, future: nil)
|
|
}
|
|
|
|
func buildRunner(isDeletingAccount: Bool, future: Future<[Promise<Void>]>?) -> LocalUserLeaveGroupJobRunner {
|
|
return LocalUserLeaveGroupJobRunner(isDeletingAccount: isDeletingAccount, future: future)
|
|
}
|
|
}
|
|
|
|
private class LocalUserLeaveGroupJobRunner: JobRunner {
|
|
private enum Constants {
|
|
static let maxRetries: UInt = 110
|
|
}
|
|
|
|
private let isDeletingAccount: Bool
|
|
private let future: Future<[Promise<Void>]>?
|
|
|
|
init(isDeletingAccount: Bool, future: Future<[Promise<Void>]>?) {
|
|
self.isDeletingAccount = isDeletingAccount
|
|
self.future = future
|
|
}
|
|
|
|
func runJobAttempt(_ jobRecord: LocalUserLeaveGroupJobRecord) async -> JobAttemptResult<[Promise<Void>]> {
|
|
return await JobAttemptResult.executeBlockWithDefaultErrorHandler(
|
|
jobRecord: jobRecord,
|
|
retryLimit: Constants.maxRetries,
|
|
db: DependenciesBridge.shared.db,
|
|
block: { try await _runJobAttempt(jobRecord) },
|
|
)
|
|
}
|
|
|
|
func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult<[Promise<Void>]>) async {
|
|
switch result.ranSuccessfullyOrError {
|
|
case .success(let result):
|
|
future?.resolve(result)
|
|
case .failure(let error):
|
|
future?.reject(error)
|
|
}
|
|
}
|
|
|
|
private func _runJobAttempt(_ jobRecord: LocalUserLeaveGroupJobRecord) async throws -> [Promise<Void>] {
|
|
if jobRecord.waitForMessageProcessing {
|
|
try await GroupManager.waitForMessageFetchingAndProcessingWithTimeout()
|
|
}
|
|
|
|
let groupThread = SSKEnvironment.shared.databaseStorageRef.read { tx in
|
|
return TSGroupThread.fetchGroupThreadViaCache(uniqueId: jobRecord.threadId, transaction: tx)
|
|
}
|
|
|
|
guard let groupThread, let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
|
|
throw OWSAssertionError("Missing V2 group thread for operation")
|
|
}
|
|
|
|
let replacementAdminAci: Aci? = try jobRecord.replacementAdminAciString.map { aciString in
|
|
guard let aci = Aci.parseFrom(aciString: aciString) else {
|
|
throw OWSAssertionError("Couldn't parse replacementAdminAci")
|
|
}
|
|
return aci
|
|
}
|
|
|
|
do {
|
|
try await GroupManager.refreshGroupSendEndorsementsIfNeeded(threadId: groupThread.sqliteRowId!, groupModel: groupModel)
|
|
} catch where !error.isNetworkFailureOrTimeout {
|
|
Logger.warn("Tried and failed to refresh credentials; continuing anyways because credentials aren't required; error: \(error)")
|
|
}
|
|
|
|
let sendPromises = try await GroupManager.updateGroupV2(
|
|
groupModel: groupModel,
|
|
description: #fileID,
|
|
isDeletingAccount: isDeletingAccount,
|
|
) { groupChangeSet in
|
|
groupChangeSet.setShouldLeaveGroupDeclineInvite()
|
|
|
|
// Sometimes when we leave a group we take care to assign a new admin.
|
|
if let replacementAdminAci {
|
|
groupChangeSet.changeRoleForMember(replacementAdminAci, role: .administrator)
|
|
}
|
|
}
|
|
|
|
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
|
jobRecord.anyRemove(transaction: tx)
|
|
}
|
|
|
|
return sendPromises
|
|
}
|
|
}
|
|
|
|
public class LocalUserLeaveGroupJobQueue {
|
|
private let jobQueueRunner: JobQueueRunner<
|
|
JobRecordFinderImpl<LocalUserLeaveGroupJobRecord>,
|
|
LocalUserLeaveGroupJobRunnerFactory,
|
|
>
|
|
private var jobSerializer = CompletionSerializer()
|
|
private let jobRunnerFactory: LocalUserLeaveGroupJobRunnerFactory
|
|
|
|
public init(db: any DB, reachabilityManager: SSKReachabilityManager) {
|
|
self.jobRunnerFactory = LocalUserLeaveGroupJobRunnerFactory()
|
|
self.jobQueueRunner = JobQueueRunner(
|
|
canExecuteJobsConcurrently: false,
|
|
db: db,
|
|
jobFinder: JobRecordFinderImpl(db: db),
|
|
jobRunnerFactory: self.jobRunnerFactory,
|
|
)
|
|
self.jobQueueRunner.listenForReachabilityChanges(reachabilityManager: reachabilityManager)
|
|
}
|
|
|
|
func start(appContext: AppContext) {
|
|
jobQueueRunner.start(shouldRestartExistingJobs: appContext.isMainApp)
|
|
}
|
|
|
|
// MARK: - Promises
|
|
|
|
/// - Returns: A Promise for leaving the group whose value is a list of
|
|
/// Promises for sending the group update message(s) about leaving the
|
|
/// group. (See `updateGroupV2` for details.)
|
|
public func addJob(
|
|
groupThread: TSGroupThread,
|
|
replacementAdminAci: Aci?,
|
|
waitForMessageProcessing: Bool,
|
|
isDeletingAccount: Bool,
|
|
tx: DBWriteTransaction,
|
|
) -> Promise<[Promise<Void>]> {
|
|
guard groupThread.isGroupV2Thread else {
|
|
owsFail("[GV1] Mutations on V1 groups should be impossible!")
|
|
}
|
|
return Promise { future in
|
|
addJob(
|
|
threadId: groupThread.uniqueId,
|
|
replacementAdminAci: replacementAdminAci,
|
|
waitForMessageProcessing: waitForMessageProcessing,
|
|
isDeletingAccount: isDeletingAccount,
|
|
future: future,
|
|
tx: tx,
|
|
)
|
|
}
|
|
}
|
|
|
|
private func addJob(
|
|
threadId: String,
|
|
replacementAdminAci: Aci?,
|
|
waitForMessageProcessing: Bool,
|
|
isDeletingAccount: Bool,
|
|
future: Future<[Promise<Void>]>,
|
|
tx: DBWriteTransaction,
|
|
) {
|
|
let jobRecord = LocalUserLeaveGroupJobRecord(
|
|
threadId: threadId,
|
|
replacementAdminAci: replacementAdminAci,
|
|
waitForMessageProcessing: waitForMessageProcessing,
|
|
)
|
|
jobRecord.anyInsert(transaction: tx)
|
|
jobSerializer.addOrderedSyncCompletion(tx: tx) {
|
|
self.jobQueueRunner.addPersistedJob(
|
|
jobRecord,
|
|
runner: self.jobRunnerFactory.buildRunner(isDeletingAccount: isDeletingAccount, future: future),
|
|
)
|
|
}
|
|
}
|
|
}
|