如何在 Combine 中安排异步调用的同步序列?

How to schedule a synchronous sequence of asynchronous calls in Combine?

我想在我的应用程序中处理一系列网络调用。每个调用都是异步的,flatMap() 似乎是正确的调用。但是,flatMap 同时处理所有参数,我需要顺序调用——下一个网络调用仅在前一个调用完成后才开始。我查找了一个 RxSwift 但它需要 concatMap Combine 没有的运算符。这是我正在尝试做的事情的粗略概述,但是 flatMap 同时触发所有 myCalls

Publishers.Sequence(sequence: urls)
  .flatMap { url in
    Publishers.Future<Result, Error> { callback in 
        myCall { data, error in 
            if let data = data {
                callback(.success(data))
            } else if let error = error {
                callback(.failure(error))
            }
        }
    }
  }

在操场上试验了一段时间后,我相信我找到了解决办法,但如果你有更好的主意,请分享。解决方法是在flatMap中加入maxPublishers参数,设置值为max(1)

Publishers.Sequence(sequence: urls)
  .flatMap(maxPublishers: .max(1)) // <<<<--- here
  { url in 
    Publishers.Future<Result, Error> { callback in 
      myCall { data, error in 
        if let data = data {
          callback(.success(data))
        } else if let error = error {
          callback(.failure(error))
        }
      }
    }
  }

你也可以在 observable 上使用 prepend(_:) method 来创建连接序列,我想这类似于 RxSwift 中的 Observable.concat(:)

这是我尝试模拟您的用例的一个简单示例,其中我有几个不同的序列,一个接一个。

func dataTaskPublisher(_ urlString: String) -> AnyPublisher<(data: Data, response: URLResponse), Never> {
    let interceptedError = (Data(), URLResponse())
    return Publishers.Just(URL(string: urlString)!)
                        .flatMap {
                            URLSession.shared
                                        .dataTaskPublisher(for: [=10=])
                                        .replaceError(with: interceptedError)
                        }
                        .eraseToAnyPublisher()
}

let publisher: AnyPublisher<(data: Data, response: URLResponse), Never> = Publishers.Empty().eraseToAnyPublisher()


for urlString in [
    "http://ipv4.download.thinkbroadband.com/1MB.zip",
    "http://ipv4.download.thinkbroadband.com/50MB.zip",
    "http://ipv4.download.thinkbroadband.com/10MB.zip"
    ] {
        publisher = publisher.prepend(dataTaskPublisher(urlString)).eraseToAnyPublisher()
}

publisher.sink(receiveCompletion: { completion in
    print("Completed")
}) { response in
    print("Data: \(response)")
}

此处,prepend(_:) 运算符为序列添加前缀,因此,前置序列首先开始,完成并开始下一个序列。

如果您 运行 下面的代码,您应该看到首先下载 10 MB 的文件,然后是 50 MB,最后是 1 MB,因为最后一个前置文件首先开始,依此类推。

prepend(_:) 运算符还有其他变体,它接受数组,但似乎不能按顺序工作。