Akka 流 - 将接收器连接到源?
Akka stream - connect sink to source?
我遇到过 sink
(或中间 flow
)实际上会产生一些必须推回(或附加)到 [=12= 的副作用数据的情况].有没有办法使用流 DSL 来完成此操作?我可以使用一些阻塞队列或类似的队列来创建 source
,然后将数据直接推送到该队列,但是这会破坏流的抽象。也许有我不知道的更好的解决方案?
正如 Viktor 所说,您可以使用圆形图。
例如,partition
阶段允许您 select 流的特定元素。
def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.foreach[Int](println)
val addOne = Flow[Int].map(_ + 1)
val partition = builder.add(Partition[Int](2, partitionFunction))
val merge = builder.add(Merge[Int](2))
in ~> merge ~> partition
partition.out(0) ~> addOne ~> merge
partition.out(1) ~> out
ClosedShape
})
在此示例中,源 in
连接到 merge
的一个输入。然后整数通过 partition
阶段,该阶段将分隔偶数和奇数。
偶数正在经历 addOne
流程,然后进入 merge
的第二个输入(这将使他们再次回到 partition
阶段)。
奇数直接去水槽out
.
这允许将一些值反馈到图中,但它很容易导致循环(这就是 addOne
阶段在这里很重要的原因,没有它偶数就会被困住在图中)。
Reactive-kafka 做了类似的事情(至少在 0.8 版本中):它将 Sink 消费的消息提交回源(Kafka 消费者)。
KafkaCommitterSink 是实现。不过,它并不是真正的圆形图,据我所知,它 'updates' 独立于流程的来源。
我遇到过 sink
(或中间 flow
)实际上会产生一些必须推回(或附加)到 [=12= 的副作用数据的情况].有没有办法使用流 DSL 来完成此操作?我可以使用一些阻塞队列或类似的队列来创建 source
,然后将数据直接推送到该队列,但是这会破坏流的抽象。也许有我不知道的更好的解决方案?
正如 Viktor 所说,您可以使用圆形图。
例如,partition
阶段允许您 select 流的特定元素。
def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.foreach[Int](println)
val addOne = Flow[Int].map(_ + 1)
val partition = builder.add(Partition[Int](2, partitionFunction))
val merge = builder.add(Merge[Int](2))
in ~> merge ~> partition
partition.out(0) ~> addOne ~> merge
partition.out(1) ~> out
ClosedShape
})
在此示例中,源 in
连接到 merge
的一个输入。然后整数通过 partition
阶段,该阶段将分隔偶数和奇数。
偶数正在经历 addOne
流程,然后进入 merge
的第二个输入(这将使他们再次回到 partition
阶段)。
奇数直接去水槽out
.
这允许将一些值反馈到图中,但它很容易导致循环(这就是 addOne
阶段在这里很重要的原因,没有它偶数就会被困住在图中)。
Reactive-kafka 做了类似的事情(至少在 0.8 版本中):它将 Sink 消费的消息提交回源(Kafka 消费者)。
KafkaCommitterSink 是实现。不过,它并不是真正的圆形图,据我所知,它 'updates' 独立于流程的来源。