即使一个程序遇到错误也会完成的 parMapN
parMapN that finishes even when one program encounters an error
使用parMapN
,可以并行执行多个IO
,像这样:
import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) })
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })
val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }
program.unsafeRunSync()
示例输出:
A1
C1
B1
A2
C2
B2
A3
C3
B3
A4
C4
B4
A5
B5
C5
A6
B6
C6
A7
B7
C7
A8
...
根据 documentation,如果 IO
中的任何一个以失败告终,未完成的任务将被取消。更改此机制的最佳方法是什么,以便所有 IO
无论如何都完成?
在我的例子中,一些 IO
s 没有 return 任何东西(IO[Unit]
),我仍然想确保一切运行直到它完成或遇到错误。
好吧,我在发布问题后不久就找到了一个可能的答案。不确定这是否是处理此问题的最佳方法,但像这样定义我的 IO
s 对我有用:
val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) }).attempt
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) }).attempt
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) }).attempt
据我所知,您的示例代码没有引发任何错误。
所以你应该有像下面这样的代码才能看到这个功能:
val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO.raiseError[Unit](new Exception("boom"))
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })
而且看起来不太好,因为 attempt
函数会将内部结构更改为 IO[Either[_,_]]
,这不是您的意图,是吗?
使用parMapN
,可以并行执行多个IO
,像这样:
import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) })
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })
val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }
program.unsafeRunSync()
示例输出:
A1
C1
B1
A2
C2
B2
A3
C3
B3
A4
C4
B4
A5
B5
C5
A6
B6
C6
A7
B7
C7
A8
...
根据 documentation,如果 IO
中的任何一个以失败告终,未完成的任务将被取消。更改此机制的最佳方法是什么,以便所有 IO
无论如何都完成?
在我的例子中,一些 IO
s 没有 return 任何东西(IO[Unit]
),我仍然想确保一切运行直到它完成或遇到错误。
好吧,我在发布问题后不久就找到了一个可能的答案。不确定这是否是处理此问题的最佳方法,但像这样定义我的 IO
s 对我有用:
val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) }).attempt
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) }).attempt
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) }).attempt
据我所知,您的示例代码没有引发任何错误。 所以你应该有像下面这样的代码才能看到这个功能:
val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO.raiseError[Unit](new Exception("boom"))
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })
而且看起来不太好,因为 attempt
函数会将内部结构更改为 IO[Either[_,_]]
,这不是您的意图,是吗?