使用不同选项的 akka 流物化值

akka stream materialized value using different options

我是 akka 流的新手,想了解流中的物化是如何工作的

//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow      = Flow[Int].fold(0)((a, b) => a + b)
val sink      = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()

它使用 Keep.leftKeep.right 打印值 55。两者有何不同?

我想探索一下 Keep.leftKeep.right 给出不同的值以及我们如何使用 Keep.both

物质化的价值可以由汇和源产生。通过将源与接收器组合来创建 运行nable 图。 Keep 定义合并时要保留的物化值

  • Keep.right 选择接收器的物化值
  • Keep.left 选择源的物化值
  • Keep.both 以元组的形式选择两者
  • Keep.none 忽略两者并选择 NotUsed,即指示没有物化值的标记。

默认情况下,Keep.left用于操作viato

以下示例突出显示了这一点

给定一个 Source[Int, String] 和一个 Sink[Int, Future[Int]]

val source: Source[Int, String] = Source(List(1, 2, 3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

我们可以组合 sourcesink 来创建具有不同物化值的 运行 可用图。

val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String, Future[Int]) = source.toMat(sink)(Keep.both).run()

现在,如果我们运行它并打印它产生的每个物化值如下

left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))

请不要混淆物化值与流元素的处理。

考虑以下 fold 个阶段

val flowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

flowFoldfold 函数应用于流中的每个元素,并将 表示 fold 结果的单个 值推送到下游。如果需要,可以进一步处理此元素。

sinkFold 是图中的最后阶段,它不能将元素推到更下游的位置。当图形处理完所有元素并完成时,它使用物化值 Future[Int] 到 return fold 结果。

if the value of Flow.fold is 55 should this be the materialised value of the flow instead of NotUsed.

不,值 55 不是物化值。它作为一个元素被推送到下游接收器。

您可以在 Sink.head

的帮助下在物化值中“捕获”元素 55
val flow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int, Future[Int]] = flow.toMat(Sink.head)(Keep.right)

Every stage can produce materialised value then (why) can't Flow.fold generate materialised value.

是的,每个阶段都可能产生物化价值。但是 Flow.fold 的设计不是这样做的。大多数 Flow 定义不提供物化值。如果你想使用物化值和fold,我建议使用Sink.fold

重要的是要记住一个流阶段可以有

  • 一个具体化值,在流被具体化时创建,在任何元素通过该具体化之前创建。因此,它不能依赖于传递 through/into/out 流的值。

  • 在流运行时传递到流的下一阶段的零个或多个输出值。

每个阶段都有具体化的价值。不是汇的每个阶段都可能有输出值。对于源,一般来说,物化值提供了一些影响流行为的方法(例如 Source.actorRef 的物化值是一个 ActorRef ,它允许您通过向流发送消息来将元素推送到流ActorRef,或 Alpakka Kafka 中的各种 Kafka 消费者来源允许您停止从 Kafka 消费,而不会停止流,直到流被耗尽。

对于接收器,一般来说,从接收器中获取值的唯一方法是通过物化值(因为没有输出)。由于必须在任何数据流过流之前创建物化值,这就是为什么大多数接收器物化为 Future(尚不可用的数据占位符)并且通常不会完成该值的原因直到流完成(因为 Future 最多写入一次)。

每个阶段都有一个物化值,但并不是每个阶段都有一个有意义的物化值:对于那些,特殊的 NotUsed 值(单例)编码为“无意义”。大多数流程阶段都属于此类:它们的存在仅仅是为了将输入转换为输出。