何时在 Akka Streams Flows 中实际使用 materialiser,何时需要保留值

When materialiser is actually used in Akka Streams Flows and when do we need to Keep values

我正在尝试学习 Akka Streams,但在这里遇到了这种具体化问题。

每个教程都会显示一些基础知识 source via to run 示例,其中没有解释 Keep.leftKeep.right 之间的实际情况。所以我写了这段代码,要求 IntelliJ 为值添加类型注释并开始挖掘源代码。

val single: Source[Int, NotUsed] = Source(Seq(1, 2, 3, 4, 5))
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)


val run1: RunnableGraph[Future[Int]] =
  single.viaMat(flow)(Keep.right).toMat(sink)(Keep.right)

val run2: RunnableGraph[NotUsed] =
  single.viaMat(flow)(Keep.right).toMat(sink)(Keep.left)

val run3: RunnableGraph[(NotUsed, Future[Int])] =
  single.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)

val run4: RunnableGraph[NotUsed] =
  single.viaMat(flow)(Keep.right).toMat(sink)(Keep.none)

到目前为止我可以理解,在执行结束时我们需要 Sink 类型 Future[Int] 的值。但是我想不出任何情况下我需要保留一些值。

在第三个示例中,可以访问物化输出的左右值。

run3.run()._2 onComplete {
  case Success(value) ⇒ println(value)
  case Failure(exception) ⇒ println(exception.getMessage)
}

如果我将其更改为 viaMat(flowMultiply)(Keep.left)noneboth.

,它实际上以完全相同的方式工作

但是在什么情况下可以在图中使用物化值?如果价值无论如何都在内部流动,我们为什么需要它?如果我们不想保留它,为什么我们需要其中一个值?

请您提供一个示例,说明从左到右的更改不仅会破坏编译器,而且实际上会给程序逻辑带来差异?

对于大多数流,您只关心流末尾的值。因此,大多数 Source 和几乎所有标准 Flow 运算符都具有 NotUsed 的物化值,语法糖 .runWith 归结为 .toMat(sink)(Keep.right).run .

人们可能关心 SourceFlow 阶段的物化值的地方是您希望能够控制流之外的阶段。一个例子是 Source.actorRef,它允许您将消息发送给转发到流的参与者:您需要 Source 的物化 ActorRef 才能实际发送消息给它。同样,您可能仍然需要 Sink 的物化值(无论是知道流处理发生 (Future[Done]) 还是流末尾的实际值)。在这样的流中,您可能会有类似的内容:

val stream: RunnableGraph[(ActorRef, Future[Done])] = 
  Source.actorRef(...)
    .viaMat(calculateStuffFlow)(Keep.left)  // propagates the ActorRef
    .toMat(Sink.foreach { ... })(Keep.both)

val (sendToStream, done) = stream.run()

另一个相当常见的用例是在 Alpakka Kafka 集成中,消费者可以将控制器作为物化值:此控制器允许您停止消费某个主题并且不会取消订阅,直到任何挂起的偏移量提交已经发生。