使用不同选项的 akka 流物化值
akka stream materialized value using different options
我是 akka 流的新手,想了解流中的物化是如何工作的
//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow = Flow[Int].fold(0)((a, b) => a + b)
val sink = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()
它使用 Keep.left
和 Keep.right
打印值 55
。两者有何不同?
我想探索一下 Keep.left
和 Keep.right
给出不同的值以及我们如何使用 Keep.both
物质化的价值可以由汇和源产生。通过将源与接收器组合来创建 运行nable 图。 Keep
定义合并时要保留的物化值
Keep.right
选择接收器的物化值
Keep.left
选择源的物化值
Keep.both
以元组的形式选择两者
Keep.none
忽略两者并选择 NotUsed
,即指示没有物化值的标记。
默认情况下,Keep.left
用于操作via
、to
等
以下示例突出显示了这一点
给定一个 Source[Int, String]
和一个 Sink[Int, Future[Int]]
val source: Source[Int, String] = Source(List(1, 2, 3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)
我们可以组合 source
和 sink
来创建具有不同物化值的 运行 可用图。
val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String, Future[Int]) = source.toMat(sink)(Keep.both).run()
现在,如果我们运行它并打印它产生的每个物化值如下
left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))
请不要混淆物化值与流元素的处理。
考虑以下 fold
个阶段
val flowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)
flowFold
将 fold
函数应用于流中的每个元素,并将 表示 fold
结果的单个 值推送到下游。如果需要,可以进一步处理此元素。
而 sinkFold
是图中的最后阶段,它不能将元素推到更下游的位置。当图形处理完所有元素并完成时,它使用物化值 Future[Int]
到 return fold
结果。
if the value of Flow.fold
is 55 should this be the materialised value of the flow instead of NotUsed
.
不,值 55
不是物化值。它作为一个元素被推送到下游接收器。
您可以在 Sink.head
的帮助下在物化值中“捕获”元素 55
val flow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int, Future[Int]] = flow.toMat(Sink.head)(Keep.right)
Every stage can produce materialised value then (why) can't Flow.fold
generate materialised value.
是的,每个阶段都可能产生物化价值。但是 Flow.fold
的设计不是这样做的。大多数 Flow
定义不提供物化值。如果你想使用物化值和fold
,我建议使用Sink.fold
重要的是要记住一个流阶段可以有
一个具体化值,在流被具体化时创建,在任何元素通过该具体化之前创建。因此,它不能依赖于传递 through/into/out 流的值。
在流运行时传递到流的下一阶段的零个或多个输出值。
每个阶段都有具体化的价值。不是汇的每个阶段都可能有输出值。对于源,一般来说,物化值提供了一些影响流行为的方法(例如 Source.actorRef
的物化值是一个 ActorRef
,它允许您通过向流发送消息来将元素推送到流ActorRef
,或 Alpakka Kafka 中的各种 Kafka 消费者来源允许您停止从 Kafka 消费,而不会停止流,直到流被耗尽。
对于接收器,一般来说,从接收器中获取值的唯一方法是通过物化值(因为没有输出)。由于必须在任何数据流过流之前创建物化值,这就是为什么大多数接收器物化为 Future
(尚不可用的数据占位符)并且通常不会完成该值的原因直到流完成(因为 Future
最多写入一次)。
每个阶段都有一个物化值,但并不是每个阶段都有一个有意义的物化值:对于那些,特殊的 NotUsed
值(单例)编码为“无意义”。大多数流程阶段都属于此类:它们的存在仅仅是为了将输入转换为输出。
我是 akka 流的新手,想了解流中的物化是如何工作的
//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow = Flow[Int].fold(0)((a, b) => a + b)
val sink = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()
它使用 Keep.left
和 Keep.right
打印值 55
。两者有何不同?
我想探索一下 Keep.left
和 Keep.right
给出不同的值以及我们如何使用 Keep.both
物质化的价值可以由汇和源产生。通过将源与接收器组合来创建 运行nable 图。 Keep
定义合并时要保留的物化值
Keep.right
选择接收器的物化值Keep.left
选择源的物化值Keep.both
以元组的形式选择两者Keep.none
忽略两者并选择NotUsed
,即指示没有物化值的标记。
默认情况下,Keep.left
用于操作via
、to
等
以下示例突出显示了这一点
给定一个 Source[Int, String]
和一个 Sink[Int, Future[Int]]
val source: Source[Int, String] = Source(List(1, 2, 3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)
我们可以组合 source
和 sink
来创建具有不同物化值的 运行 可用图。
val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String, Future[Int]) = source.toMat(sink)(Keep.both).run()
现在,如果我们运行它并打印它产生的每个物化值如下
left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))
请不要混淆物化值与流元素的处理。
考虑以下 fold
个阶段
val flowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)
flowFold
将 fold
函数应用于流中的每个元素,并将 表示 fold
结果的单个 值推送到下游。如果需要,可以进一步处理此元素。
而 sinkFold
是图中的最后阶段,它不能将元素推到更下游的位置。当图形处理完所有元素并完成时,它使用物化值 Future[Int]
到 return fold
结果。
if the value of
Flow.fold
is 55 should this be the materialised value of the flow instead ofNotUsed
.
不,值 55
不是物化值。它作为一个元素被推送到下游接收器。
您可以在 Sink.head
55
val flow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int, Future[Int]] = flow.toMat(Sink.head)(Keep.right)
Every stage can produce materialised value then (why) can't
Flow.fold
generate materialised value.
是的,每个阶段都可能产生物化价值。但是 Flow.fold
的设计不是这样做的。大多数 Flow
定义不提供物化值。如果你想使用物化值和fold
,我建议使用Sink.fold
重要的是要记住一个流阶段可以有
一个具体化值,在流被具体化时创建,在任何元素通过该具体化之前创建。因此,它不能依赖于传递 through/into/out 流的值。
在流运行时传递到流的下一阶段的零个或多个输出值。
每个阶段都有具体化的价值。不是汇的每个阶段都可能有输出值。对于源,一般来说,物化值提供了一些影响流行为的方法(例如 Source.actorRef
的物化值是一个 ActorRef
,它允许您通过向流发送消息来将元素推送到流ActorRef
,或 Alpakka Kafka 中的各种 Kafka 消费者来源允许您停止从 Kafka 消费,而不会停止流,直到流被耗尽。
对于接收器,一般来说,从接收器中获取值的唯一方法是通过物化值(因为没有输出)。由于必须在任何数据流过流之前创建物化值,这就是为什么大多数接收器物化为 Future
(尚不可用的数据占位符)并且通常不会完成该值的原因直到流完成(因为 Future
最多写入一次)。
每个阶段都有一个物化值,但并不是每个阶段都有一个有意义的物化值:对于那些,特殊的 NotUsed
值(单例)编码为“无意义”。大多数流程阶段都属于此类:它们的存在仅仅是为了将输入转换为输出。