Akka Reactive Streams 总是落后一条消息

Akka Reactive Streams always one message behind

出于某种原因,我的 Akka 流总是在 "emitting"(?) 第一条消息之前等待第二条消息。

下面是一些演示我的问题的示例代码。

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

产量输出:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

我想要的:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...

您正在使用 .toStream(),这意味着整个集合都是惰性的。没有它,您的输出将首先是一百个 "doing",然后是从 1 到 100 的数字。但是,Stream 仅计算第一个元素,它给出 "doing 1" 输出,这是它停止了。需要时将评估下一个元素。

现在,我在文档中找不到任何关于此的详细信息,但我认为 runForeach 有一个实现,它在调用当前元素的函数之前获取下一个元素。因此,在对元素 n 调用 println 之前,它首先检查元素 n+1(例如检查它是否存在),这会导致 "doing n+1" 消息。然后它在当前元素上执行你的 println 函数,导致消息 "n" 。

runForeach 之前,您真的需要 map() 吗?我的意思是,您需要对数据进行两次旅行吗?我知道我可能说的很明显,但是如果您像这样一次性处理数据:

val rx = Source((1 to 100).toStream)
rx.runForeach({ t =>
  Thread.sleep(1000)
  println(s"doing $t")
  // do something with 't', which is now equal to what "doing" says
})

那么你就没有什么时候求值的问题了。

您的代码现在设置的方式,您正在完全转换 Source,然后才允许它开始向下游发射元素。通过删除表示源的数字范围内的 toStream,您可以清楚地看到该行为(如@slouc 所述)。如果这样做,您将看到 Source 在开始响应下游需求之前首先完全转换。如果你真的想 运行 一个 Source 变成一个 Sink 并且在中间有一个转换步骤,那么你可以尝试这样构造:

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run

如果您进行了该更改,那么您将获得预期的效果,即在开始处理下一个元素之前,流向下游的元素在流程中一直得到处理。