Combine框架:如何在继续之前异步处理数组的每个元素
Combine framework: how to process each element of array asynchronously before proceeding
我在使用 iOS Combine 框架时遇到了一些心理障碍。
我正在将一些代码从 "manual" 从远程 API 获取代码转换为使用 Combine。基本上,API 是 SQL 和 REST(实际上它是 Salesforce,但这与问题无关)。代码用来做的是调用一个带有完成处理程序的 REST 查询方法。我正在做的是用 Combine Future 替换它。到目前为止,还不错。
问题出现在以下场景发生时(而且经常发生):
我们执行 REST 查询并返回 "objects".
的数组
但是这些"objects"并没有完全填充。他们每个人都需要来自某个相关对象的额外数据。因此,对于每个 "object",我们使用来自 "object" 的信息执行另一个 REST 查询,从而为我们提供 另一个 数组 "objects".
这可能允许也可能不允许我们完成第一个 "objects" 的填充 — 否则,我们可能必须使用 另一个 REST 查询每个秒"object"的信息,依此类推。
结果很多代码结构是这样的(这是伪代码):
func fetchObjects(completion: @escaping ([Object] -> Void) {
let restQuery = ...
RESTClient.performQuery(restQuery) { results in
let partialObjects = results.map { ... }
let group = DispatchGroup()
for partialObject in partialObjects {
let restQuery = ... // something based on partialObject
group.enter()
RESTClient.performQuery(restQuery) { results in
group.leave()
let partialObjects2 = results.map { ... }
partialObject.property1 = // something from partialObjects2
partialObject.property2 = // something from partialObjects2
// and we could go down yet _another_ level in some cases
}
}
group.notify {
completion([partialObjects])
}
}
}
每次我在伪代码中说 results in
时,那是异步网络调用的完成处理程序。
好吧,我很清楚如何在 Combine 中链接异步调用,例如通过使用 Futures 和 flatMap
(又是伪代码):
let future1 = Future...
future1.map {
// do something
}.flatMap {
let future2 = Future...
return future2.map {
// do something
}
}
// ...
在那个代码中,我们形成 future2
的方式可以取决于我们从 future1
的执行中收到的值,而在 future2
上的 map
我们可以修改我们从上游接收到的内容,然后再向下传递到管道。没问题。都挺好看的。
但这并没有告诉我我在组合前代码中所做的事情,即循环。我在这里,在 loop 中执行 multiple 异步调用,在继续之前由 DispatchGroup 保持在适当的位置。问题是:
这样做的 Combine 模式是什么?
记住当时的情况。我有一些对象的 array。我想 loop 通过该数组,对循环中的 each 对象进行异步调用,异步获取新信息并在此基础上修改该对象, 在继续进行管道之前。每个循环可能涉及进一步的 嵌套 循环,甚至异步收集 更多 信息:
Fetch info from online database, it's an array
|
V
For each element in the array, fetch _more_ info, _that's_ an array
|
V
For each element in _that_ array, fetch _more_ info
|
V
Loop thru the accumulated info and populate that element of the original array
执行此操作的旧代码看起来很糟糕,充满了嵌套的完成处理程序和由 DispatchGroup enter
/leave
/notify
保留的循环。 但它奏效了。我无法让我的 Combine 代码以同样的方式工作。我该怎么做?基本上我的管道输出是一些东西的数组,我觉得我需要将该数组拆分成单独的元素,对每个元素做一些异步,然后将元素放回一个数组中.怎么样?
我解决此问题的方法有效,但无法扩展,尤其是当异步调用需要到达管道链中 back 几个步骤的信息时。我一直在做这样的事情(我从 得到这个想法):
一组对象从上游到达。
我将 flatMap
和 map
数组输入到发布者数组中,每个发布者都以 Future 为首,进一步获取与 one 相关的在线内容 对象,然后是生成 modified 对象的管道。
现在我有一组管道,每个管道生成一个对象。我 merge
该数组并从 flatMap
.
生成该发布者(一个 MergeMany)
我collect
将结果值返回到一个数组中。
但这看起来仍然需要很多工作,更糟糕的是,当每个子管道本身需要生成一个子管道数组时,它不会扩展。这一切都变得难以理解,过去很容易到达完成块的信息(由于 Swift 的范围规则)不再到达主管道中的后续步骤(或者很难到达,因为我通过了更大的以及管道中更大的元组)。
一定有一些简单的 Combine 模式可以做到这一点,但我完全想念它。请告诉我它是什么。
使用已接受的答案,我得到了这个结构:
head // [Entity]
.flatMap { entities -> AnyPublisher<Entity, Error> in
Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
}.flatMap { entity -> AnyPublisher<Entity, Error> in
self.makeFuture(for: entity) // [Derivative]
.flatMap { derivatives -> AnyPublisher<Derivative, Error> in
Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
}
.flatMap { derivative -> AnyPublisher<Derivative2, Error> in
self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
}.collect().map { derivative2s -> Entity in
self.configuredEntity(entity, from: derivative2s)
}.eraseToAnyPublisher()
}.collect()
这正是我一直在寻找的优雅密封性!所以想法是:
我们收到一个数组,我们需要异步处理每个元素。旧方法是一个 DispatchGroup 和一个 for...in
循环。合并等价物是:
for...in
行的等价物是flatMap
和Publishers.Sequence。
DispatchGroup(处理异步)的等价物是进一步的flatMap
(在单个元素上)和一些发布者。在我的例子中,我从一个基于我们刚刚收到的单个元素的 Future 开始。
相当于末尾的右花括号是collect()
,等待所有元素处理完毕再把数组重新组合起来
所以综上所述,模式是:
flatMap
数组到序列。
flatMap
对发布者启动异步操作的单个元素。
- 根据需要继续该发布者的链接。
collect
回数组
通过 嵌套 该模式,我们可以利用 Swift 范围规则将我们需要处理的事情保留在范围内,直到我们获得足够的信息来生成处理的对象。
您最近的编辑和下面的评论:
I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"
我认为这种模式可以通过 .flatMap
到数组发布者 (Publishers.Sequence) 来实现,它一个接一个地发出并完成,然后是任何需要的每元素异步处理, 并以 .collect
结束,等待所有元素完成后再继续
所以,在代码中,假设我们有这些函数:
func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>
我们可以做到以下几点:
getFoos()
.flatMap { fooArr in
fooArr.publisher.setFailureType(to: Error.self)
}
// per-foo element async processing
.flatMap { foo in
getPartials(for: foo)
.flatMap { partialArr in
partialArr.publisher.setFailureType(to: Error.self)
}
// per-partial of foo async processing
.flatMap { partial in
getMoreInfo(for: partial, of: foo)
// build completed partial with more info
.map { moreInfo in
var newPartial = partial
newPartial.moreInfo = moreInfo
return newPartial
}
}
.collect()
// build completed foo with all partials
.map { partialArr in
var newFoo = foo
newFoo.partials = partialArr
return newFoo
}
}
.collect()
(删除旧答案)
我在使用 iOS Combine 框架时遇到了一些心理障碍。
我正在将一些代码从 "manual" 从远程 API 获取代码转换为使用 Combine。基本上,API 是 SQL 和 REST(实际上它是 Salesforce,但这与问题无关)。代码用来做的是调用一个带有完成处理程序的 REST 查询方法。我正在做的是用 Combine Future 替换它。到目前为止,还不错。
问题出现在以下场景发生时(而且经常发生):
我们执行 REST 查询并返回 "objects".
的数组
但是这些"objects"并没有完全填充。他们每个人都需要来自某个相关对象的额外数据。因此,对于每个 "object",我们使用来自 "object" 的信息执行另一个 REST 查询,从而为我们提供 另一个 数组 "objects".
这可能允许也可能不允许我们完成第一个 "objects" 的填充 — 否则,我们可能必须使用 另一个 REST 查询每个秒"object"的信息,依此类推。
结果很多代码结构是这样的(这是伪代码):
func fetchObjects(completion: @escaping ([Object] -> Void) {
let restQuery = ...
RESTClient.performQuery(restQuery) { results in
let partialObjects = results.map { ... }
let group = DispatchGroup()
for partialObject in partialObjects {
let restQuery = ... // something based on partialObject
group.enter()
RESTClient.performQuery(restQuery) { results in
group.leave()
let partialObjects2 = results.map { ... }
partialObject.property1 = // something from partialObjects2
partialObject.property2 = // something from partialObjects2
// and we could go down yet _another_ level in some cases
}
}
group.notify {
completion([partialObjects])
}
}
}
每次我在伪代码中说 results in
时,那是异步网络调用的完成处理程序。
好吧,我很清楚如何在 Combine 中链接异步调用,例如通过使用 Futures 和 flatMap
(又是伪代码):
let future1 = Future...
future1.map {
// do something
}.flatMap {
let future2 = Future...
return future2.map {
// do something
}
}
// ...
在那个代码中,我们形成 future2
的方式可以取决于我们从 future1
的执行中收到的值,而在 future2
上的 map
我们可以修改我们从上游接收到的内容,然后再向下传递到管道。没问题。都挺好看的。
但这并没有告诉我我在组合前代码中所做的事情,即循环。我在这里,在 loop 中执行 multiple 异步调用,在继续之前由 DispatchGroup 保持在适当的位置。问题是:
这样做的 Combine 模式是什么?
记住当时的情况。我有一些对象的 array。我想 loop 通过该数组,对循环中的 each 对象进行异步调用,异步获取新信息并在此基础上修改该对象, 在继续进行管道之前。每个循环可能涉及进一步的 嵌套 循环,甚至异步收集 更多 信息:
Fetch info from online database, it's an array
|
V
For each element in the array, fetch _more_ info, _that's_ an array
|
V
For each element in _that_ array, fetch _more_ info
|
V
Loop thru the accumulated info and populate that element of the original array
执行此操作的旧代码看起来很糟糕,充满了嵌套的完成处理程序和由 DispatchGroup enter
/leave
/notify
保留的循环。 但它奏效了。我无法让我的 Combine 代码以同样的方式工作。我该怎么做?基本上我的管道输出是一些东西的数组,我觉得我需要将该数组拆分成单独的元素,对每个元素做一些异步,然后将元素放回一个数组中.怎么样?
我解决此问题的方法有效,但无法扩展,尤其是当异步调用需要到达管道链中 back 几个步骤的信息时。我一直在做这样的事情(我从
一组对象从上游到达。
我将
flatMap
和map
数组输入到发布者数组中,每个发布者都以 Future 为首,进一步获取与 one 相关的在线内容 对象,然后是生成 modified 对象的管道。现在我有一组管道,每个管道生成一个对象。我
merge
该数组并从flatMap
. 生成该发布者(一个 MergeMany)
我
collect
将结果值返回到一个数组中。
但这看起来仍然需要很多工作,更糟糕的是,当每个子管道本身需要生成一个子管道数组时,它不会扩展。这一切都变得难以理解,过去很容易到达完成块的信息(由于 Swift 的范围规则)不再到达主管道中的后续步骤(或者很难到达,因为我通过了更大的以及管道中更大的元组)。
一定有一些简单的 Combine 模式可以做到这一点,但我完全想念它。请告诉我它是什么。
使用已接受的答案,我得到了这个结构:
head // [Entity]
.flatMap { entities -> AnyPublisher<Entity, Error> in
Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
}.flatMap { entity -> AnyPublisher<Entity, Error> in
self.makeFuture(for: entity) // [Derivative]
.flatMap { derivatives -> AnyPublisher<Derivative, Error> in
Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
}
.flatMap { derivative -> AnyPublisher<Derivative2, Error> in
self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
}.collect().map { derivative2s -> Entity in
self.configuredEntity(entity, from: derivative2s)
}.eraseToAnyPublisher()
}.collect()
这正是我一直在寻找的优雅密封性!所以想法是:
我们收到一个数组,我们需要异步处理每个元素。旧方法是一个 DispatchGroup 和一个 for...in
循环。合并等价物是:
for...in
行的等价物是flatMap
和Publishers.Sequence。DispatchGroup(处理异步)的等价物是进一步的
flatMap
(在单个元素上)和一些发布者。在我的例子中,我从一个基于我们刚刚收到的单个元素的 Future 开始。相当于末尾的右花括号是
collect()
,等待所有元素处理完毕再把数组重新组合起来
所以综上所述,模式是:
flatMap
数组到序列。flatMap
对发布者启动异步操作的单个元素。- 根据需要继续该发布者的链接。
collect
回数组
通过 嵌套 该模式,我们可以利用 Swift 范围规则将我们需要处理的事情保留在范围内,直到我们获得足够的信息来生成处理的对象。
您最近的编辑和下面的评论:
I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"
我认为这种模式可以通过 .flatMap
到数组发布者 (Publishers.Sequence) 来实现,它一个接一个地发出并完成,然后是任何需要的每元素异步处理, 并以 .collect
结束,等待所有元素完成后再继续
所以,在代码中,假设我们有这些函数:
func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>
我们可以做到以下几点:
getFoos()
.flatMap { fooArr in
fooArr.publisher.setFailureType(to: Error.self)
}
// per-foo element async processing
.flatMap { foo in
getPartials(for: foo)
.flatMap { partialArr in
partialArr.publisher.setFailureType(to: Error.self)
}
// per-partial of foo async processing
.flatMap { partial in
getMoreInfo(for: partial, of: foo)
// build completed partial with more info
.map { moreInfo in
var newPartial = partial
newPartial.moreInfo = moreInfo
return newPartial
}
}
.collect()
// build completed foo with all partials
.map { partialArr in
var newFoo = foo
newFoo.partials = partialArr
return newFoo
}
}
.collect()
(删除旧答案)