我如何才能 运行 不阻塞主线程但在主线程失败时抛出异常的异步 IO 操作?

How can I run an Async IO operation that doesn't block the main thread but throws an exception in the main thread if it fails?

我正在研究 Cats 以完成上述任务。我试着把下面的例子写到

  1. 运行不阻塞主线程的IO操作
  2. 如果失败则在主线程中抛出异常
import cats.effect.{IO, Async}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

val apiCall = Future.successful("I come from the Future!")

val ioa: IO[String] =
  Async[IO].async { cb =>
    import scala.util.{Failure, Success}
    Thread.sleep(1000)
    throw new RuntimeException

    apiCall.onComplete {
      case Success(value) => cb(Right(value))
      case Failure(error) => cb(Left(error))
    }
 }

ioa.unsafeRunAsync(result => result match {
    case Left(result) => throw result
    case Right(_) =>
})

然而,它似乎在这两方面都失败了。它阻塞主线程,只在子线程中抛出异常。我想做的事情可行吗?

你可以这样做:

val longRunningOperation: IO[Unit] = ...
val app: IO[Unit] = ...

val program: IO[Unit] =
  IO.racePair(app, longRunningOperation).flatMap {
    case Left((_, fiberLongRunning)) =>  fiberLongRunning.cancel.void
    case Right((fiberApp, _)) => fiberApp.join.void
  }

override def run(args: List[String]): IO[ExitCode] =
  program.as(ExitCode.Success)

racePair 将 运行 两个 IOs 并发,如果两者之一失败,那么它将 cancel另一个。
现在,如果 longRunningOperation 没有错误地完成,那么等待 app;但是,如果 app 是第一个完成的,那么我决定 cancel longRunningOperation (你当然可以更改).


感谢 Adam Rosien 指出,如果您想 join 在这两种情况下最好是:

val program: IO[Unit] = (app, longRunningOperation).parTupled.void

你可以看到代码运行ning然后玩玩here