Akka 流 - 列表到单个元素的 mapAsync
Akka stream - List to mapAsync of individual elements
我的流有一个 Flow,它的输出是 List[Any] 对象。我想要一个 mapAsync,然后是其他一些阶段,每个阶段处理一个单独的元素而不是列表。我该怎么做?
实际上我想连接
的输出
Flow[Any].map { msg =>
someListDerivedFrom(msg)
}
将由 -
消耗
Flow[Any].mapAsyncUnordered(4) { listElement =>
actorRef ? listElement
}.someOtherStuff
我该怎么做?
我认为您要查找的组合器是 mapConcat
。这个组合器将接受一个输入参数和 return 一个 Iterable
的东西。一个简单的例子如下:
implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()
val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)
val graph =
source.
mapConcat(identity).
to(sink)
graph.run
在这里,我的 Source
吐出 List
元素,而我的 Sink
接受那些 List
中的基础类型。由于类型不同,我无法将它们直接连接在一起。但是如果我在它们之间应用 mapConcat
,它们可以连接起来,因为组合器会将那些 List
元素展平,将它们的各个元素 (Int
) 发送到下游。因为mapConcat
的输入元素已经是一个Iterable
,那么你只需要在mapConcat
的body中使用identify
函数就可以了。
我的流有一个 Flow,它的输出是 List[Any] 对象。我想要一个 mapAsync,然后是其他一些阶段,每个阶段处理一个单独的元素而不是列表。我该怎么做?
实际上我想连接
的输出Flow[Any].map { msg =>
someListDerivedFrom(msg)
}
将由 -
消耗Flow[Any].mapAsyncUnordered(4) { listElement =>
actorRef ? listElement
}.someOtherStuff
我该怎么做?
我认为您要查找的组合器是 mapConcat
。这个组合器将接受一个输入参数和 return 一个 Iterable
的东西。一个简单的例子如下:
implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()
val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)
val graph =
source.
mapConcat(identity).
to(sink)
graph.run
在这里,我的 Source
吐出 List
元素,而我的 Sink
接受那些 List
中的基础类型。由于类型不同,我无法将它们直接连接在一起。但是如果我在它们之间应用 mapConcat
,它们可以连接起来,因为组合器会将那些 List
元素展平,将它们的各个元素 (Int
) 发送到下游。因为mapConcat
的输入元素已经是一个Iterable
,那么你只需要在mapConcat
的body中使用identify
函数就可以了。