如何在没有自定义订阅者的情况下调整 Combine 的发布者需求?

How to adjust Combine's Publisher demand without a custom Subscriber?

Combine framework there's a concept of a demand, which allows signalling backpressure to publishers.

假设我有一个简单的发布者:

let numbers = Publishers.Sequence<ClosedRange<Int>, Error>(sequence: 0...100)

我想下载某些使用这些数字作为参数的 URL。我还希望下一次下载仅在上一次下载完成后才开始。

一个天真的方法看起来像这样:

let subscription = numbers.sink(receiveCompletion: { _ in }, receiveValue: {
  let url = URL(string: "https://httpbin.org/get?value=\([=11=])")!
  URLSession.shared.dataTask(with: url) { 
    [=11=].map { print(String(data: [=11=], encoding: .utf8)!) }
  }.resume()
})

不幸的是,这不能满足在开始下一个下载之前等待上一个下载完成的要求。据我所知,sink 函数会 return AnyCancellable, not of type Subscription. If the latter was the case, we could call the request 类型的值 subscription 上的函数,在上传完成后具有特定需求。

控制 sink 或任何其他标准 Combine Subscriber 提供的订阅需求的最佳方法是什么?

事实证明,flatMap operator takes an additional maxPublishers argument that takes a Subscribers.Demand value. In combination with the Future 发布者,这允许 numbers 发布者等待,直到未来能够在发送下一个值之前处理给定值。

将此应用于原始代码,一个接一个地下载值将如下所示:

enum DownloadError: Error {
    case noData
}

let subscription = numbers.flatMap(maxPublishers: .max(1)) { number in
    Future { promise in
        let url = URL(string: "https://httpbin.org/get?value=\(number)")!
        URLSession.shared.dataTask(with: url) {
            switch ([=10=], ) {
            case let (data?, nil):
                promise(.success(data))
            case let (nil, error?):
                promise(.failure(error))
            default:
                promise(.failure(DownloadError.noData))
            }
        }.resume()
    }
}.sink(
    receiveCompletion: { _ in print("errors should be handled here") },
    receiveValue: { print(String(data: [=10=], encoding: .utf8)!) }
)