Akka Streams 按源单累加

Akka Streams Accumulate by Source Single

我正在尝试使用 akka 流来积累数据并作为批处理使用:

val myFlow: Flow[String, Unit, NotUsed] = Flow[String].collect {
    case record =>
      println(record)
      Future(record)
  }.mapAsync(1)(x => x).groupedWithin(3, 30 seconds)
    .mapAsync(10)(records =>
      someBatchOperation(records))
    )

我对上面代码的期望是在 3 条记录准备好或 30 秒过去之前不进行任何操作。但是当我用 Source.single("test") 发送一些请求时,它正在处理这条记录而不等待其他人或 30 秒。

如何使用此流程等待其他记录到来或空闲 30 秒?

记录来自一个一个的 API 请求,我正尝试在流程中累积这些数据,例如:

Source.single(apiRecord).via(myFlow).runWith(Sink.ignore)

它确实做到了。让我们考虑以下几点:

Source(Stream.from(1)).throttle(1, 400 milli).groupedWithin(3, 1 seconds).runWith(Sink.foreach(i => println(s"Done with ${i} ${System.currentTimeMillis}")))

在我终止进程之前,该行的输出是:

Done with Vector(1, 2, 3) 1599495716345
Done with Vector(4, 5) 1599495717348
Done with Vector(6, 7, 8) 1599495718330
Done with Vector(9, 10) 1599495719350
Done with Vector(11, 12, 13) 1599495720330
Done with Vector(14, 15) 1599495721350
Done with Vector(16, 17, 18) 1599495722328
Done with Vector(19, 20) 1599495723348
Done with Vector(21, 22, 23) 1599495724330

正如我们所见,每次发射 2 个元素到 3 个元素之间的时间差超过 1 秒。这是有道理的,因为在 1 秒延迟之后,到达打印行需要更多时间。

我们每次发出 2 个元素到 3 个元素之间的差异不到一秒。因为它有足够的元素继续下去。

为什么它在您的示例中不起作用?

当您使用 Source.single 时,源会为其自身添加一个完整的阶段。您可以在 source code of akka 中看到它。 在这种情况下,groupedWithin 流知道它不会再获取任何元素,因此它可以发出 "test" 字符串。为了实际测试这个流,尝试创建一个更大的流。

当使用 Source(1 到 10) 时,它实际上转换为 Source.Single,这也完成了流。我们可以看到here.