How to consume grouped sub streams with mapAsync in akka streams
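I need to do something very similar to this: https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
My problem is that I have an unknown number of groups. If the mapAsync parallelism is smaller than the number of groups I get, the last sink fails with: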
Tearing down SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) due to upstream error (akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon)
I tried putting a buffer in between, as suggested in the akka streams cookbook http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html
groupBy {
  case LoglevelPattern(level) => level
  case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....
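but I get the same result.
Expanding on jrudolph's entirely correct comment...
You don't need mapAsync in this case. As a basic example, suppose you have a Source of tuples: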
import akka.stream.scaladsl.{Source, Sink}
def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))
val originalSource = Source(data())
You can then use groupBy to create a Source of Sources:
def getID(tuple : (String, Int)) = tuple._1
//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID
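Each grouped Source can be processed in parallel with a plain map; nothing fancy is needed. Here is an example that sums each group in an independent stream: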
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher
def getValues(tuple : (String, Int)) = tuple._2
//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)
//a Source of (String, Future[Int])
val sumSource =
  groupedSource map { case (id, src) =>
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }
Now, once sumSource itself is run, all of the "foo" numbers are summed in parallel with all of the "bar" numbers.
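The snippets above use the Akka Streams 1.0 experimental API, where groupBy emits (key, Source) pairs that you have to run yourself. In later Akka versions (2.4 onward; this sketch assumes 2.6), groupBy instead returns a SubFlow and the per-group work is written directly on it. The following is a minimal sketch of the same per-key sum under that assumption. GroupedSums and sumsPerKey are hypothetical names, and the maxSubstreams bound of 16 is arbitrary: the stream fails if more distinct keys than that arrive, which matters when the number of groups is unknown.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupedSums {
  // Hypothetical helper: sums the Int values per String key, one substream per key.
  def sumsPerKey(data: List[(String, Int)]): Map[String, Int] = {
    // In Akka 2.6 an implicit Materializer is derived from the implicit ActorSystem.
    implicit val system: ActorSystem = ActorSystem("grouped-sums")
    try {
      val sums = Source(data)
        .groupBy(16, _._1) // at most 16 distinct keys; more than that fails the stream
        .fold(("", 0)) { case ((_, acc), (id, n)) => (id, acc + n) } // runs once per substream
        .mergeSubstreams // back to a single Source of (key, sum) pairs
        .runWith(Sink.seq)
      Await.result(sums, 3.seconds).toMap
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(sumsPerKey(List(("foo", 1), ("foo", 2), ("bar", 1), ("foo", 3), ("bar", 2))))
}
```

Each substream only ever sees one key, so the fold can simply carry the key along with the running sum.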
mapAsync is for when you have a function that returns a Future[T] and you are trying to emit a T; that is not the situation in your problem. In addition, mapAsync involves waiting for results, which is not reactive...
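For contrast, here is a minimal sketch of the case mapAsync is meant for, again assuming Akka 2.6. Here, lookup is a hypothetical asynchronous call returning Future[String]. mapAsync emits the String values while keeping at most parallelism calls in flight, and it preserves the upstream order (mapAsyncUnordered does not).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncExample {
  // Hypothetical asynchronous call standing in for e.g. a database or HTTP request.
  def lookup(id: Int)(implicit ec: ExecutionContext): Future[String] =
    Future(s"item-$id")

  def lookupAll(ids: List[Int]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("map-async")
    import system.dispatcher // ExecutionContext for lookup's Futures
    try {
      val done = Source(ids)
        .mapAsync(parallelism = 2)(id => lookup(id)) // at most 2 lookups in flight
        .runWith(Sink.seq)
      Await.result(done, 3.seconds) // results arrive in the same order as ids
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(lookupAll(List(1, 2, 3, 4, 5)))
}
```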