在 Akka Stream 中连接 Scan 和 Broadcast 有更好的方法吗?
Is there a nicer way to connect Scan and Broadcast in Akka Stream?
假设我想创建一个 Flow
,它需要 Int
并输出元组 (doubled int, sum)
。所以我扇出整数,map
在一个边上,scan
在另一边上。然后我 zip
他们这是结果:
object Main extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Int, Int])
val flowMap = b.add(Flow[Int].map(_ * 2))
val flowScan = b.add(Flow[Int].scan(0)(_ + _))
broadcast.out(0) ~> flowMap ~> zip.in0
broadcast.out(1) ~> flowScan ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
Source(1 to 5).via(flow).to(Sink.foreach(println)).run()
}
不幸的是,这没有输出任何东西。我研究了一下,发现:
Broadcast
当所有输出停止背压并且有可用的输入元素时发出,
Scan
下游背压时的背压。
这使得整个流程陷入僵局,没有任何反应。有人知道如何实现结果吗:
(2,0)
(4,1)
(6,3)
(8,6)
(10,10)
以一种很好的方式?到目前为止我找到的唯一解决方案是使用 .buffer
:
val flowScan = b.add(Flow[Int].buffer(1, OverflowStrategy.backpressure).scan(0)(_ + _))
但我不太喜欢这个解决方案,因为它不仅描述了逻辑,还描述了一些技术细节...
死锁的原因是扫描会在第一次请求时发出起始值,所以 0
在这种情况下并没有向上游传递需求,这意味着需求只达到 broadcast.out(0)
正如您所说,broadcast
仅在所有下游都有需求时才会发出。
buffer 看起来是个技术问题,其实是按照你想达到的效果表达图,你想压缩两个分支,但是scan-one永远是一个元素在前面另一个。这对于 akka-streams 的工作方式非常重要。
所以你的结果实际上与 broadcast+zip 在没有一些额外的图形节点的情况下所做的不匹配,我认为最清晰地表达你想要发生的事情的方法是在扫描之前单独放置缓冲区,这更清楚地表明一个分支将领先于另一个分支:
broadcast.out(0) ~> flowMap ~> zip.in0
broadcast.out(1) ~> buffer ~> flowScan ~> zip.in1
假设我想创建一个 Flow
,它需要 Int
并输出元组 (doubled int, sum)
。所以我扇出整数,map
在一个边上,scan
在另一边上。然后我 zip
他们这是结果:
object Main extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Int, Int])
val flowMap = b.add(Flow[Int].map(_ * 2))
val flowScan = b.add(Flow[Int].scan(0)(_ + _))
broadcast.out(0) ~> flowMap ~> zip.in0
broadcast.out(1) ~> flowScan ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
Source(1 to 5).via(flow).to(Sink.foreach(println)).run()
}
不幸的是,这没有输出任何东西。我研究了一下,发现:
Broadcast
当所有输出停止背压并且有可用的输入元素时发出,Scan
下游背压时的背压。
这使得整个流程陷入僵局,没有任何反应。有人知道如何实现结果吗:
(2,0)
(4,1)
(6,3)
(8,6)
(10,10)
以一种很好的方式?到目前为止我找到的唯一解决方案是使用 .buffer
:
val flowScan = b.add(Flow[Int].buffer(1, OverflowStrategy.backpressure).scan(0)(_ + _))
但我不太喜欢这个解决方案,因为它不仅描述了逻辑,还描述了一些技术细节...
死锁的原因是扫描会在第一次请求时发出起始值,所以 0
在这种情况下并没有向上游传递需求,这意味着需求只达到 broadcast.out(0)
正如您所说,broadcast
仅在所有下游都有需求时才会发出。
buffer 看起来是个技术问题,其实是按照你想达到的效果表达图,你想压缩两个分支,但是scan-one永远是一个元素在前面另一个。这对于 akka-streams 的工作方式非常重要。
所以你的结果实际上与 broadcast+zip 在没有一些额外的图形节点的情况下所做的不匹配,我认为最清晰地表达你想要发生的事情的方法是在扫描之前单独放置缓冲区,这更清楚地表明一个分支将领先于另一个分支:
broadcast.out(0) ~> flowMap ~> zip.in0
broadcast.out(1) ~> buffer ~> flowScan ~> zip.in1