Akka Streams - 如何在图表中保留辅助汇的物化值
Akka Streams - How to keep materialized value of an auxiliary Sink in a Graph
我有一个返回 Flow
的函数,其逻辑涉及将图形的某些元素传递给作为参数传递的辅助 Sink
。我想保留辅助 Sink
的物化值,这样我就可以在启动构造流时根据它的值采取行动。
这是我正在构建的流程的粗略图片:
IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
~> OutFilter ~> OUT
示例代码:
case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
val isPersistent = Flow[Element].collect {
case persistent: Persistent => persistent
}
val isRunning = Flow[Element].collect {
case out: Outcoming => out
}
val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
.map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())
Flow.fromGraph {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val magic = b.add(magicFlow)
val bcast = b.add(Broadcast[Element](2))
val sink = b.add(isRunning)
bcast.out(0) ~> isPersistent ~> auxSink
magic.out ~> bcast.in
bcast.out(1) ~> isRunning ~> sink.in
FlowShape(magic.in, sink.out)
}
}
}
有没有办法以某种方式将 auxSink
的 Mat
传递给结果 Flow
?
谢谢。
回答我自己的问题...
找到了! Flow.alsoToMat
的来源准确地指出了我需要的逻辑 - 要访问辅助图的物化值(在我的例子中 auxSink
),必须将其形状导入通过传递构建的图中它作为 GraphDSL.create()
.
的参数
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
val isPersistent = ...
val isRunning = ...
val magicFlow = ...
Flow.fromGraph {
GraphDSL.create(auxSink) { implicit b => aux =>
import GraphDSL.Implicits._
val magic = b.add(magicFlow)
val bcast = b.add(Broadcast[Element](2))
val sink = b.add(isRunning)
magic ~> bcast ~> isPersistent ~> aux
bcast ~> isRunning ~> sink
FlowShape(magic.in, sink.out)
}
}
}
我有一个返回 Flow
的函数,其逻辑涉及将图形的某些元素传递给作为参数传递的辅助 Sink
。我想保留辅助 Sink
的物化值,这样我就可以在启动构造流时根据它的值采取行动。
这是我正在构建的流程的粗略图片:
IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
~> OutFilter ~> OUT
示例代码:
case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
val isPersistent = Flow[Element].collect {
case persistent: Persistent => persistent
}
val isRunning = Flow[Element].collect {
case out: Outcoming => out
}
val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
.map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())
Flow.fromGraph {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val magic = b.add(magicFlow)
val bcast = b.add(Broadcast[Element](2))
val sink = b.add(isRunning)
bcast.out(0) ~> isPersistent ~> auxSink
magic.out ~> bcast.in
bcast.out(1) ~> isRunning ~> sink.in
FlowShape(magic.in, sink.out)
}
}
}
有没有办法以某种方式将 auxSink
的 Mat
传递给结果 Flow
?
谢谢。
回答我自己的问题...
找到了! Flow.alsoToMat
的来源准确地指出了我需要的逻辑 - 要访问辅助图的物化值(在我的例子中 auxSink
),必须将其形状导入通过传递构建的图中它作为 GraphDSL.create()
.
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
val isPersistent = ...
val isRunning = ...
val magicFlow = ...
Flow.fromGraph {
GraphDSL.create(auxSink) { implicit b => aux =>
import GraphDSL.Implicits._
val magic = b.add(magicFlow)
val bcast = b.add(Broadcast[Element](2))
val sink = b.add(isRunning)
magic ~> bcast ~> isPersistent ~> aux
bcast ~> isRunning ~> sink
FlowShape(magic.in, sink.out)
}
}
}