Akka 流:为什么 Sink.head 使用 alsoTo 广播终止流?
Akka streams: Why does Sink.head terminate stream with alsoTo broadcast?
让我们举一个非常简单的例子:
Source(1 to 10)
.alsoTo(Sink.foreach(v => println(s"each: $v")))
.toMat(Sink.head)(Keep.right)
.run()
根据 alsoTo
文档,我希望 Sink.foreach
打印所有元素,但是,它只打印第一个。如果我切换 Sink.foreach
和 Sink.head
位置,也会发生同样的情况。
如果广播是通过 GraphDSL
实现的,但是,即使其中一个接收器是 Sink.head
,也会消耗整个源。
编辑:
alsoTo
的文档说明如下:
Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.
对我来说这听起来像是广播,但也许这就是我犯错误的地方。我也可以解释为 toMat
控制流程。因此,我希望以下内容能够从源中获取所有元素:
Source(1 to 10)
.alsoTo(Sink.head)
.toMat(Sink.seq)(Keep.right)
.run()
GraphDSL 版本如我所料:
val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}).run()
原因是Sink.head
消耗单个元素并完成自身。这以 cancel
的形式向上游传播,并且在此之后不会从源发送任何元素。
来自 akka.stream.impl.HeadOptionStage.onPush
的代码显示了它
def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}
哪里completeStage
Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, then marks the operator as stopped.
更新
alsoTo
是使用这些参数配置的广播:
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
您的 GraphDSL
版本工作方式不同,因为默认广播是 eagerCancel = false
。
其中 eagerCancel
if true, broadcast cancels upstream if any of its downstreams cancel.
让我们举一个非常简单的例子:
Source(1 to 10)
.alsoTo(Sink.foreach(v => println(s"each: $v")))
.toMat(Sink.head)(Keep.right)
.run()
根据 alsoTo
文档,我希望 Sink.foreach
打印所有元素,但是,它只打印第一个。如果我切换 Sink.foreach
和 Sink.head
位置,也会发生同样的情况。
如果广播是通过 GraphDSL
实现的,但是,即使其中一个接收器是 Sink.head
,也会消耗整个源。
编辑:
alsoTo
的文档说明如下:
Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.
对我来说这听起来像是广播,但也许这就是我犯错误的地方。我也可以解释为 toMat
控制流程。因此,我希望以下内容能够从源中获取所有元素:
Source(1 to 10)
.alsoTo(Sink.head)
.toMat(Sink.seq)(Keep.right)
.run()
GraphDSL 版本如我所料:
val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}).run()
原因是Sink.head
消耗单个元素并完成自身。这以 cancel
的形式向上游传播,并且在此之后不会从源发送任何元素。
来自 akka.stream.impl.HeadOptionStage.onPush
的代码显示了它
def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}
哪里completeStage
Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, then marks the operator as stopped.
更新
alsoTo
是使用这些参数配置的广播:
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
您的 GraphDSL
版本工作方式不同,因为默认广播是 eagerCancel = false
。
其中 eagerCancel
if true, broadcast cancels upstream if any of its downstreams cancel.