ZStream 忽略并行操作,而是顺序执行

ZStream ignores parallel operation and executes it sequentially instead

以下代码应该并行执行 putStrLn 效果,因为 mapMPar:

val runtime = zio.Runtime.default
val foo = ZIO.sleep(5.second) *> ZIO("foo")
val bar = ZIO("bar")

val k = ZStream.fromEffect(foo) ++ ZStream.fromEffect(bar)
val r = k.mapMPar(3)(x => console.putStrLn(s"Processing `${x}`"))

runtime.unsafeRun(r.runDrain)

但实际上它总是在 bar 之前处理 foo 无论如何。我是不是漏掉了什么或者这是一个错误?

我认为您的示例并没有达到您的期望。 fromEffect 创建一个流,基本上说“我有一个最终会生成单个项目的效果”,然后第一个流在生成该项目之前等待 5 秒。由于流的性质,++concat 运算符是惰性的,这意味着它无法开始处理,直到所有项目都从第一个流中 consumed (这不会发生 5 秒)。结果你的流真的看起来像这样:

--5s--(foo)(bar)|

而不是我想象的你认为它应该喜欢的:

(bar)--5s--(foo)|

也许最好的思考方式是,对于大多数流,你有一条单车道高速公路,一次只能移动一个项目,所有后续项目都被排在首位的项目阻挡.一旦你撞上 Par 障碍物,你就会打开多条车道,这意味着速度更快的东西可能会超车。

因此,我可以通过执行以下操作来实现所需的行为:

val k = ZStream("foo", "bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))

r.runDrain

或者写的稍微紧凑一点:

ZStream("foo", "bar").mapMPar(3)(x => for {
  _ <- putStrLn(s"$x:enter")
  _ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
  _ <- putStrLn(s"$x:exit")
} yield ()).runDrain