Move some v2 upload code

This commit is contained in:
Pete Walters 2024-07-26 11:39:45 -05:00 committed by GitHub
parent 97dc9a380b
commit 731237981f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 59 additions and 67 deletions

View File

@ -20,11 +20,11 @@ public struct AttachmentUpload<Metadata: UploadMetadata> {
private let logger: PrefixedLogger
private let localMetadata: Metadata
private let formSource: Upload.FormSource
private let uploadForm: Upload.Form
public init(
localMetadata: Metadata,
formSource: Upload.FormSource,
uploadForm: Upload.Form,
signalService: OWSSignalServiceProtocol,
networkManager: NetworkManager,
chatConnectionManager: ChatConnectionManager,
@ -32,7 +32,7 @@ public struct AttachmentUpload<Metadata: UploadMetadata> {
logger: PrefixedLogger
) {
self.localMetadata = localMetadata
self.formSource = formSource
self.uploadForm = uploadForm
self.signalService = signalService
self.networkManager = networkManager
self.chatConnectionManager = chatConnectionManager
@ -50,74 +50,35 @@ public struct AttachmentUpload<Metadata: UploadMetadata> {
progress?(buildProgress(done: 0, total: localMetadata.encryptedDataLength))
return try await attemptUpload(localMetadata: localMetadata, formSource: formSource, progress: progress)
return try await attemptUpload(localMetadata: localMetadata, uploadForm: uploadForm, progress: progress)
}
/// The retriable parts of the upload.
/// 1. Fetch upload form
/// 2. Create upload endpoint
/// 3. Get the target URL from the endpoint
/// 4. Initate the upload via the endpoint
/// 1. Create upload endpoint
/// 2. Get the target URL from the endpoint
/// 3. Initate the upload via the endpoint
///
/// - Parameters:
/// - localMetadata: The metadata and URL path for the local upload data
/// will create a new request and fetch a new form.
/// - count: For certain failure types, the attempt will be retried. This keeps track and will error
/// out after a certain number of retries.
/// - progressBlock: Callback notified up upload progress.
/// - returns: `Upload.Result` reflecting the metadata of the final upload result.
///
/// Resumption of an active upload can be handled at a lower level, but if the endpoint returns an
/// error that requires a full restart, this is the method that will be called to fetch a new upload form and
/// rebuild the endpoint and upload state before trying again
private func attemptUpload(
localMetadata: Metadata,
formSource: Upload.FormSource,
count: UInt = 0,
uploadForm: Upload.Form,
progress: Upload.ProgressBlock?
) async throws -> Upload.Result<Metadata> {
logger.info("Begin upload.")
do {
let form: Upload.Form
switch formSource {
case .local(let localForm):
form = localForm
case .remote:
let formRequest = Upload.FormRequest(
signalService: signalService,
networkManager: networkManager,
chatConnectionManager: chatConnectionManager
)
form = try await formRequest.start()
}
let attempt = try await buildAttempt(for: localMetadata, form: form, logger: logger)
try await performResumableUpload(attempt: attempt, progress: progress)
return Upload.Result(
cdnKey: attempt.cdnKey,
cdnNumber: attempt.cdnNumber,
localUploadMetadata: localMetadata,
beginTimestamp: attempt.beginTimestamp,
finishTimestamp: Date().ows_millisecondsSince1970
)
} catch {
// Anything besides 'restart' should be handled below this method,
// or is an unhandled error that should be thrown to the caller
if
case Upload.Error.uploadFailure(let recoveryMode) = error,
case .restart(let backOff) = recoveryMode
{
switch backOff {
case .immediately:
break
case .afterDelay(let delay):
try await sleep(for: delay)
}
return try await attemptUpload(localMetadata: localMetadata, formSource: formSource, count: count + 1, progress: progress)
} else {
throw error
}
}
let attempt = try await buildAttempt(for: localMetadata, form: uploadForm, logger: logger)
try await performResumableUpload(attempt: attempt, progress: progress)
return Upload.Result(
cdnKey: attempt.cdnKey,
cdnNumber: attempt.cdnNumber,
localUploadMetadata: localMetadata,
beginTimestamp: attempt.beginTimestamp,
finishTimestamp: Date().ows_millisecondsSince1970
)
}
/// Consult the UploadEndpoint to determine how much has already been uploaded.
@ -222,7 +183,7 @@ public struct AttachmentUpload<Metadata: UploadMetadata> {
attempt.logger.warn("Retry upload immediately.")
case .afterDelay(let delay):
attempt.logger.warn("Retry upload after \(delay) seconds.")
try await sleep(for: delay)
try await Upload.sleep(for: delay)
}
case .restart:
// Restart is handled at a higher level since the whole
@ -281,11 +242,6 @@ public struct AttachmentUpload<Metadata: UploadMetadata> {
).awaitable()
}
private func sleep(for delay: TimeInterval) async throws {
let delayInNs = UInt64(delay * Double(NSEC_PER_SEC))
try await Task.sleep(nanoseconds: delayInNs)
}
private func buildProgress(done: Int, total: UInt32) -> Progress {
let progress = Progress(parent: nil, userInfo: nil)
progress.totalUnitCount = Int64(total)
@ -294,6 +250,13 @@ public struct AttachmentUpload<Metadata: UploadMetadata> {
}
}
extension Upload {
static func sleep(for delay: TimeInterval) async throws {
let delayInNs = UInt64(delay * Double(NSEC_PER_SEC))
try await Task.sleep(nanoseconds: delayInNs)
}
}
extension Upload {
struct FormRequest {
private let signalService: OWSSignalServiceProtocol

View File

@ -59,7 +59,7 @@ public actor AttachmentUploadManagerImpl: AttachmentUploadManager {
do {
let upload = AttachmentUpload(
localMetadata: localUploadMetadata,
formSource: .local(form),
uploadForm: form,
signalService: signalService,
networkManager: networkManager,
chatConnectionManager: chatConnectionManager,
@ -92,11 +92,16 @@ public actor AttachmentUploadManagerImpl: AttachmentUploadManager {
}
let metadata = try attachmentEncrypter.encryptAttachment(at: sourceURL, output: temporaryFile)
let localMetadata = try Upload.LocalUploadMetadata.validateAndBuild(fileUrl: temporaryFile, metadata: metadata)
let form = try await Upload.FormRequest(
signalService: signalService,
networkManager: networkManager,
chatConnectionManager: chatConnectionManager
).start()
do {
let upload = AttachmentUpload(
localMetadata: localMetadata,
formSource: .remote,
uploadForm: form,
signalService: signalService,
networkManager: networkManager,
chatConnectionManager: chatConnectionManager,
@ -123,11 +128,15 @@ public actor AttachmentUploadManagerImpl: AttachmentUploadManager {
}
/// Entry point for uploading an `AttachmentStream`
/// Fetches the `AttachmentStream`, builds the AttachmentUpload, begins the
/// Fetches the `AttachmentStream`, fetches an upload form, builds the AttachmentUpload, begins the
/// upload, and updates the `AttachmentStream` upon success.
///
/// It is assumed any errors that could be retried or otherwise handled will have happend at a lower level,
/// so any error encountered here is considered unrecoverable and thrown to the caller.
///
/// Resumption of an active upload can be handled at a lower level, but if the endpoint returns an
/// error that requires a full restart, this is the method that will be called to fetch a new upload form and
/// rebuild the endpoint and upload state before trying again.
public func uploadAttachment(attachmentId: Attachment.IDType) async throws {
let logger = PrefixedLogger(prefix: "[Upload]", suffix: "[\(attachmentId)]")
@ -150,9 +159,15 @@ public actor AttachmentUploadManagerImpl: AttachmentUploadManager {
}
do {
let form = try await Upload.FormRequest(
signalService: signalService,
networkManager: networkManager,
chatConnectionManager: chatConnectionManager
).start()
let upload = AttachmentUpload(
localMetadata: localMetadata,
formSource: .remote,
uploadForm: form,
signalService: signalService,
networkManager: networkManager,
chatConnectionManager: chatConnectionManager,
@ -172,7 +187,21 @@ public actor AttachmentUploadManagerImpl: AttachmentUploadManager {
)
} catch {
if error.isNetworkFailureOrTimeout {
// Anything besides 'restart' should be handled below this method,
// or is an unhandled error that should be thrown to the caller
if
case Upload.Error.uploadFailure(let recoveryMode) = error,
case .restart(let backOff) = recoveryMode
{
switch backOff {
case .immediately:
break
case .afterDelay(let delay):
try await Upload.sleep(for: delay)
}
return try await uploadAttachment(attachmentId: attachmentId)
} else if error.isNetworkFailureOrTimeout {
logger.warn("Upload failed due to network error")
} else if error is CancellationError {
logger.warn("Upload cancelled")