根据条件停止 Akka Streams 的正确方法
Proper way to stop Akka Streams on condition
我已经成功地使用 FileIO 来流式传输文件的内容,为每一行计算一些转换和 aggregate/reduce 结果。
现在我有一个非常具体的用例,我想在达到条件时停止流,这样就没有必要读取整个文件,但进程会尽快完成。实现此目标的推荐方法是什么?
如果停止条件是"on the outside of the stream"
有一个名为 KillSwitch
的高级构建块,您可以使用它来执行此操作:http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html 一旦通知终止开关,流就会关闭。
它有 abort(reason)
/ shutdown
等方法,请参阅此处 API:http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html
参考文档在这里:http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala
示例用法为:
val countingSrc = Source(Stream.from(1)).delay(1.second,
DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both)
.run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
如果停止条件在流内
您可以使用 takeWhile
来真正表达任何条件,尽管有时 take
或 limit
可能也足够 "take 10 lnes"。
如果您的逻辑非常先进,您可以使用 statefulMapConcat
构建一个特殊的阶段来处理该特殊逻辑,允许从字面上表达任何内容 - 这样您就可以随时完成流 "from the inside".
我已经成功地使用 FileIO 来流式传输文件的内容,为每一行计算一些转换和 aggregate/reduce 结果。
现在我有一个非常具体的用例,我想在达到条件时停止流,这样就没有必要读取整个文件,但进程会尽快完成。实现此目标的推荐方法是什么?
如果停止条件是"on the outside of the stream"
有一个名为 KillSwitch
的高级构建块,您可以使用它来执行此操作:http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html 一旦通知终止开关,流就会关闭。
它有 abort(reason)
/ shutdown
等方法,请参阅此处 API:http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html
参考文档在这里:http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala
示例用法为:
val countingSrc = Source(Stream.from(1)).delay(1.second,
DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both)
.run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
如果停止条件在流内
您可以使用 takeWhile
来真正表达任何条件,尽管有时 take
或 limit
可能也足够 "take 10 lnes"。
如果您的逻辑非常先进,您可以使用 statefulMapConcat
构建一个特殊的阶段来处理该特殊逻辑,允许从字面上表达任何内容 - 这样您就可以随时完成流 "from the inside".