Combine框架:如何在继续之前异步处理数组的每个元素

Combine framework: how to process each element of array asynchronously before proceeding

我在使用 iOS Combine 框架时遇到了一些心理障碍。

我正在将一些代码从 "manual" 从远程 API 获取代码转换为使用 Combine。基本上,API 是 SQL 和 REST(实际上它是 Salesforce,但这与问题无关)。代码用来做的是调用一个带有完成处理程序的 REST 查询方法。我正在做的是用 Combine Future 替换它。到目前为止,还不错。

问题出现在以下场景发生时(而且经常发生):

  1. 我们执行 REST 查询并返回 "objects".

  2. 的数组
  3. 但是这些"objects"并没有完全填充。他们每个人都需要来自某个相关对象的额外数据。因此,对于每个 "object",我们使用来自 "object" 的信息执行另一个 REST 查询,从而为我们提供 另一个 数组 "objects".

  4. 这可能允许也可能不允许我们完成第一个 "objects" 的填充 — 否则,我们可能必须使用 另一个 REST 查询每个"object"的信息,依此类推。

结果很多代码结构是这样的(这是伪代码):

func fetchObjects(completion: @escaping ([Object] -> Void) {
    let restQuery = ...
    RESTClient.performQuery(restQuery) { results in
        let partialObjects = results.map { ... }
        let group = DispatchGroup()
        for partialObject in partialObjects {
            let restQuery = ... // something based on partialObject
            group.enter()
            RESTClient.performQuery(restQuery) { results in
                group.leave()
                let partialObjects2 = results.map { ... }
                partialObject.property1 = // something from partialObjects2
                partialObject.property2 = // something from partialObjects2
                // and we could go down yet _another_ level in some cases
            }
        }
        group.notify {
            completion([partialObjects])
        }
    }
}

每次我在伪代码中说 results in 时,那是异步网络调用的完成处理程序。

好吧,我很清楚如何在 Combine 中链接异步调用,例如通过使用 Futures 和 flatMap(又是伪代码):

let future1 = Future...
future1.map {
    // do something
}.flatMap {
    let future2 = Future...
    return future2.map {
        // do something
    }
}
// ...

在那个代码中,我们形成 future2 的方式可以取决于我们从 future1 的执行中收到的值,而在 future2 上的 map 我们可以修改我们从上游接收到的内容,然后再向下传递到管道。没问题。都挺好看的。

但这并没有告诉我我在组合前代码中所做的事情,即循环。我在这里,在 loop 中执行 multiple 异步调用,在继续之前由 DispatchGroup 保持在适当的位置。问题是:

这样做的 Combine 模式是什么?

记住当时的情况。我有一些对象的 array。我想 loop 通过该数组,对循环中的 each 对象进行异步调用,异步获取新信息并在此基础上修改该对象, 在继续进行管道之前。每个循环可能涉及进一步的 嵌套 循环,甚至异步收集 更多 信息:

Fetch info from online database, it's an array
   |
   V
For each element in the array, fetch _more_ info, _that's_ an array
   |
   V
For each element in _that_ array, fetch _more_ info
   |
   V
Loop thru the accumulated info and populate that element of the original array 

执行此操作的旧代码看起来很糟糕,充满了嵌套的完成处理程序和由 DispatchGroup enter/leave/notify 保留的循环。 但它奏效了。我无法让我的 Combine 代码以同样的方式工作。我该怎么做?基本上我的管道输出是一些东西的数组,我觉得我需要将该数组拆分成单独的元素,对每个元素做一些异步,然后将元素放回一个数组中.怎么样?


我解决此问题的方法有效,但无法扩展,尤其是当异步调用需要到达管道链中 back 几个步骤的信息时。我一直在做这样的事情(我从 得到这个想法):

  1. 一组对象从上游到达。

  2. 我将 flatMapmap 数组输入到发布者数组中,每个发布者都以 Future 为首,进一步获取与 one 相关的在线内容 对象,然后是生成 modified 对象的管道。

  3. 现在我有一组管道,每个管道生成一个对象。我 merge 该数组并从 flatMap.

  4. 生成该发布者(一个 MergeMany)
  5. collect将结果值返回到一个数组中。

但这看起来仍然需要很多工作,更糟糕的是,当每个子管道本身需要生成一个子管道数组时,它不会扩展。这一切都变得难以理解,过去很容易到达完成块的信息(由于 Swift 的范围规则)不再到达主管道中的后续步骤(或者很难到达,因为我通过了更大的以及管道中更大的元组)。

一定有一些简单的 Combine 模式可以做到这一点,但我完全想念它。请告诉我它是什么。

使用已接受的答案,我得到了这个结构:

head // [Entity]
    .flatMap { entities -> AnyPublisher<Entity, Error> in
        Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
    }.flatMap { entity -> AnyPublisher<Entity, Error> in
        self.makeFuture(for: entity) // [Derivative]
            .flatMap { derivatives -> AnyPublisher<Derivative, Error> in
                Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
            }
            .flatMap { derivative -> AnyPublisher<Derivative2, Error> in
                self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
        }.collect().map { derivative2s -> Entity in
            self.configuredEntity(entity, from: derivative2s)
        }.eraseToAnyPublisher()
    }.collect()

这正是我一直在寻找的优雅密封性!所以想法是:

我们收到一个数组,我们需要异步处理每个元素。旧方法是一个 DispatchGroup 和一个 for...in 循环。合并等价物是:

  • for...in行的等价物是flatMap和Publishers.Sequence。

  • DispatchGroup(处理异步)的等价物是进一步的flatMap(在单个元素上)和一些发布者。在我的例子中,我从一个基于我们刚刚收到的单个元素的 Future 开始。

  • 相当于末尾的右花括号是collect(),等待所有元素处理完毕再把数组重新组合起来

所以综上所述,模式是:

  1. flatMap 数组到序列。
  2. flatMap 对发布者启动异步操作的单个元素。
  3. 根据需要继续该发布者的链接。
  4. collect回数组

通过 嵌套 该模式,我们可以利用 Swift 范围规则将我们需要处理的事情保留在范围内,直到我们获得足够的信息来生成处理的对象。

您最近的编辑和下面的评论:

I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"

我认为这种模式可以通过 .flatMap 到数组发布者 (Publishers.Sequence) 来实现,它一个接一个地发出并完成,然后是任何需要的每元素异步处理, 并以 .collect 结束,等待所有元素完成后再继续

所以,在代码中,假设我们有这些函数:

func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>

我们可以做到以下几点:

getFoos()
.flatMap { fooArr in 
    fooArr.publisher.setFailureType(to: Error.self)
 }

// per-foo element async processing
.flatMap { foo in

  getPartials(for: foo)
    .flatMap { partialArr in
       partialArr.publisher.setFailureType(to: Error.self)
     }

     // per-partial of foo async processing
    .flatMap { partial in

       getMoreInfo(for: partial, of: foo)
         // build completed partial with more info
         .map { moreInfo in
            var newPartial = partial
            newPartial.moreInfo = moreInfo
            return newPartial
         }
     }
     .collect()
     // build completed foo with all partials
     .map { partialArr in
        var newFoo = foo
        newFoo.partials = partialArr
        return newFoo
     }
}
.collect()

(删除旧答案)