Fix group message processor error handling
This commit is contained in:
parent
8973744341
commit
09ace20318
@ -93,6 +93,8 @@ internal class SpecificGroupMessageProcessor {
|
||||
do throws(RetryableError) {
|
||||
defer { backgroundTask.end() }
|
||||
|
||||
var newestGuaranteedFailureJobId: Int64?
|
||||
|
||||
// ...process them, until we hit an error that requires backoff.
|
||||
while true {
|
||||
do throws(CancellationError) {
|
||||
@ -122,7 +124,10 @@ internal class SpecificGroupMessageProcessor {
|
||||
$0.mightBeAbleToMakeProgress = false
|
||||
}
|
||||
|
||||
let hasMore = try await self.processBatch(batchLimit: batchSize)
|
||||
let hasMore = try await self.processBatch(
|
||||
batchLimit: batchSize,
|
||||
newestGuaranteedFailureJobId: &newestGuaranteedFailureJobId
|
||||
)
|
||||
if !hasMore {
|
||||
return
|
||||
}
|
||||
@ -263,7 +268,7 @@ internal class SpecificGroupMessageProcessor {
|
||||
/// This method may process only a subset of the jobs.
|
||||
///
|
||||
/// - Returns: True if there are more jobs to process.
|
||||
private func processBatch(batchLimit: Int) async throws(RetryableError) -> Bool {
|
||||
private func processBatch(batchLimit: Int, newestGuaranteedFailureJobId: inout Int64?) async throws(RetryableError) -> Bool {
|
||||
let databaseStorage = SSKEnvironment.shared.databaseStorageRef
|
||||
|
||||
let hasMore: Bool
|
||||
@ -290,7 +295,10 @@ internal class SpecificGroupMessageProcessor {
|
||||
}
|
||||
|
||||
if let asyncJob {
|
||||
let didUpdateGroup = try await updateGroup(jobInfo: asyncJob)
|
||||
let didUpdateGroup = try await updateGroup(
|
||||
jobInfo: asyncJob,
|
||||
newestGuaranteedFailureJobId: &newestGuaranteedFailureJobId
|
||||
)
|
||||
await databaseStorage.awaitableWrite { tx in
|
||||
if didUpdateGroup {
|
||||
self.performLocalProcessingSync(jobInfo: asyncJob, tx: tx)
|
||||
@ -318,6 +326,18 @@ internal class SpecificGroupMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private func newestJobId() -> Int64? {
|
||||
let databaseStorage = SSKEnvironment.shared.databaseStorageRef
|
||||
return databaseStorage.read { tx in
|
||||
do {
|
||||
return try self.finder.newestJobId(tx: tx)
|
||||
} catch {
|
||||
DatabaseCorruptionState.flagDatabaseReadCorruptionIfNecessary(error: error)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func didCompleteJob(_ job: GroupMessageProcessorJob, tx: DBWriteTransaction) {
|
||||
failIfThrows {
|
||||
try self.finder.removeJob(withRowId: job.id, tx: tx)
|
||||
@ -346,14 +366,37 @@ internal class SpecificGroupMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private func updateGroup(jobInfo: IncomingGroupsV2MessageJobInfo) async throws(RetryableError) -> Bool {
|
||||
private func updateGroup(
|
||||
jobInfo: IncomingGroupsV2MessageJobInfo,
|
||||
newestGuaranteedFailureJobId: inout Int64?
|
||||
) async throws(RetryableError) -> Bool {
|
||||
// First, we try to update the group locally using changes embedded in
|
||||
// the group context (if any).
|
||||
if try await updateUsingEmbeddedGroupUpdate(jobInfo: jobInfo) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Next, we check if we've already failed to fetch state from the server.
|
||||
// If we've hit a terminal error, that error applies to ALL of the
|
||||
// already-enqueued messages. (It doesn't apply to newly-enqueued messages,
|
||||
// though, which is why we compare the job's ID.)
|
||||
if let newestGuaranteedFailureJobId, jobInfo.job.id <= newestGuaranteedFailureJobId {
|
||||
return false
|
||||
}
|
||||
// If we're going to check with the server, capture the newest job ID
|
||||
// BEFORE issuing the request. This ensures that jobs enqueued after we
|
||||
// start this fetch will issue their own fetch.
|
||||
let newestJobId: Int64? = self.newestJobId()
|
||||
|
||||
// If that fails, fall back to a fetch via the service.
|
||||
return try await tryToUpdateUsingService(jobInfo: jobInfo)
|
||||
if try await tryToUpdateUsingService(jobInfo: jobInfo) {
|
||||
return true
|
||||
}
|
||||
|
||||
// If we can't fetch via the service, store that result for reuse in future
|
||||
// invocations of this method.
|
||||
newestGuaranteedFailureJobId = newestJobId
|
||||
return false
|
||||
}
|
||||
|
||||
/// Try to apply a single embedded (peer-to-peer) update.
|
||||
|
||||
@ -33,6 +33,19 @@ struct GroupMessageProcessorJobStore {
|
||||
}
|
||||
}
|
||||
|
||||
func newestJobId(tx: DBReadTransaction) throws -> Int64? {
|
||||
do {
|
||||
let sql = """
|
||||
SELECT \(GroupMessageProcessorJob.CodingKeys.id.rawValue)
|
||||
FROM \(GroupMessageProcessorJob.databaseTableName)
|
||||
ORDER BY \(GroupMessageProcessorJob.CodingKeys.id.rawValue) DESC
|
||||
"""
|
||||
return try Int64.fetchOne(tx.database, sql: sql)
|
||||
} catch {
|
||||
throw error.grdbErrorForLogging
|
||||
}
|
||||
}
|
||||
|
||||
public func existsJob(forGroupId groupId: Data, tx: DBReadTransaction) throws -> Bool {
|
||||
let sql = """
|
||||
SELECT 1 FROM \(GroupMessageProcessorJob.databaseTableName)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user