209 lines
8.9 KiB
Swift
209 lines
8.9 KiB
Swift
//
|
|
// Copyright 2023 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
import Foundation
|
|
public import GRDB
|
|
|
|
public enum TimeGatedBatch {
|
|
/// Processes `objects` within one or more transactions.
|
|
///
|
|
/// You probably don't need this method and shouldn't use it. Splitting an
|
|
/// operation across multiple transactions (whether you're using this method
|
|
/// or not) requires careful consideration to ensure data integrity.
|
|
///
|
|
/// Most loops should require approximately the same amount of time for each
|
|
/// object. In those cases, it's simpler to split `objects` into batches
|
|
/// with a fixed size and process each batch within its own transaction.
|
|
///
|
|
/// However, if you are processing objects with significant and
|
|
/// unpredictable variability in their processing time, this pattern may be
|
|
/// useful. If a few elements take orders of magnitude longer to process
|
|
/// than the majority, this will provide a reasonable balance between the
|
|
/// number of transactions and the amount of work performed in each
|
|
/// transaction. You must ensure that it's safe to process each object in a
|
|
/// separate transaction AND you also must ensure it's safe to process all
|
|
/// objects within a single transaction BECAUSE this method will split
|
|
/// transactions at arbitrary points.
|
|
///
|
|
/// - Parameter yieldTxAfter: A suggestion for the maximum amount of time to
|
|
/// keep the transaction open for a single batch. This method will start a
|
|
/// new transaction when `block` returns if more than `yieldTxAfter` seconds
|
|
/// have elapsed since the transaction was opened. Note: This means the
|
|
/// actual maximum transaction duration is unbounded because `block` may
|
|
/// never return or may run extremely slow queries.
|
|
public static func enumerateObjects<T, E>(
|
|
_ objects: some Sequence<T>,
|
|
db: any DB,
|
|
yieldTxAfter: TimeInterval = 1.0,
|
|
file: String = #file,
|
|
function: String = #function,
|
|
line: Int = #line,
|
|
block: (T, DBWriteTransaction) throws(E) -> Void,
|
|
) async throws(E) {
|
|
var isDone = false
|
|
var objectEnumerator = objects.makeIterator()
|
|
while !isDone {
|
|
try await db.awaitableWrite(file: file, function: function, line: line) { tx throws(E) -> Void in
|
|
let startTime = CACurrentMediaTime()
|
|
while true {
|
|
guard let object = objectEnumerator.next() else {
|
|
// We're done with everything, so exit the outer loop.
|
|
isDone = true
|
|
return
|
|
}
|
|
try block(object, tx)
|
|
let elapsedTime = CACurrentMediaTime() - startTime
|
|
guard elapsedTime < yieldTxAfter else {
|
|
// We're done with this batch, so we want another transaction.
|
|
return
|
|
}
|
|
// Process another object with this transaction...
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: -
|
|
|
|
public enum ProcessBatchResult<T> {
|
|
case more
|
|
case done(T)
|
|
}
|
|
|
|
/// Processes all elements in batches bound by time.
|
|
///
|
|
/// This method invokes `processBatch` repeatedly & in a tight loop. It
|
|
/// stops when `processBatch` returns `.done` (indicating there's nothing
|
|
/// left to do). Callers must ensure `processBatch` eventually returns
|
|
/// `.done`; they likely need to delete objects as part of each batch or
|
|
/// maintain a cursor to avoid processing the same elements multiple times.
|
|
///
|
|
/// This method is most useful for "fetch and delete" operations that are
|
|
/// trying to avoid DELETE-ing rows from the database while SELECT-ing them
|
|
/// via enumeration. This method will execute multiple batches within a
|
|
/// single transaction (if time allows), so those operations can fetch &
|
|
/// delete in small batches without exploding the number of transactions.
|
|
///
|
|
/// - parameter errorTxCompletion: The strategy to employ with the latest
|
|
/// transaction if an error is thrown: rollback or commit changes made so
|
|
/// far _within_ that latest transaction. Note that prior non-throwing
|
|
/// transactions would have already been committed and are not rolled back.
|
|
///
|
|
/// - Parameter buildTxContext: A block run once immediately when a new
|
|
/// transaction is opened, returning a context object shared by each batch
|
|
/// processed in that transaction.
|
|
///
|
|
/// - Parameter concludeTx: A block run once just before a transaction is
|
|
/// closed, which may be used to commit information about the batches
|
|
/// processed in that transaction.
|
|
public static func processAll<E: Error, TxContext, DoneResult>(
|
|
db: DB,
|
|
yieldTxAfter maximumDuration: TimeInterval = 0.5,
|
|
delayTwixtTx: TimeInterval = 0,
|
|
errorTxCompletion: GRDB.Database.TransactionCompletion = .commit,
|
|
buildTxContext: (DBWriteTransaction) -> TxContext,
|
|
processBatch: (DBWriteTransaction, inout TxContext) throws(E) -> ProcessBatchResult<DoneResult>,
|
|
concludeTx: (DBWriteTransaction, TxContext) -> Void,
|
|
) async throws(E) -> DoneResult {
|
|
return try await _processAll(
|
|
db: db,
|
|
yieldTxAfter: maximumDuration,
|
|
delayTwixtTx: delayTwixtTx,
|
|
errorTxCompletion: errorTxCompletion,
|
|
buildTxContext: buildTxContext,
|
|
processBatch: processBatch,
|
|
concludeTx: concludeTx,
|
|
)
|
|
}
|
|
|
|
/// Processes all elements in batches bound by time.
|
|
///
|
|
/// Like `processAll` but without "transaction contexts".
|
|
public static func processAll<E: Error, DoneResult>(
|
|
db: DB,
|
|
yieldTxAfter maximumDuration: TimeInterval = 0.5,
|
|
delayTwixtTx: TimeInterval = 0,
|
|
errorTxCompletion: GRDB.Database.TransactionCompletion = .commit,
|
|
processBatch: (DBWriteTransaction) throws(E) -> ProcessBatchResult<DoneResult>,
|
|
) async throws(E) -> DoneResult {
|
|
return try await _processAll(
|
|
db: db,
|
|
yieldTxAfter: maximumDuration,
|
|
delayTwixtTx: delayTwixtTx,
|
|
errorTxCompletion: errorTxCompletion,
|
|
buildTxContext: { _ in DummyTxContext() },
|
|
processBatch: { tx, _ throws(E) in try processBatch(tx) },
|
|
concludeTx: { _, _ in },
|
|
)
|
|
}
|
|
|
|
/// See docs on `processAll`.
|
|
private static func _processAll<E: Error, TxContext, DoneResult>(
|
|
db: DB,
|
|
yieldTxAfter maximumDuration: TimeInterval,
|
|
delayTwixtTx: TimeInterval,
|
|
errorTxCompletion: GRDB.Database.TransactionCompletion,
|
|
buildTxContext: (DBWriteTransaction) -> TxContext,
|
|
processBatch: (DBWriteTransaction, inout TxContext) throws(E) -> ProcessBatchResult<DoneResult>,
|
|
concludeTx: (DBWriteTransaction, TxContext) -> Void,
|
|
) async throws(E) -> DoneResult {
|
|
while true {
|
|
let txBlock: (DBWriteTransaction) throws(E) -> ProcessBatchResult<DoneResult> = { tx in
|
|
return try processBatchesInTransaction(
|
|
maximumDuration: maximumDuration,
|
|
buildTxContext: buildTxContext,
|
|
processBatch: processBatch,
|
|
concludeTx: concludeTx,
|
|
tx: tx,
|
|
)
|
|
}
|
|
|
|
let batchResult = switch errorTxCompletion {
|
|
case .commit:
|
|
try await db.awaitableWrite(block: txBlock)
|
|
case .rollback:
|
|
try await db.awaitableWriteWithRollbackIfThrows(block: txBlock)
|
|
}
|
|
|
|
switch batchResult {
|
|
case .more:
|
|
try? await Task.sleep(nanoseconds: delayTwixtTx.clampedNanoseconds)
|
|
case .done(let result):
|
|
return result
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: -
|
|
|
|
private struct DummyTxContext {}
|
|
|
|
/// Process as many batches in the given transaction as possible in the
|
|
/// given duration.
|
|
private static func processBatchesInTransaction<E: Error, TxContext, DoneResult>(
|
|
maximumDuration: CFTimeInterval,
|
|
buildTxContext: (DBWriteTransaction) -> TxContext,
|
|
processBatch: (DBWriteTransaction, inout TxContext) throws(E) -> ProcessBatchResult<DoneResult>,
|
|
concludeTx: (DBWriteTransaction, TxContext) -> Void,
|
|
tx: DBWriteTransaction,
|
|
) throws(E) -> ProcessBatchResult<DoneResult> {
|
|
let yieldDeadline = CACurrentMediaTime() + maximumDuration
|
|
var txContext = buildTxContext(tx)
|
|
defer { concludeTx(tx, txContext) }
|
|
|
|
while true {
|
|
let batchResult = try autoreleasepool { () throws(E) -> ProcessBatchResult in
|
|
return try processBatch(tx, &txContext)
|
|
}
|
|
|
|
if case .more = batchResult, CACurrentMediaTime() <= yieldDeadline {
|
|
continue
|
|
}
|
|
|
|
return batchResult
|
|
}
|
|
}
|
|
}
|