如何优雅地使用 ZIO 并行执行多个效果

How to elegantly perform multiple effects in parallel with ZIO

我知道我可以使用

import zio.Task

def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
  a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }

def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
  zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }

并行执行 3 或 4 个任务,但我不知道是否有更优雅的解决方案?

您可以将 ZIO.collectAllPar 与任务列表一起使用:

def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)

然后你可以这样使用它:

val t1 = Task.effect{
  Thread.sleep(100)
  println("t1 started")
  Thread.sleep(1000)
  1
}

val t2 = Task.effect{
  println("t2 started")
  Thread.sleep(1000)
  2
}


val t3 = Task.effect{
  println("t3 started")
  Thread.sleep(1000)
  3
}

(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))

它会同时运行你所有的任务。

如果没有 shapeless,在 Scala 2 中很难实现使用元组而不是列表的通用解决方案。它会在 Scala 3 中发生变化,因为那时它们可以作为异构列表处理。

添加到 Krzysztof Atłasik 的回答中,还有 collectAllParN,它的工作方式类似于 collectAllPAr,但允许您指定要使用的最大光纤数:

 val a = Task {
      println("t1 started")
      Thread.sleep(2000)
      println("t1 finished")
      1
    }
    val b = Task {
      println("t2 started")
      Thread.sleep(1000)
      println("t2 finished")
      2
    }
    val c = Task {
      println("t3 started")
      Thread.sleep(3000)
      println("t3 finished")
      3
    }
    val d = Task {
      println("t4 started")
      Thread.sleep(1000)
      println("t4 finished")
      4
    }

你可以运行这样:

 Task.collectAllParN(4)(List(a, b, c, d))

如果您有许多(成百上千个)并行任务,这将特别有用,可以避免溢出和内存错误。继续将要使用的纤程数更改为 2 或 3,然后亲眼看看执行情况如何变化。

并行执行的另一种选择是将任务放在 ZQueue 上,并在您的消费者收到任务后 fork 它们。

另请注意,还有 <&> 组合器。这是 zipPar 的别名。这将产生一个元组,如果你使用 for comprehensions,我建议你看一下 better-monadic-for,它修复了 for comprehensions

中元组的问题

下面是一个使用 <&> 组合子和 map 的例子:

(t1 <&> t2 <&> t3 <&> t4) map { case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4" }

ZIO.collectAllParZIO.collectAllParN 仅在所有 ZIO 具有相同的 return 类型时才有效。那不是问题。

从 ZIO 2.x 开始,您可以使用 foreachPar 来控制与 withParallelism 的并行性。一个简单的例子可能看起来像这样

ZIO.foreachPar(urls)(download).withParallelism(8)

当我们想要运行具有无限制最大纤维数的并行效果时,可以使用withParallelismUnbounded方法:

ZIO.foreachPar(urls)(download).withParallelismUnbounded

所有以 N 结尾的并行运算符,例如 foreachParNcollectAllParN,现已在 2.x 中弃用。