//
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation

/// The outcome of a single attempt at running a job.
public enum JobAttemptResult<Success> {
    /// The Job has succeeded or reached a terminal error. A retryable error may
    /// become terminal if too many failures occur.
    case finished(Result<Success, Error>)

    /// The Job encountered a transient, retryable error. If `canRetryEarly` is
    /// true, the job may be attempted again before the time interval has
    /// elapsed (eg, when Reachability reports that we've reconnected).
    case retryAfter(TimeInterval, canRetryEarly: Bool = true)

    /// Invokes `block` and handles retryable errors.
    ///
    /// If `block()` succeeds, a terminal success result is returned. In this
    /// case, the caller (or, more typically, `block`) is responsible for
    /// removing the job from the database.
    ///
    /// If `block()` throws an error, calls `performDefaultErrorHandler`.
    public static func executeBlockWithDefaultErrorHandler(
        jobRecord: JobRecord,
        retryLimit: UInt,
        db: any DB,
        block: () async throws -> Success,
    ) async -> JobAttemptResult<Success> {
        do {
            let result = try await block()
            return .finished(.success(result))
        } catch {
            // The default handler mutates/removes the job record, so it needs
            // a write transaction.
            return await db.awaitableWrite { tx in
                return performDefaultErrorHandler(error: error, jobRecord: jobRecord, retryLimit: retryLimit, tx: tx)
            }
        }
    }

    /// Performs default error handling for an error.
    ///
    /// If the job throws a retryable error, `jobRecord.failureCount` is
    /// incremented (assuming it's less than `retryLimit`) and this method
    /// returns a `.retryAfter` value with exponential backoff.
    ///
    /// If the job throws a terminal error (or retryable error with no retries
    /// remaining), the job is removed and a terminal error is returned.
    public static func performDefaultErrorHandler(
        error: Error,
        jobRecord: JobRecord,
        retryLimit: UInt,
        tx: DBWriteTransaction,
    ) -> JobAttemptResult<Success> {
        if jobRecord.failureCount < retryLimit, error.isRetryable {
            jobRecord.addFailure(tx: tx)
            let delay = OWSOperation.retryIntervalForExponentialBackoff(failureCount: jobRecord.failureCount, maxAverageBackoff: 14.1 * .minute)
            return .retryAfter(delay, canRetryEarly: true)
        } else {
            jobRecord.anyRemove(transaction: tx)
            return .finished(.failure(error))
        }
    }
}

/// The terminal outcome of a job, as reported to `JobRunner.didFinishJob`.
public enum JobResult<Success> {
    /// The Job ran to completion and returned a Result.
    case ranJob(Result<Success, Error>)

    /// The Job couldn't be found, so it wasn't executed.
    case notFound

    /// The Job couldn't be fetched because of an Error.
    case fetchError(Error)

    /// Collapses this value into a plain `Result`.
    ///
    /// A `.ranJob` result is passed through unchanged, `.fetchError` becomes a
    /// failure carrying the fetch error, and `.notFound` becomes a failure
    /// carrying a generic "JobRecord not found." error.
    public var ranSuccessfullyOrError: Result<Success, Error> {
        switch self {
        case .ranJob(.success(let result)):
            return .success(result)
        case .ranJob(.failure(let error)), .fetchError(let error):
            return .failure(error)
        case .notFound:
            return .failure(OWSGenericError("JobRecord not found."))
        }
    }
}

/// A `JobRunner` is responsible for running a `JobRecord`.
///
/// A single JobRunner is used for all attempts of a `JobRecord` (within a
/// single app launch/process). Therefore, a `JobRunner` is a good place to
/// store mutable, transient state (eg a per-job error counter).
///
/// When a job is initially scheduled, the caller can provide its own
/// `JobRunner`. This can be useful for registering completion callbacks.
/// However, it's important to note that those completion callbacks will be
/// lost if the app is relaunched and a new `JobRunner` is created. In
/// practice, this doesn't cause problems because the UX that's waiting for
/// the completion callback is also dismissed if the app is relaunched.
///
/// When should you use this type as opposed to ``TaskQueueLoader`` and ``TaskRecord``?
/// The JobRunner classes use a single db table across all jobs, and thus provide more in-built functionality
/// around scheduling, starting, retrying, and failing jobs. ``TaskQueueLoader`` requires you to write
/// your own db table (or in memory queue) but offers less functionality in exchange. You should consider
/// ``TaskQueueLoader`` mainly if you want to enforce a priority order on your jobs; ``JobRunner``
/// only handles FIFO ordering.
public protocol JobRunner {
    associatedtype JobRecordType: JobRecord
    associatedtype JobAttemptResultSuccessType

