262 lines
8.7 KiB
Swift
262 lines
8.7 KiB
Swift
import Foundation
|
||
import Dispatch
|
||
|
||
private func _when<T>(_ promises: [Promise<T>]) -> Promise<Void> {
|
||
let root = Promise<Void>.pending()
|
||
var countdown = promises.count
|
||
guard countdown > 0 else {
|
||
#if swift(>=4.0)
|
||
root.fulfill(())
|
||
#else
|
||
root.fulfill()
|
||
#endif
|
||
return root.promise
|
||
}
|
||
|
||
#if PMKDisableProgress || os(Linux)
|
||
var progress: (completedUnitCount: Int, totalUnitCount: Int) = (0, 0)
|
||
#else
|
||
let progress = Progress(totalUnitCount: Int64(promises.count))
|
||
progress.isCancellable = false
|
||
progress.isPausable = false
|
||
#endif
|
||
|
||
let barrier = DispatchQueue(label: "org.promisekit.barrier.when", attributes: .concurrent)
|
||
|
||
for promise in promises {
|
||
promise.state.pipe { resolution in
|
||
barrier.sync(flags: .barrier) {
|
||
switch resolution {
|
||
case .rejected(let error, let token):
|
||
token.consumed = true
|
||
if root.promise.isPending {
|
||
progress.completedUnitCount = progress.totalUnitCount
|
||
root.reject(error)
|
||
}
|
||
case .fulfilled:
|
||
guard root.promise.isPending else { return }
|
||
progress.completedUnitCount += 1
|
||
countdown -= 1
|
||
if countdown == 0 {
|
||
#if swift(>=4.0)
|
||
root.fulfill(())
|
||
#else
|
||
root.fulfill()
|
||
#endif
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return root.promise
|
||
}
|
||
|
||
/**
|
||
Wait for all promises in a set to fulfill.
|
||
|
||
For example:
|
||
|
||
when(fulfilled: promise1, promise2).then { results in
|
||
//…
|
||
}.catch { error in
|
||
switch error {
|
||
case URLError.notConnectedToInternet:
|
||
//…
|
||
case CLError.denied:
|
||
//…
|
||
}
|
||
}
|
||
|
||
- Note: If *any* of the provided promises reject, the returned promise is immediately rejected with that error.
|
||
- Warning: In the event of rejection the other promises will continue to resolve and, as per any other promise, will either fulfill or reject. This is the right pattern for `getter` style asynchronous tasks, but often for `setter` tasks (eg. storing data on a server), you most likely will need to wait on all tasks and then act based on which have succeeded and which have failed, in such situations use `when(resolved:)`.
|
||
- Parameter promises: The promises upon which to wait before the returned promise resolves.
|
||
- Returns: A new promise that resolves when all the provided promises fulfill or one of the provided promises rejects.
|
||
- Note: `when` provides `NSProgress`.
|
||
- SeeAlso: `when(resolved:)`
|
||
*/
|
||
public func when<T>(fulfilled promises: [Promise<T>]) -> Promise<[T]> {
|
||
return _when(promises).then(on: zalgo) { promises.map{ $0.value! } }
|
||
}
|
||
|
||
/// Wait for all promises in a set to fulfill.
|
||
public func when(fulfilled promises: Promise<Void>...) -> Promise<Void> {
|
||
return _when(promises)
|
||
}
|
||
|
||
/// Wait for all promises in a set to fulfill.
|
||
public func when(fulfilled promises: [Promise<Void>]) -> Promise<Void> {
|
||
return _when(promises)
|
||
}
|
||
|
||
/// Wait for all promises in a set to fulfill.
|
||
public func when<U, V>(fulfilled pu: Promise<U>, _ pv: Promise<V>) -> Promise<(U, V)> {
|
||
return _when([pu.asVoid(), pv.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!) }
|
||
}
|
||
|
||
/// Wait for all promises in a set to fulfill.
|
||
public func when<U, V, W>(fulfilled pu: Promise<U>, _ pv: Promise<V>, _ pw: Promise<W>) -> Promise<(U, V, W)> {
|
||
return _when([pu.asVoid(), pv.asVoid(), pw.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!, pw.value!) }
|
||
}
|
||
|
||
/// Wait for all promises in a set to fulfill.
|
||
public func when<U, V, W, X>(fulfilled pu: Promise<U>, _ pv: Promise<V>, _ pw: Promise<W>, _ px: Promise<X>) -> Promise<(U, V, W, X)> {
|
||
return _when([pu.asVoid(), pv.asVoid(), pw.asVoid(), px.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!, pw.value!, px.value!) }
|
||
}
|
||
|
||
/// Wait for all promises in a set to fulfill.
|
||
public func when<U, V, W, X, Y>(fulfilled pu: Promise<U>, _ pv: Promise<V>, _ pw: Promise<W>, _ px: Promise<X>, _ py: Promise<Y>) -> Promise<(U, V, W, X, Y)> {
|
||
return _when([pu.asVoid(), pv.asVoid(), pw.asVoid(), px.asVoid(), py.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!, pw.value!, px.value!, py.value!) }
|
||
}
|
||
|
||
/**
|
||
Generate promises at a limited rate and wait for all to fulfill.
|
||
|
||
For example:
|
||
|
||
func downloadFile(url: URL) -> Promise<Data> {
|
||
// ...
|
||
}
|
||
|
||
let urls: [URL] = /*…*/
|
||
let urlGenerator = urls.makeIterator()
|
||
|
||
let generator = AnyIterator<Promise<Data>> {
|
||
guard url = urlGenerator.next() else {
|
||
return nil
|
||
}
|
||
|
||
return downloadFile(url)
|
||
}
|
||
|
||
when(generator, concurrently: 3).then { datum: [Data] -> Void in
|
||
// ...
|
||
}
|
||
|
||
- Warning: Refer to the warnings on `when(fulfilled:)`
|
||
- Parameter promiseGenerator: Generator of promises.
|
||
- Returns: A new promise that resolves when all the provided promises fulfill or one of the provided promises rejects.
|
||
- SeeAlso: `when(resolved:)`
|
||
*/
|
||
|
||
public func when<T, PromiseIterator: IteratorProtocol>(fulfilled promiseIterator: PromiseIterator, concurrently: Int) -> Promise<[T]> where PromiseIterator.Element == Promise<T> {
|
||
|
||
guard concurrently > 0 else {
|
||
return Promise(error: PMKError.whenConcurrentlyZero)
|
||
}
|
||
|
||
var generator = promiseIterator
|
||
var root = Promise<[T]>.pending()
|
||
var pendingPromises = 0
|
||
var promises: [Promise<T>] = []
|
||
|
||
let barrier = DispatchQueue(label: "org.promisekit.barrier.when", attributes: [.concurrent])
|
||
|
||
func dequeue() {
|
||
guard root.promise.isPending else { return } // don’t continue dequeueing if root has been rejected
|
||
|
||
var shouldDequeue = false
|
||
barrier.sync {
|
||
shouldDequeue = pendingPromises < concurrently
|
||
}
|
||
guard shouldDequeue else { return }
|
||
|
||
var index: Int!
|
||
var promise: Promise<T>!
|
||
|
||
barrier.sync(flags: .barrier) {
|
||
guard let next = generator.next() else { return }
|
||
|
||
promise = next
|
||
index = promises.count
|
||
|
||
pendingPromises += 1
|
||
promises.append(next)
|
||
}
|
||
|
||
func testDone() {
|
||
barrier.sync {
|
||
if pendingPromises == 0 {
|
||
#if !swift(>=3.3) || (swift(>=4) && !swift(>=4.1))
|
||
root.fulfill(promises.flatMap{ $0.value })
|
||
#else
|
||
root.fulfill(promises.compactMap{ $0.value })
|
||
#endif
|
||
}
|
||
}
|
||
}
|
||
|
||
guard promise != nil else {
|
||
return testDone()
|
||
}
|
||
|
||
promise.state.pipe { resolution in
|
||
barrier.sync(flags: .barrier) {
|
||
pendingPromises -= 1
|
||
}
|
||
|
||
switch resolution {
|
||
case .fulfilled:
|
||
dequeue()
|
||
testDone()
|
||
case .rejected(let error, let token):
|
||
token.consumed = true
|
||
root.reject(error)
|
||
}
|
||
}
|
||
|
||
dequeue()
|
||
}
|
||
|
||
dequeue()
|
||
|
||
return root.promise
|
||
}
|
||
|
||
/**
|
||
Waits on all provided promises.
|
||
|
||
`when(fulfilled:)` rejects as soon as one of the provided promises rejects. `when(resolved:)` waits on all provided promises and **never** rejects.
|
||
|
||
when(resolved: promise1, promise2, promise3).then { results in
|
||
for result in results where case .fulfilled(let value) {
|
||
//…
|
||
}
|
||
}.catch { error in
|
||
// invalid! Never rejects
|
||
}
|
||
|
||
- Returns: A new promise that resolves once all the provided promises resolve.
|
||
- Warning: The returned promise can *not* be rejected.
|
||
- Note: Any promises that error are implicitly consumed, your UnhandledErrorHandler will not be called.
|
||
*/
|
||
public func when<T>(resolved promises: Promise<T>...) -> Promise<[Result<T>]> {
|
||
return when(resolved: promises)
|
||
}
|
||
|
||
/// Waits on all provided promises.
|
||
public func when<T>(resolved promises: [Promise<T>]) -> Promise<[Result<T>]> {
|
||
guard !promises.isEmpty else { return Promise(value: []) }
|
||
|
||
var countdown = promises.count
|
||
let barrier = DispatchQueue(label: "org.promisekit.barrier.join", attributes: .concurrent)
|
||
|
||
return Promise { fulfill, reject in
|
||
for promise in promises {
|
||
promise.state.pipe { resolution in
|
||
if case .rejected(_, let token) = resolution {
|
||
token.consumed = true // all errors are implicitly consumed
|
||
}
|
||
var done = false
|
||
barrier.sync(flags: .barrier) {
|
||
countdown -= 1
|
||
done = countdown == 0
|
||
}
|
||
if done {
|
||
fulfill(promises.map { Result($0.state.get()!) })
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|