如何优雅地使用 ZIO 并行执行多个效果
How to elegantly perform multiple effects in parallel with ZIO
我知道我可以使用
import zio.Task
def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }
def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }
并行执行 3 或 4 个任务,但我不知道是否有更优雅的解决方案?
您可以将 ZIO.collectAllPar
与任务列表一起使用:
def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)
然后你可以这样使用它:
val t1 = Task.effect{
Thread.sleep(100)
println("t1 started")
Thread.sleep(1000)
1
}
val t2 = Task.effect{
println("t2 started")
Thread.sleep(1000)
2
}
val t3 = Task.effect{
println("t3 started")
Thread.sleep(1000)
3
}
(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))
它会同时运行你所有的任务。
如果没有 shapeless,在 Scala 2 中很难实现使用元组而不是列表的通用解决方案。它会在 Scala 3 中发生变化,因为那时它们可以作为异构列表处理。
添加到 Krzysztof Atłasik 的回答中,还有 collectAllParN,它的工作方式类似于 collectAllPAr,但允许您指定要使用的最大光纤数:
val a = Task {
println("t1 started")
Thread.sleep(2000)
println("t1 finished")
1
}
val b = Task {
println("t2 started")
Thread.sleep(1000)
println("t2 finished")
2
}
val c = Task {
println("t3 started")
Thread.sleep(3000)
println("t3 finished")
3
}
val d = Task {
println("t4 started")
Thread.sleep(1000)
println("t4 finished")
4
}
你可以运行这样:
Task.collectAllParN(4)(List(a, b, c, d))
如果您有许多(成百上千个)并行任务,这将特别有用,可以避免溢出和内存错误。继续将要使用的纤程数更改为 2 或 3,然后亲眼看看执行情况如何变化。
并行执行的另一种选择是将任务放在 ZQueue 上,并在您的消费者收到任务后 fork 它们。
另请注意,还有 <&>
组合器。这是 zipPar
的别名。这将产生一个元组,如果你使用 for comprehensions,我建议你看一下 better-monadic-for
,它修复了 for comprehensions
中元组的问题
下面是一个使用 <&>
组合子和 map 的例子:
(t1 <&> t2 <&> t3 <&> t4) map {
case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4"
}
ZIO.collectAllPar
和 ZIO.collectAllParN
仅在所有 ZIO
具有相同的 return 类型时才有效。那不是问题。
从 ZIO 2.x 开始,您可以使用 foreachPar
来控制与 withParallelism
的并行性。一个简单的例子可能看起来像这样
ZIO.foreachPar(urls)(download).withParallelism(8)
当我们想要运行具有无限制最大纤维数的并行效果时,可以使用withParallelismUnbounded
方法:
ZIO.foreachPar(urls)(download).withParallelismUnbounded
所有以 N 结尾的并行运算符,例如 foreachParN
和 collectAllParN
,现已在 2.x 中弃用。
我知道我可以使用
import zio.Task
def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }
def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }
并行执行 3 或 4 个任务,但我不知道是否有更优雅的解决方案?
您可以将 ZIO.collectAllPar
与任务列表一起使用:
def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)
然后你可以这样使用它:
val t1 = Task.effect{
Thread.sleep(100)
println("t1 started")
Thread.sleep(1000)
1
}
val t2 = Task.effect{
println("t2 started")
Thread.sleep(1000)
2
}
val t3 = Task.effect{
println("t3 started")
Thread.sleep(1000)
3
}
(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))
它会同时运行你所有的任务。
如果没有 shapeless,在 Scala 2 中很难实现使用元组而不是列表的通用解决方案。它会在 Scala 3 中发生变化,因为那时它们可以作为异构列表处理。
添加到 Krzysztof Atłasik 的回答中,还有 collectAllParN,它的工作方式类似于 collectAllPAr,但允许您指定要使用的最大光纤数:
val a = Task {
println("t1 started")
Thread.sleep(2000)
println("t1 finished")
1
}
val b = Task {
println("t2 started")
Thread.sleep(1000)
println("t2 finished")
2
}
val c = Task {
println("t3 started")
Thread.sleep(3000)
println("t3 finished")
3
}
val d = Task {
println("t4 started")
Thread.sleep(1000)
println("t4 finished")
4
}
你可以运行这样:
Task.collectAllParN(4)(List(a, b, c, d))
如果您有许多(成百上千个)并行任务,这将特别有用,可以避免溢出和内存错误。继续将要使用的纤程数更改为 2 或 3,然后亲眼看看执行情况如何变化。
并行执行的另一种选择是将任务放在 ZQueue 上,并在您的消费者收到任务后 fork 它们。
另请注意,还有 <&>
组合器。这是 zipPar
的别名。这将产生一个元组,如果你使用 for comprehensions,我建议你看一下 better-monadic-for
,它修复了 for comprehensions
下面是一个使用 <&>
组合子和 map 的例子:
(t1 <&> t2 <&> t3 <&> t4) map {
case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4"
}
ZIO.collectAllPar
和 ZIO.collectAllParN
仅在所有 ZIO
具有相同的 return 类型时才有效。那不是问题。
从 ZIO 2.x 开始,您可以使用 foreachPar
来控制与 withParallelism
的并行性。一个简单的例子可能看起来像这样
ZIO.foreachPar(urls)(download).withParallelism(8)
当我们想要运行具有无限制最大纤维数的并行效果时,可以使用withParallelismUnbounded
方法:
ZIO.foreachPar(urls)(download).withParallelismUnbounded
所有以 N 结尾的并行运算符,例如 foreachParN
和 collectAllParN
,现已在 2.x 中弃用。