如何结束无限的akka​​流

How to end an infinite akka stream

我是 Akka Streams 的新手,但有一个案例我想用它从无限源中寻找排列。一个有限来源的简化示例可能如下所示。

val future = Source(1 to 100)
    .map { i => if (i % 20 == 0) println(i); i }
    .filter(_ == 42)
    .runWith(Sink.fold[Int, Int](0)(Keep.right))

这个例子输出:

20
40
60
80
100

我显然对源通过 42 没意见,但我不想在能够得到结果之前耗尽整个流。

val result: Int = Await.result(future, 1.second)
result should be(42)

问题是,当我找到我要找的东西后,我应该如何结束直播?

val future = Source(1 to 100)
    .map { i => if (i % 20 == 0) println(i); i }
    .filter(_ == 42)
    .runWith(Sink.head)

概括为 N 个值,例如大于等于42的10个值,可以用grouped:

val N = 10

val future : Future[Seq[Int]] = 
  Source(1 to 100).map { i => if (i % 20 == 0) println(i); i }
                  .filter(_ >= 42)
                  .grouped(N)
                  .runWith(Sink.head)