Akka 流 - 列表到单个元素的 mapAsync

Akka stream - List to mapAsync of individual elements

我的流有一个 Flow,它的输出是 List[Any] 对象。我想要一个 mapAsync,然后是其他一些阶段,每个阶段处理一个单独的元素而不是列表。我该怎么做?

实际上我想连接

的输出
Flow[Any].map { msg =>
  someListDerivedFrom(msg)
}

将由 -

消耗
Flow[Any].mapAsyncUnordered(4) { listElement =>
  actorRef ? listElement
}.someOtherStuff

我该怎么做?

我认为您要查找的组合器是 mapConcat。这个组合器将接受一个输入参数和 return 一个 Iterable 的东西。一个简单的例子如下:

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()

val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)

val graph =
  source.
    mapConcat(identity).
    to(sink)
graph.run

在这里,我的 Source 吐出 List 元素,而我的 Sink 接受那些 List 中的基础类型。由于类型不同,我无法将它们直接连接在一起。但是如果我在它们之间应用 mapConcat,它们可以连接起来,因为组合器会将那些 List 元素展平,将它们的各个元素 (Int) 发送到下游。因为mapConcat的输入元素已经是一个Iterable,那么你只需要在mapConcat的body中使用identify函数就可以了。