为什么 Akka 流循环没有在此图中结束?
Why Akka streams cycle doesn't end in this graph?
我想创建一个在下沉之前循环 n 次的图形。我刚刚创建了这个满足我要求的示例,但在下沉后并没有结束,我真的不明白为什么。谁能赐教一下?
谢谢。
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, UniformFanOutShape}
import scala.concurrent.Future
object test {
def main(args: Array[String]) {
val ignore: Sink[Any, Future[Unit]] = Sink.ignore
val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
sink => {
import FlowGraph.Implicits._
val fileSource = Source.single((0, Array[String]()))
val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
val afterMerge = Flow[(Int, Array[String])].map {
e =>
println("after merge")
e
}
val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
val toRetry = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("retry " + (r < 3) + " " + r)
r < 3
}
}.map {
case (r, s) => (r + 1, s)
}
val toSink = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("sink " + (r >= 3) + " " + r)
r >= 3
}
}
merge.preferred <~ toRetry <~ broadcastArray
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
}
}
implicit val system = ActorSystem()
implicit val _ = ActorMaterializer()
val run: Future[Unit] = closed.run()
import system.dispatcher
run.onComplete {
case _ => {
println("finished")
system.shutdown()
}
}
}
}`
流从未完成,因为合并从不发出完成信号。
格式化图形结构后,它基本上看起来像:
//ignoring the preferred which is inconsequential
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
merge <~ toRetry <~ broadcastArray
未完成的问题根源于您的合并步骤:
// 2 inputs into merge
fileSource ~> merge
merge <~ toRetry
一旦 fileSource 发出了它的单个元素(即 (0, Array.empty[String])
),它就会发出一条 complete
消息进行合并。
但是,fileSource 的完成消息在合并时被阻止。来自 documentation:
akka.stream.scaladsl.MergePreferred
Completes when all upstreams complete (eagerClose=false) or one
upstream completes (eagerClose=true)
在其输入流 所有 完成之前,合并不会发送 complete
。
// fileSource is complete ~> merge
// merge <~ toRetry is still running
// complete fileSource + still running toRetry = still running merge
因此,合并将等到 toRetry
也完成。但是 toRetry
永远不会完成,因为它正在等待 merge
完成。
如果您希望您的特定图形在 fileSource 完成后完成,则只需设置 eagerClose=True
,这将导致 fileSource 完成后合并完成。例如:
//Add this true |
// V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")
没有流循环
针对您的问题,存在更简单的解决方案。只需使用利用 tail recursive 函数的单个 Flow.map
阶段:
//Note: there is no use of akka in this implementation
type FileInputType = (Int, Array[String])
@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType =
fileInput match {
case (r,_) if r >= 3 => fileInput
case (r,a) => recursiveRetry((r+1, a))
}
你的流将减少到
//ring-fenced akka code
val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
fileSource ~> recursiveRetryFlow ~> toSink ~> sink
结果是一个更干净的流 & 它避免了 "business logic" 与 akka 代码混合。这允许完全独立于 any 第三方库的重试功能的单元测试。您嵌入到流中的重试循环是 "business logic"。因此,混合实现与 akka 紧密耦合,无论好坏。
此外,在分离解决方案中,循环包含在尾递归函数中,即 idiomatic Scala。
我想创建一个在下沉之前循环 n 次的图形。我刚刚创建了这个满足我要求的示例,但在下沉后并没有结束,我真的不明白为什么。谁能赐教一下?
谢谢。
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, UniformFanOutShape}
import scala.concurrent.Future
object test {
def main(args: Array[String]) {
val ignore: Sink[Any, Future[Unit]] = Sink.ignore
val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
sink => {
import FlowGraph.Implicits._
val fileSource = Source.single((0, Array[String]()))
val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
val afterMerge = Flow[(Int, Array[String])].map {
e =>
println("after merge")
e
}
val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
val toRetry = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("retry " + (r < 3) + " " + r)
r < 3
}
}.map {
case (r, s) => (r + 1, s)
}
val toSink = Flow[(Int, Array[String])].filter {
case (r, s) => {
println("sink " + (r >= 3) + " " + r)
r >= 3
}
}
merge.preferred <~ toRetry <~ broadcastArray
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
}
}
implicit val system = ActorSystem()
implicit val _ = ActorMaterializer()
val run: Future[Unit] = closed.run()
import system.dispatcher
run.onComplete {
case _ => {
println("finished")
system.shutdown()
}
}
}
}`
流从未完成,因为合并从不发出完成信号。
格式化图形结构后,它基本上看起来像:
//ignoring the preferred which is inconsequential
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
merge <~ toRetry <~ broadcastArray
未完成的问题根源于您的合并步骤:
// 2 inputs into merge
fileSource ~> merge
merge <~ toRetry
一旦 fileSource 发出了它的单个元素(即 (0, Array.empty[String])
),它就会发出一条 complete
消息进行合并。
但是,fileSource 的完成消息在合并时被阻止。来自 documentation:
akka.stream.scaladsl.MergePreferred
Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
在其输入流 所有 完成之前,合并不会发送 complete
。
// fileSource is complete ~> merge
// merge <~ toRetry is still running
// complete fileSource + still running toRetry = still running merge
因此,合并将等到 toRetry
也完成。但是 toRetry
永远不会完成,因为它正在等待 merge
完成。
如果您希望您的特定图形在 fileSource 完成后完成,则只需设置 eagerClose=True
,这将导致 fileSource 完成后合并完成。例如:
//Add this true |
// V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")
没有流循环
针对您的问题,存在更简单的解决方案。只需使用利用 tail recursive 函数的单个 Flow.map
阶段:
//Note: there is no use of akka in this implementation
type FileInputType = (Int, Array[String])
@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType =
fileInput match {
case (r,_) if r >= 3 => fileInput
case (r,a) => recursiveRetry((r+1, a))
}
你的流将减少到
//ring-fenced akka code
val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
fileSource ~> recursiveRetryFlow ~> toSink ~> sink
结果是一个更干净的流 & 它避免了 "business logic" 与 akka 代码混合。这允许完全独立于 any 第三方库的重试功能的单元测试。您嵌入到流中的重试循环是 "business logic"。因此,混合实现与 akka 紧密耦合,无论好坏。
此外,在分离解决方案中,循环包含在尾递归函数中,即 idiomatic Scala。