在 Akka Streams 中使用 Keep.right / Keep.left 不会影响结果
Usage of Keep.right / Keep.left within Akka Streams does not affect the result
我想举出不同的例子来理解 Keep.left 或 Keep.right
的工作
我有以下代码:
val numSource = Source(1 to 10)
val numSource = Source(1 to 10)
val incrementFlow = Flow[Int].map(x => x +1)
val doubleFlow = Flow[Int].map(x => x * 10)
val someFuture: NotUsed = numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
我假设 someOtherFuture viaMat(incrementFlow)(Keep.left)
将忽略元素的增量,因为我使用的是源的物化值(而不是结果 = Keep.right),图表的结果将等于 num * 10。
但是所有 3 行都给我返回相同的结果:
//20,30,40 .. 110
我在这里缺少什么?我已经检查了文档并尝试实现简单的示例,但看起来我错了具体化的想法。
或者可能是因为我正在使用序列图 w/o 任何接收两个输入的合并流?
最好将物化值理解为流外代码的一种方式(无论是 运行 还是 completed/failed)以某种方式与流交互:它是 return 运行 流的值,我们可以在您发布的代码中看到 Keep.right
/Keep.left
的效果:
val someFuture: NotUsed = numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
numSource
、incrementFlow
和 doubleFlow
的物化值是 NotUsed
,这基本上意味着这些阶段不会向流与流交互。
对于 Sink.foreach
,物化值是一个 Future[Done]
,如果流运行到源耗尽并且所有其他阶段都成功,则它将成功完成 Done
,或者完成包含流失败原因的失败。
我想举出不同的例子来理解 Keep.left 或 Keep.right
的工作我有以下代码:
val numSource = Source(1 to 10)
val numSource = Source(1 to 10)
val incrementFlow = Flow[Int].map(x => x +1)
val doubleFlow = Flow[Int].map(x => x * 10)
val someFuture: NotUsed = numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
我假设 someOtherFuture viaMat(incrementFlow)(Keep.left)
将忽略元素的增量,因为我使用的是源的物化值(而不是结果 = Keep.right),图表的结果将等于 num * 10。
但是所有 3 行都给我返回相同的结果:
//20,30,40 .. 110
我在这里缺少什么?我已经检查了文档并尝试实现简单的示例,但看起来我错了具体化的想法。 或者可能是因为我正在使用序列图 w/o 任何接收两个输入的合并流?
最好将物化值理解为流外代码的一种方式(无论是 运行 还是 completed/failed)以某种方式与流交互:它是 return 运行 流的值,我们可以在您发布的代码中看到 Keep.right
/Keep.left
的效果:
val someFuture: NotUsed = numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
numSource
、incrementFlow
和 doubleFlow
的物化值是 NotUsed
,这基本上意味着这些阶段不会向流与流交互。
对于 Sink.foreach
,物化值是一个 Future[Done]
,如果流运行到源耗尽并且所有其他阶段都成功,则它将成功完成 Done
,或者完成包含流失败原因的失败。