    /// Runs a single attempt of the job.
    ///
    /// Each attempt can return one of two results: `.finished` (eg, "success",
    /// "terminal error", "out of retries") or `.retryAfter` (ie "network error;
    /// try again in 2 minutes").
    ///
    /// If this method returns `.finished`, then it's also responsible for
    /// removing `jobRecord` from the database. Passing this responsibility to
    /// `runJobAttempt` ensures that removing `jobRecord` can be performed
    /// atomically with other database operations. (In DEBUG builds, the caller
    /// will try to ensure this invariant remains true.)
    func runJobAttempt(_ jobRecord: JobRecordType) async -> JobAttemptResult<JobAttemptResultSuccessType>

    /// Invoked when a job reaches a terminal result.
    ///
    /// This method is guaranteed to be invoked exactly once for every
    /// `JobRunner` provided to `JobQueueRunner.addPersistedJob(...)` (assuming
    /// the app doesn't crash just before or during its execution).
    ///
    /// If a `JobRecord` is removed from the database before it's run (or if
    /// fetching it throws an error), then `runJobAttempt` won't be invoked, but
    /// this method will still be invoked. This method is therefore an excellent
    /// place to invoke "exactly once", in-memory completion handlers.
    func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult<JobAttemptResultSuccessType>) async
}

/// A `JobRunnerFactory` creates `JobRunner` instances.
///
/// Most `JobRunner` types currently use `Dependencies`, but when they
/// don't, these factories will be responsible for passing dependencies into
/// `JobRunner` instances.
public protocol JobRunnerFactory {
    associatedtype JobRunnerType: JobRunner, Sendable

    /// Creates a `JobRunner` for a `JobRecord` loaded from the database.
    func buildRunner() -> JobRunnerType
}

/// Schedules and executes persisted jobs, either concurrently or serially
/// (FIFO), re-running each job until it reaches a terminal result.
public class JobQueueRunner<
    JobFinderType: JobRecordFinder & Sendable,
    JobRunnerFactoryType: JobRunnerFactory & Sendable,
