如何在没有自定义订阅者的情况下调整 Combine 的发布者需求?
How to adjust Combine's Publisher demand without a custom Subscriber?
在Combine framework there's a concept of a demand, which allows signalling backpressure to publishers.
假设我有一个简单的发布者:
let numbers = Publishers.Sequence<ClosedRange<Int>, Error>(sequence: 0...100)
我想下载某些使用这些数字作为参数的 URL。我还希望下一次下载仅在上一次下载完成后才开始。
一个天真的方法看起来像这样:
let subscription = numbers.sink(receiveCompletion: { _ in }, receiveValue: {
let url = URL(string: "https://httpbin.org/get?value=\([=11=])")!
URLSession.shared.dataTask(with: url) {
[=11=].map { print(String(data: [=11=], encoding: .utf8)!) }
}.resume()
})
不幸的是,这不能满足在开始下一个下载之前等待上一个下载完成的要求。据我所知,sink
函数会 return AnyCancellable
, not of type Subscription
. If the latter was the case, we could call the request
类型的值 subscription
上的函数,在上传完成后具有特定需求。
控制 sink
或任何其他标准 Combine Subscriber
提供的订阅需求的最佳方法是什么?
事实证明,flatMap
operator takes an additional maxPublishers
argument that takes a Subscribers.Demand
value. In combination with the Future
发布者,这允许 numbers
发布者等待,直到未来能够在发送下一个值之前处理给定值。
将此应用于原始代码,一个接一个地下载值将如下所示:
enum DownloadError: Error {
case noData
}
let subscription = numbers.flatMap(maxPublishers: .max(1)) { number in
Future { promise in
let url = URL(string: "https://httpbin.org/get?value=\(number)")!
URLSession.shared.dataTask(with: url) {
switch ([=10=], ) {
case let (data?, nil):
promise(.success(data))
case let (nil, error?):
promise(.failure(error))
default:
promise(.failure(DownloadError.noData))
}
}.resume()
}
}.sink(
receiveCompletion: { _ in print("errors should be handled here") },
receiveValue: { print(String(data: [=10=], encoding: .utf8)!) }
)
在Combine framework there's a concept of a demand, which allows signalling backpressure to publishers.
假设我有一个简单的发布者:
let numbers = Publishers.Sequence<ClosedRange<Int>, Error>(sequence: 0...100)
我想下载某些使用这些数字作为参数的 URL。我还希望下一次下载仅在上一次下载完成后才开始。
一个天真的方法看起来像这样:
let subscription = numbers.sink(receiveCompletion: { _ in }, receiveValue: {
let url = URL(string: "https://httpbin.org/get?value=\([=11=])")!
URLSession.shared.dataTask(with: url) {
[=11=].map { print(String(data: [=11=], encoding: .utf8)!) }
}.resume()
})
不幸的是,这不能满足在开始下一个下载之前等待上一个下载完成的要求。据我所知,sink
函数会 return AnyCancellable
, not of type Subscription
. If the latter was the case, we could call the request
类型的值 subscription
上的函数,在上传完成后具有特定需求。
控制 sink
或任何其他标准 Combine Subscriber
提供的订阅需求的最佳方法是什么?
事实证明,flatMap
operator takes an additional maxPublishers
argument that takes a Subscribers.Demand
value. In combination with the Future
发布者,这允许 numbers
发布者等待,直到未来能够在发送下一个值之前处理给定值。
将此应用于原始代码,一个接一个地下载值将如下所示:
enum DownloadError: Error {
case noData
}
let subscription = numbers.flatMap(maxPublishers: .max(1)) { number in
Future { promise in
let url = URL(string: "https://httpbin.org/get?value=\(number)")!
URLSession.shared.dataTask(with: url) {
switch ([=10=], ) {
case let (data?, nil):
promise(.success(data))
case let (nil, error?):
promise(.failure(error))
default:
promise(.failure(DownloadError.noData))
}
}.resume()
}
}.sink(
receiveCompletion: { _ in print("errors should be handled here") },
receiveValue: { print(String(data: [=10=], encoding: .utf8)!) }
)