PromiseKit/Sources/when.swift
2018-06-04 19:23:16 -04:00

262 lines
8.7 KiB
Swift
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Foundation
import Dispatch
private func _when<T>(_ promises: [Promise<T>]) -> Promise<Void> {
let root = Promise<Void>.pending()
var countdown = promises.count
guard countdown > 0 else {
#if swift(>=4.0)
root.fulfill(())
#else
root.fulfill()
#endif
return root.promise
}
#if PMKDisableProgress || os(Linux)
var progress: (completedUnitCount: Int, totalUnitCount: Int) = (0, 0)
#else
let progress = Progress(totalUnitCount: Int64(promises.count))
progress.isCancellable = false
progress.isPausable = false
#endif
let barrier = DispatchQueue(label: "org.promisekit.barrier.when", attributes: .concurrent)
for promise in promises {
promise.state.pipe { resolution in
barrier.sync(flags: .barrier) {
switch resolution {
case .rejected(let error, let token):
token.consumed = true
if root.promise.isPending {
progress.completedUnitCount = progress.totalUnitCount
root.reject(error)
}
case .fulfilled:
guard root.promise.isPending else { return }
progress.completedUnitCount += 1
countdown -= 1
if countdown == 0 {
#if swift(>=4.0)
root.fulfill(())
#else
root.fulfill()
#endif
}
}
}
}
}
return root.promise
}
/**
Wait for all promises in a set to fulfill.
For example:
when(fulfilled: promise1, promise2).then { results in
//
}.catch { error in
switch error {
case URLError.notConnectedToInternet:
//
case CLError.denied:
//
}
}
- Note: If *any* of the provided promises reject, the returned promise is immediately rejected with that error.
- Warning: In the event of rejection the other promises will continue to resolve and, as per any other promise, will either fulfill or reject. This is the right pattern for `getter` style asynchronous tasks, but often for `setter` tasks (eg. storing data on a server), you most likely will need to wait on all tasks and then act based on which have succeeded and which have failed, in such situations use `when(resolved:)`.
- Parameter promises: The promises upon which to wait before the returned promise resolves.
- Returns: A new promise that resolves when all the provided promises fulfill or one of the provided promises rejects.
- Note: `when` provides `NSProgress`.
- SeeAlso: `when(resolved:)`
*/
public func when<T>(fulfilled promises: [Promise<T>]) -> Promise<[T]> {
return _when(promises).then(on: zalgo) { promises.map{ $0.value! } }
}
/// Wait for all promises in a set to fulfill.
public func when(fulfilled promises: Promise<Void>...) -> Promise<Void> {
return _when(promises)
}
/// Wait for all promises in a set to fulfill.
public func when(fulfilled promises: [Promise<Void>]) -> Promise<Void> {
return _when(promises)
}
/// Wait for all promises in a set to fulfill.
public func when<U, V>(fulfilled pu: Promise<U>, _ pv: Promise<V>) -> Promise<(U, V)> {
return _when([pu.asVoid(), pv.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!) }
}
/// Wait for all promises in a set to fulfill.
public func when<U, V, W>(fulfilled pu: Promise<U>, _ pv: Promise<V>, _ pw: Promise<W>) -> Promise<(U, V, W)> {
return _when([pu.asVoid(), pv.asVoid(), pw.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!, pw.value!) }
}
/// Wait for all promises in a set to fulfill.
public func when<U, V, W, X>(fulfilled pu: Promise<U>, _ pv: Promise<V>, _ pw: Promise<W>, _ px: Promise<X>) -> Promise<(U, V, W, X)> {
return _when([pu.asVoid(), pv.asVoid(), pw.asVoid(), px.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!, pw.value!, px.value!) }
}
/// Wait for all promises in a set to fulfill.
public func when<U, V, W, X, Y>(fulfilled pu: Promise<U>, _ pv: Promise<V>, _ pw: Promise<W>, _ px: Promise<X>, _ py: Promise<Y>) -> Promise<(U, V, W, X, Y)> {
return _when([pu.asVoid(), pv.asVoid(), pw.asVoid(), px.asVoid(), py.asVoid()]).then(on: zalgo) { (pu.value!, pv.value!, pw.value!, px.value!, py.value!) }
}
/**
Generate promises at a limited rate and wait for all to fulfill.
For example:
func downloadFile(url: URL) -> Promise<Data> {
// ...
}
let urls: [URL] = /**/
let urlGenerator = urls.makeIterator()
let generator = AnyIterator<Promise<Data>> {
guard url = urlGenerator.next() else {
return nil
}
return downloadFile(url)
}
when(generator, concurrently: 3).then { datum: [Data] -> Void in
// ...
}
- Warning: Refer to the warnings on `when(fulfilled:)`
- Parameter promiseGenerator: Generator of promises.
- Returns: A new promise that resolves when all the provided promises fulfill or one of the provided promises rejects.
- SeeAlso: `when(resolved:)`
*/
public func when<T, PromiseIterator: IteratorProtocol>(fulfilled promiseIterator: PromiseIterator, concurrently: Int) -> Promise<[T]> where PromiseIterator.Element == Promise<T> {
guard concurrently > 0 else {
return Promise(error: PMKError.whenConcurrentlyZero)
}
var generator = promiseIterator
var root = Promise<[T]>.pending()
var pendingPromises = 0
var promises: [Promise<T>] = []
let barrier = DispatchQueue(label: "org.promisekit.barrier.when", attributes: [.concurrent])
func dequeue() {
guard root.promise.isPending else { return } // dont continue dequeueing if root has been rejected
var shouldDequeue = false
barrier.sync {
shouldDequeue = pendingPromises < concurrently
}
guard shouldDequeue else { return }
var index: Int!
var promise: Promise<T>!
barrier.sync(flags: .barrier) {
guard let next = generator.next() else { return }
promise = next
index = promises.count
pendingPromises += 1
promises.append(next)
}
func testDone() {
barrier.sync {
if pendingPromises == 0 {
#if !swift(>=3.3) || (swift(>=4) && !swift(>=4.1))
root.fulfill(promises.flatMap{ $0.value })
#else
root.fulfill(promises.compactMap{ $0.value })
#endif
}
}
}
guard promise != nil else {
return testDone()
}
promise.state.pipe { resolution in
barrier.sync(flags: .barrier) {
pendingPromises -= 1
}
switch resolution {
case .fulfilled:
dequeue()
testDone()
case .rejected(let error, let token):
token.consumed = true
root.reject(error)
}
}
dequeue()
}
dequeue()
return root.promise
}
/**
Waits on all provided promises.
`when(fulfilled:)` rejects as soon as one of the provided promises rejects. `when(resolved:)` waits on all provided promises and **never** rejects.
when(resolved: promise1, promise2, promise3).then { results in
for result in results where case .fulfilled(let value) {
//
}
}.catch { error in
// invalid! Never rejects
}
- Returns: A new promise that resolves once all the provided promises resolve.
- Warning: The returned promise can *not* be rejected.
- Note: Any promises that error are implicitly consumed, your UnhandledErrorHandler will not be called.
*/
public func when<T>(resolved promises: Promise<T>...) -> Promise<[Result<T>]> {
return when(resolved: promises)
}
/// Waits on all provided promises.
public func when<T>(resolved promises: [Promise<T>]) -> Promise<[Result<T>]> {
guard !promises.isEmpty else { return Promise(value: []) }
var countdown = promises.count
let barrier = DispatchQueue(label: "org.promisekit.barrier.join", attributes: .concurrent)
return Promise { fulfill, reject in
for promise in promises {
promise.state.pipe { resolution in
if case .rejected(_, let token) = resolution {
token.consumed = true // all errors are implicitly consumed
}
var done = false
barrier.sync(flags: .barrier) {
countdown -= 1
done = countdown == 0
}
if done {
fulfill(promises.map { Result($0.state.get()!) })
}
}
}
}
}