Always call concludeTx, even if processBatch throws
This commit is contained in:
parent
a32dfeefab
commit
96e4392d59
@ -65,7 +65,7 @@ struct InfoMessageGroupUpdateMigrator {
|
||||
|
||||
try await TimeGatedBatch.processAll(
|
||||
db: db,
|
||||
buildTxContext: { tx throws(CancellationError) -> TxContext in
|
||||
buildTxContext: { tx -> TxContext in
|
||||
let lastMigratedInfoMessageRowID = kvStore.fetchValue(
|
||||
Int64.self,
|
||||
forKey: StoreKeys.lastMigratedInfoMessageRowID,
|
||||
@ -160,7 +160,7 @@ struct InfoMessageGroupUpdateMigrator {
|
||||
context.lastMigratedInfoMessageRowID = infoMessage.rowID
|
||||
return .more
|
||||
},
|
||||
concludeTx: { tx, context throws(CancellationError) in
|
||||
concludeTx: { tx, context in
|
||||
// We've directly modified TSInteractions that may be cached, so
|
||||
// clear said caches.
|
||||
modelReadCaches().evacuateAllCaches()
|
||||
|
||||
@ -166,7 +166,7 @@ class BackupListMediaManagerImpl: BackupListMediaManager {
|
||||
let task = serialTaskQueue.enqueue { [self] in
|
||||
try await _queryListMediaIfNeeded()
|
||||
}
|
||||
let backgroundTask = OWSBackgroundTask(label: #function) { [task] status in
|
||||
let backgroundTask = OWSBackgroundTask(label: "[ListMediaManager]") { [task] status in
|
||||
switch status {
|
||||
case .expired:
|
||||
task.cancel()
|
||||
@ -356,7 +356,7 @@ class BackupListMediaManagerImpl: BackupListMediaManager {
|
||||
try await TimeGatedBatch.processAll(
|
||||
db: db,
|
||||
delayTwixtTx: 0.2,
|
||||
buildTxContext: { tx throws(CancellationError) -> TxContext in
|
||||
buildTxContext: { tx -> TxContext in
|
||||
let lastEnumeratedAttachmentId: Attachment.IDType? = kvStore.getInt64(
|
||||
Constants.lastEnumeratedAttachmentIdKey,
|
||||
transaction: tx,
|
||||
@ -442,7 +442,7 @@ class BackupListMediaManagerImpl: BackupListMediaManager {
|
||||
|
||||
return .more
|
||||
},
|
||||
concludeTx: { tx, txContext throws(CancellationError) in
|
||||
concludeTx: { tx, txContext in
|
||||
let startAttachmentLogString = txContext.originalLastEnumeratedAttachmentId.map { String($0) } ?? "nil"
|
||||
let endAttachmentLogString = txContext.lastEnumeratedAttachmentId.map { String($0) } ?? "nil"
|
||||
logger.info("Checked attachments [\(startAttachmentLogString)...\(endAttachmentLogString)]. didFinish \(txContext.didFinish)")
|
||||
@ -477,7 +477,7 @@ class BackupListMediaManagerImpl: BackupListMediaManager {
|
||||
try await TimeGatedBatch.processAll(
|
||||
db: db,
|
||||
delayTwixtTx: 0.2,
|
||||
buildTxContext: { tx throws(CancellationError) -> Void in
|
||||
buildTxContext: { tx -> Void in
|
||||
// Nothing – we use the in-memory integrityChecker.
|
||||
},
|
||||
processBatch: { tx, _ throws(CancellationError) in
|
||||
@ -507,7 +507,7 @@ class BackupListMediaManagerImpl: BackupListMediaManager {
|
||||
|
||||
return .more
|
||||
},
|
||||
concludeTx: { tx, _ throws(CancellationError) in
|
||||
concludeTx: { tx, _ in
|
||||
if
|
||||
let integrityCheckResult = integrityChecker.result,
|
||||
let serializedResult = try? JSONEncoder().encode(integrityCheckResult)
|
||||
|
||||
@ -103,9 +103,9 @@ public enum TimeGatedBatch {
|
||||
yieldTxAfter maximumDuration: TimeInterval = 0.5,
|
||||
delayTwixtTx: TimeInterval = 0,
|
||||
errorTxCompletion: GRDB.Database.TransactionCompletion = .commit,
|
||||
buildTxContext: (DBWriteTransaction) throws(E) -> TxContext,
|
||||
buildTxContext: (DBWriteTransaction) -> TxContext,
|
||||
processBatch: (DBWriteTransaction, inout TxContext) throws(E) -> ProcessBatchResult<DoneResult>,
|
||||
concludeTx: (DBWriteTransaction, TxContext) throws(E) -> Void,
|
||||
concludeTx: (DBWriteTransaction, TxContext) -> Void,
|
||||
) async throws(E) -> DoneResult {
|
||||
return try await _processAll(
|
||||
db: db,
|
||||
@ -133,9 +133,9 @@ public enum TimeGatedBatch {
|
||||
yieldTxAfter: maximumDuration,
|
||||
delayTwixtTx: delayTwixtTx,
|
||||
errorTxCompletion: errorTxCompletion,
|
||||
buildTxContext: { _ throws(E) in DummyTxContext() },
|
||||
buildTxContext: { _ in DummyTxContext() },
|
||||
processBatch: { tx, _ throws(E) in try processBatch(tx) },
|
||||
concludeTx: { _, _ throws(E) in },
|
||||
concludeTx: { _, _ in },
|
||||
)
|
||||
}
|
||||
|
||||
@ -145,9 +145,9 @@ public enum TimeGatedBatch {
|
||||
yieldTxAfter maximumDuration: TimeInterval,
|
||||
delayTwixtTx: TimeInterval,
|
||||
errorTxCompletion: GRDB.Database.TransactionCompletion,
|
||||
buildTxContext: (DBWriteTransaction) throws(E) -> TxContext,
|
||||
buildTxContext: (DBWriteTransaction) -> TxContext,
|
||||
processBatch: (DBWriteTransaction, inout TxContext) throws(E) -> ProcessBatchResult<DoneResult>,
|
||||
concludeTx: (DBWriteTransaction, TxContext) throws(E) -> Void,
|
||||
concludeTx: (DBWriteTransaction, TxContext) -> Void,
|
||||
) async throws(E) -> DoneResult {
|
||||
while true {
|
||||
let txBlock: (DBWriteTransaction) throws(E) -> ProcessBatchResult<DoneResult> = { tx in
|
||||
@ -184,21 +184,24 @@ public enum TimeGatedBatch {
|
||||
/// given duration.
|
||||
private static func processBatchesInTransaction<E: Error, TxContext, DoneResult>(
|
||||
maximumDuration: CFTimeInterval,
|
||||
buildTxContext: (DBWriteTransaction) throws(E) -> TxContext,
|
||||
buildTxContext: (DBWriteTransaction) -> TxContext,
|
||||
processBatch: (DBWriteTransaction, inout TxContext) throws(E) -> ProcessBatchResult<DoneResult>,
|
||||
concludeTx: (DBWriteTransaction, TxContext) throws(E) -> Void,
|
||||
concludeTx: (DBWriteTransaction, TxContext) -> Void,
|
||||
tx: DBWriteTransaction,
|
||||
) throws(E) -> ProcessBatchResult<DoneResult> {
|
||||
let yieldDeadline = CACurrentMediaTime() + maximumDuration
|
||||
var txContext = try buildTxContext(tx)
|
||||
var txContext = buildTxContext(tx)
|
||||
defer { concludeTx(tx, txContext) }
|
||||
|
||||
while true {
|
||||
let batchResult = try autoreleasepool { () throws(E) -> ProcessBatchResult in
|
||||
return try processBatch(tx, &txContext)
|
||||
}
|
||||
|
||||
if case .more = batchResult, CACurrentMediaTime() <= yieldDeadline {
|
||||
continue
|
||||
}
|
||||
try concludeTx(tx, txContext)
|
||||
|
||||
return batchResult
|
||||
}
|
||||
}
|
||||
|
||||
@ -223,4 +223,37 @@ class DBTimeBatchingTest: XCTestCase {
|
||||
XCTAssertEqual(processBatchCount, maxBatchCount)
|
||||
XCTAssertEqual(concludeTxCount, maxBatchCount)
|
||||
}
|
||||
|
||||
func testTxContextThrows() async {
|
||||
var buildTxContextCount = 0
|
||||
var processBatchCount = 0
|
||||
var concludeTxCount = 0
|
||||
|
||||
struct TxContext {
|
||||
var id = 0
|
||||
}
|
||||
|
||||
try? await TimeGatedBatch.processAll(
|
||||
db: InMemoryDB(),
|
||||
buildTxContext: { _ in
|
||||
buildTxContextCount += 1
|
||||
return TxContext()
|
||||
},
|
||||
processBatch: { _, context -> TimeGatedBatch.ProcessBatchResult<Void> in
|
||||
struct DummyError: Error {}
|
||||
|
||||
processBatchCount += 1
|
||||
context.id += 1
|
||||
throw DummyError()
|
||||
},
|
||||
concludeTx: { _, context in
|
||||
XCTAssertEqual(context.id, 1)
|
||||
concludeTxCount += 1
|
||||
},
|
||||
)
|
||||
|
||||
XCTAssertEqual(buildTxContextCount, 1)
|
||||
XCTAssertEqual(processBatchCount, 1)
|
||||
XCTAssertEqual(concludeTxCount, 1)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user