Akka 流 - 将接收器连接到源?

Akka stream - connect sink to source?

我遇到过 sink(或中间 flow)实际上会产生一些必须推回(或附加)到 [=12= 的副作用数据的情况].有没有办法使用流 DSL 来完成此操作?我可以使用一些阻塞队列或类似的队列来创建 source,然后将数据直接推送到该队列,但是这会破坏流的抽象。也许有我不知道的更好的解决方案?

正如 Viktor 所说,您可以使用圆形图。

例如,partition 阶段允许您 select 流的特定元素。

  def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val in = Source(1 to 10)
    val out = Sink.foreach[Int](println)

    val addOne = Flow[Int].map(_ + 1)

    val partition = builder.add(Partition[Int](2, partitionFunction))
    val merge = builder.add(Merge[Int](2))

                            in ~> merge ~> partition
    partition.out(0) ~> addOne ~> merge
    partition.out(1) ~> out

    ClosedShape
  })

在此示例中,源 in 连接到 merge 的一个输入。然后整数通过 partition 阶段,该阶段将分隔偶数和奇数。

偶数正在经历 addOne 流程,然后进入 merge 的第二个输入(这将使他们再次回到 partition 阶段)。

奇数直接去水槽out.

这允许将一些值反馈到图中,但它很容易导致循环(这就是 addOne 阶段在这里很重要的原因,没有它偶数就会被困住在图中)。

Reactive-kafka 做了类似的事情(至少在 0.8 版本中):它将 Sink 消费的消息提交回源(Kafka 消费者)。

KafkaCommitterSink 是实现。不过,它并不是真正的圆形图,据我所知,它 'updates' 独立于流程的来源。