> where JobFinderType.JobRecordType == JobRunnerFactoryType.JobRunnerType.JobRecordType {
    typealias SuccessType = JobRunnerFactoryType.JobRunnerType.JobAttemptResultSuccessType

    private let db: any DB
    private let jobFinder: JobFinderType
    private let jobRunnerFactory: JobRunnerFactoryType

    /// NotificationCenter observer tokens; removed in `deinit`.
    private var observers = [NSObjectProtocol]()

    private enum Mode {
        /// The runner hasn't been started yet, or the runner is in the process of
        /// starting. In both of these states, new jobs are held until persisted
        /// jobs have been loaded. (This ensures new jobs execute after old jobs.)
        case loading(canExecuteJobsConcurrently: Bool, jobsToEnqueueAfterLoading: [QueuedJob])

        /// The runner is operating concurrently. New jobs are started immediately.
        case concurrent

        /// The runner is operating serially. No jobs are running, so a new job can
        /// be started immediately.
        case serialPaused

        /// The runner is operating serially. A job is being executed, so any new
        /// jobs will be added to `nextJobs`. When the current job finishes, it will
        /// start the first job in `nextJobs`.
        case serialRunning(nextJobs: [QueuedJob])
    }

    /// A job waiting to run, paired with the runner that will execute it.
    private struct QueuedJob {
        var rowId: JobRecord.RowId
        var runner: JobRunnerFactoryType.JobRunnerType
    }

    private struct State {
        var mode: Mode

        /// If a job encounters a transient failure, it can request to be run again
        /// after a delay. While it's waiting, it will store a reference to its
        /// waiting Task here. On certain external triggers (eg we reconnect to the
        /// Internet), these waiting Tasks can be canceled to trigger the next
        /// attempt immediately.
        var waitingTasks = [JobRecord.RowId: Task<Void, Never>]()
    }

    private let state: AtomicValue<State>

    /// Creates a runner in the `.loading` mode; no job executes until `start`
    /// has been called.
    ///
    /// - Parameters:
    ///   - canExecuteJobsConcurrently: If true, jobs are started immediately
    ///     once the runner has started; otherwise they run one at a time.
    ///   - db: Used to fetch job records before each attempt.
    ///   - jobFinder: Loads runnable jobs and fetches individual jobs by row id.
    ///   - jobRunnerFactory: Builds a `JobRunner` for jobs that weren't given one.
    public init(canExecuteJobsConcurrently: Bool, db: any DB, jobFinder: JobFinderType, jobRunnerFactory: JobRunnerFactoryType) {
        let mode: Mode = .loading(canExecuteJobsConcurrently: canExecuteJobsConcurrently, jobsToEnqueueAfterLoading: [])
        self.state = AtomicValue(State(mode: mode), lock: .init())
        self.db = db
        self.jobFinder = jobFinder
        self.jobRunnerFactory = jobRunnerFactory
    }

    deinit {
        observers.forEach { NotificationCenter.default.removeObserver($0) }
    }

    /// Starts the runner asynchronously. Must be called at most once; a second
    /// start (once the first has left the `.loading` mode) is a fatal error.
    ///
    /// - Parameter shouldRestartExistingJobs: If true, runnable jobs are loaded
    ///   via the job finder and scheduled ahead of any jobs added while loading.
    public func start(shouldRestartExistingJobs: Bool) {
        Task {
            await self._start(shouldRestartExistingJobs: shouldRestartExistingJobs)
        }
    }

    private func _start(shouldRestartExistingJobs: Bool) async {
        var oldJobs = [JobFinderType.JobRecordType]()
        if shouldRestartExistingJobs {
            do {
                oldJobs.append(contentsOf: try await jobFinder.loadRunnableJobs(updateRunnableJobRecord: { _, _ in }))
            } catch {
                // The runner stays in `.loading`, so queued jobs are never run.
                Logger.error("Couldn't start existing jobs, so no new jobs will start: \(error)")
                return
            }
        }
        state.update { state in
            let newMode: Mode
            switch state.mode {
            case .loading(let canExecuteJobsConcurrently, let jobsToEnqueueAfterLoading):
                // Every runner must be started before it can execute jobs. When a runner
                // is started, it may optionally fetch all previously-persisted jobs to
                // execute. Any jobs that are scheduled while previously-persisted jobs are
                // being loaded from disk must be scheduled after those
                // previously-persisted jobs. If a new job is persisted while
                // previously-persisted jobs are being loaded, then it may or may not be
                // considered a previously-persisted job (depending on db race conditions).
                // If it is present in the previously-persisted jobs, we want to ignore it
                // since the new job might have a custom `JobRunner`.
                let newRowIds = Set(jobsToEnqueueAfterLoading.map { $0.rowId })
                var queuedJobs = [QueuedJob]()
                queuedJobs.append(
                    contentsOf: oldJobs.filter { !newRowIds.contains($0.id!) }.map {
                        QueuedJob(rowId: $0.id!, runner: jobRunnerFactory.buildRunner())
                    },
                )
                queuedJobs.append(contentsOf: jobsToEnqueueAfterLoading)
                if canExecuteJobsConcurrently {
                    queuedJobs.forEach(runJob(_:))
                    newMode = .concurrent
                } else if queuedJobs.isEmpty {
                    newMode = .serialPaused
                } else {
                    // Start the head of the queue; the rest wait their turn.
                    var serialJobs = queuedJobs
                    runJob(serialJobs.removeFirst())
                    newMode = .serialRunning(nextJobs: serialJobs)
                }
            case .concurrent, .serialPaused, .serialRunning:
                owsFail("Can't start a JobQueueRunner more than once.")
            }
            state.mode = newMode
        }
    }

    // MARK: - Reachability Changes

    /// Observes reachability-change notifications posted by
    /// `reachabilityManager`; whenever the manager reports that it's reachable,
    /// waiting jobs are retried immediately.
    public func listenForReachabilityChanges(reachabilityManager: SSKReachabilityManager) {
        observers.append(NotificationCenter.default.addObserver(
            forName: SSKReachability.owsReachabilityDidChange,
            object: reachabilityManager,
            queue: nil,
            using: { [weak self] _ in
                if reachabilityManager.isReachable {
                    self?.retryWaitingJobs()
                }
            },
        ))
    }

    /// Cancels the delay of every job currently registered as waiting to retry.
    func retryWaitingJobs() {
        // Cancel each waiting task so that the next retry can commence.
        state.update { state in
            state.waitingTasks.forEach { _, waitingTask in waitingTask.cancel() }
        }
    }

    // MARK: - Queuing Jobs

    /// Enqueues an already-persisted job.
    ///
    /// - Parameters:
    ///   - jobRecord: The job to run. Its `id` is force-unwrapped, so it must
    ///     be non-nil.
    ///   - runner: The runner to use for this job. If nil, one is built by the
    ///     runner factory.
    public func addPersistedJob(_ jobRecord: JobFinderType.JobRecordType, runner: JobRunnerFactoryType.JobRunnerType? = nil) {
        enqueueAndStartJob(QueuedJob(rowId: jobRecord.id!, runner: runner ?? jobRunnerFactory.buildRunner()))
    }

    private func enqueueAndStartJob(_ job: QueuedJob) {
        state.update { state in
            switch state.mode {
            case .loading(let canExecuteJobsConcurrently, let jobsToEnqueueAfterLoading):
                state.mode = .loading(
                    canExecuteJobsConcurrently: canExecuteJobsConcurrently,
                    jobsToEnqueueAfterLoading: jobsToEnqueueAfterLoading + [job],
                )
            case .concurrent:
                runJob(job)
            case .serialPaused:
                runJob(job)
                state.mode = .serialRunning(nextJobs: [])
            case .serialRunning(nextJobs: let nextJobs):
                state.mode = .serialRunning(nextJobs: nextJobs + [job])
            }
        }
    }

    // MARK: - Running Jobs

    /// Runs `queuedJob` to a terminal result in a new Task, notifies its runner,
    /// and then advances the queue.
    private func runJob(_ queuedJob: QueuedJob) {
        Task {
            let jobResult = await _runJob(queuedJob)
            await queuedJob.runner.didFinishJob(queuedJob.rowId, result: jobResult)
            state.update { state in startNextJob(state: &state) }
        }
    }

    /// Repeats attempts until one produces a terminal outcome.
    private func _runJob(_ queuedJob: QueuedJob) async -> JobResult<SuccessType> {
        while true {
            switch await runJobAttempt(queuedJob) {
            case .runAgain:
                continue
            case .notFound:
                return .notFound
            case .finished(let result):
                return .ranJob(result)
            case .fetchError(let error):
                Logger.warn("Couldn't fetch a job; skipping just that job: \(error)")
                return .fetchError(error)
            }
        }
    }

    /// The outcome of one internal attempt, including the cases where the job
    /// record couldn't be loaded at all.
    private enum JobAttemptResult {
        case runAgain
        case finished(Result<SuccessType, Error>)
        case notFound
        case fetchError(Error)
    }

    private func runJobAttempt(_ queuedJob: QueuedJob) async -> JobAttemptResult {
        let jobRecord: JobFinderType.JobRecordType?
        do {
            jobRecord = try db.read(block: { tx in try jobFinder.fetchJob(rowId: queuedJob.rowId, tx: tx) })
        } catch {
            return .fetchError(error)
        }
        guard let jobRecord else {
            // The job no longer exists, so we can start the next one.
            return .notFound
        }
        let result = await queuedJob.runner.runJobAttempt(jobRecord)
        switch result {
        case .finished(let result):
            // In DEBUG builds, make sure that .finished jobs were deleted.
            assert(db.read(block: { tx in (try? jobFinder.fetchJob(rowId: queuedJob.rowId, tx: tx)) == nil }))
            return .finished(result)
        case .retryAfter(let retryAfter, let canRetryEarly):
            // Create a Task that waits for `retryAfter`. If `retryWaitingJobs` is
            // called, this Task will be canceled, starting the next retry immediately.
            let waitingTask = Task {
                _ = try? await Task.sleep(nanoseconds: retryAfter.clampedNanoseconds)
            }
            // Only tasks registered here can be canceled by `retryWaitingJobs`.
            if canRetryEarly {
                state.update { state in state.waitingTasks[queuedJob.rowId] = waitingTask }
            }
            await waitingTask.value
            if canRetryEarly {
                state.update { state in state.waitingTasks[queuedJob.rowId] = nil }
            }
            return .runAgain
        }
    }

    /// Advances the serial queue after a job finishes. No-op in concurrent mode.
    private func startNextJob(state: inout State) {
        switch state.mode {
        case .loading, .serialPaused:
            owsFailBeta("Can't start the next job.")
        case .concurrent:
            return // All of these are started immediately.
        case .serialRunning(nextJobs: var nextJobs):
            state.mode = {
                if nextJobs.isEmpty {
                    // There are no more jobs to run, so pause the runner.
                    return .serialPaused
                } else {
                    let jobToRun = nextJobs.removeFirst()
                    runJob(jobToRun)
                    return .serialRunning(nextJobs: nextJobs)
                }
            }()
        }
    }
}