如何结束无限的akka流
How to end an infinite akka stream
我是 Akka Streams 的新手,但有一个案例我想用它从无限源中寻找排列。一个有限来源的简化示例可能如下所示。
val future = Source(1 to 100)
.map { i => if (i % 20 == 0) println(i); i }
.filter(_ == 42)
.runWith(Sink.fold[Int, Int](0)(Keep.right))
这个例子输出:
20
40
60
80
100
我显然对源通过 42
没意见,但我不想在能够得到结果之前耗尽整个流。
val result: Int = Await.result(future, 1.second)
result should be(42)
问题是,当我找到我要找的东西后,我应该如何结束直播?
val future = Source(1 to 100)
.map { i => if (i % 20 == 0) println(i); i }
.filter(_ == 42)
.runWith(Sink.head)
概括为 N
个值,例如大于等于42的10个值,可以用grouped
:
val N = 10
val future : Future[Seq[Int]] =
Source(1 to 100).map { i => if (i % 20 == 0) println(i); i }
.filter(_ >= 42)
.grouped(N)
.runWith(Sink.head)
我是 Akka Streams 的新手,但有一个案例我想用它从无限源中寻找排列。一个有限来源的简化示例可能如下所示。
val future = Source(1 to 100)
.map { i => if (i % 20 == 0) println(i); i }
.filter(_ == 42)
.runWith(Sink.fold[Int, Int](0)(Keep.right))
这个例子输出:
20
40
60
80
100
我显然对源通过 42
没意见,但我不想在能够得到结果之前耗尽整个流。
val result: Int = Await.result(future, 1.second)
result should be(42)
问题是,当我找到我要找的东西后,我应该如何结束直播?
val future = Source(1 to 100)
.map { i => if (i % 20 == 0) println(i); i }
.filter(_ == 42)
.runWith(Sink.head)
概括为 N
个值,例如大于等于42的10个值,可以用grouped
:
val N = 10
val future : Future[Seq[Int]] =
Source(1 to 100).map { i => if (i % 20 == 0) println(i); i }
.filter(_ >= 42)
.grouped(N)
.runWith(Sink.head)