如何在 Combine 中安排异步调用的同步序列?
How to schedule a synchronous sequence of asynchronous calls in Combine?
我想在我的应用程序中处理一系列网络调用。每个调用都是异步的,flatMap()
似乎是正确的调用。但是,flatMap
同时处理所有参数,我需要顺序调用——下一个网络调用仅在前一个调用完成后才开始。我查找了一个 RxSwift 但它需要 concatMap
Combine 没有的运算符。这是我正在尝试做的事情的粗略概述,但是 flatMap
同时触发所有 myCalls
。
Publishers.Sequence(sequence: urls)
.flatMap { url in
Publishers.Future<Result, Error> { callback in
myCall { data, error in
if let data = data {
callback(.success(data))
} else if let error = error {
callback(.failure(error))
}
}
}
}
在操场上试验了一段时间后,我相信我找到了解决办法,但如果你有更好的主意,请分享。解决方法是在flatMap
中加入maxPublishers
参数,设置值为max(1)
Publishers.Sequence(sequence: urls)
.flatMap(maxPublishers: .max(1)) // <<<<--- here
{ url in
Publishers.Future<Result, Error> { callback in
myCall { data, error in
if let data = data {
callback(.success(data))
} else if let error = error {
callback(.failure(error))
}
}
}
}
你也可以在 observable 上使用 prepend(_:)
method 来创建连接序列,我想这类似于 RxSwift 中的 Observable.concat(:)
。
这是我尝试模拟您的用例的一个简单示例,其中我有几个不同的序列,一个接一个。
func dataTaskPublisher(_ urlString: String) -> AnyPublisher<(data: Data, response: URLResponse), Never> {
let interceptedError = (Data(), URLResponse())
return Publishers.Just(URL(string: urlString)!)
.flatMap {
URLSession.shared
.dataTaskPublisher(for: [=10=])
.replaceError(with: interceptedError)
}
.eraseToAnyPublisher()
}
let publisher: AnyPublisher<(data: Data, response: URLResponse), Never> = Publishers.Empty().eraseToAnyPublisher()
for urlString in [
"http://ipv4.download.thinkbroadband.com/1MB.zip",
"http://ipv4.download.thinkbroadband.com/50MB.zip",
"http://ipv4.download.thinkbroadband.com/10MB.zip"
] {
publisher = publisher.prepend(dataTaskPublisher(urlString)).eraseToAnyPublisher()
}
publisher.sink(receiveCompletion: { completion in
print("Completed")
}) { response in
print("Data: \(response)")
}
此处,prepend(_:)
运算符为序列添加前缀,因此,前置序列首先开始,完成并开始下一个序列。
如果您 运行 下面的代码,您应该看到首先下载 10 MB 的文件,然后是 50 MB,最后是 1 MB,因为最后一个前置文件首先开始,依此类推。
prepend(_:)
运算符还有其他变体,它接受数组,但似乎不能按顺序工作。
我想在我的应用程序中处理一系列网络调用。每个调用都是异步的,flatMap()
似乎是正确的调用。但是,flatMap
同时处理所有参数,我需要顺序调用——下一个网络调用仅在前一个调用完成后才开始。我查找了一个 RxSwift concatMap
Combine 没有的运算符。这是我正在尝试做的事情的粗略概述,但是 flatMap
同时触发所有 myCalls
。
Publishers.Sequence(sequence: urls)
.flatMap { url in
Publishers.Future<Result, Error> { callback in
myCall { data, error in
if let data = data {
callback(.success(data))
} else if let error = error {
callback(.failure(error))
}
}
}
}
在操场上试验了一段时间后,我相信我找到了解决办法,但如果你有更好的主意,请分享。解决方法是在flatMap
中加入maxPublishers
参数,设置值为max(1)
Publishers.Sequence(sequence: urls)
.flatMap(maxPublishers: .max(1)) // <<<<--- here
{ url in
Publishers.Future<Result, Error> { callback in
myCall { data, error in
if let data = data {
callback(.success(data))
} else if let error = error {
callback(.failure(error))
}
}
}
}
你也可以在 observable 上使用 prepend(_:)
method 来创建连接序列,我想这类似于 RxSwift 中的 Observable.concat(:)
。
这是我尝试模拟您的用例的一个简单示例,其中我有几个不同的序列,一个接一个。
func dataTaskPublisher(_ urlString: String) -> AnyPublisher<(data: Data, response: URLResponse), Never> {
let interceptedError = (Data(), URLResponse())
return Publishers.Just(URL(string: urlString)!)
.flatMap {
URLSession.shared
.dataTaskPublisher(for: [=10=])
.replaceError(with: interceptedError)
}
.eraseToAnyPublisher()
}
let publisher: AnyPublisher<(data: Data, response: URLResponse), Never> = Publishers.Empty().eraseToAnyPublisher()
for urlString in [
"http://ipv4.download.thinkbroadband.com/1MB.zip",
"http://ipv4.download.thinkbroadband.com/50MB.zip",
"http://ipv4.download.thinkbroadband.com/10MB.zip"
] {
publisher = publisher.prepend(dataTaskPublisher(urlString)).eraseToAnyPublisher()
}
publisher.sink(receiveCompletion: { completion in
print("Completed")
}) { response in
print("Data: \(response)")
}
此处,prepend(_:)
运算符为序列添加前缀,因此,前置序列首先开始,完成并开始下一个序列。
如果您 运行 下面的代码,您应该看到首先下载 10 MB 的文件,然后是 50 MB,最后是 1 MB,因为最后一个前置文件首先开始,依此类推。
prepend(_:)
运算符还有其他变体,它接受数组,但似乎不能按顺序工作。