Scala Cats Effects - IO 异步移位 - 它是如何工作的?

Scala Cats Effects - IO Async Shift - How Does it Work?

这是一些使用 IO Monad:

的 Scala 猫代码
import java.util.concurrent.{ExecutorService, Executors}

import cats.effect.IO

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

object Program extends App {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future, it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        val nothing: Unit = toRun() // Note: This line executes the body and returns nothing, which is of type Unit.
        try {
          callback(Right(nothing)) // On success, the callback returns nothing
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure, it returns an exception
        }
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()

  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
   */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // "scala-execution-context-global-13" thread
    }
  } yield ()

  program.unsafeRunSync()
}

您需要向 运行 添加:

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0",
  "org.typelevel" %% "cats-effect" % "0.3"
),

到您的 build.sbt 文件。

注意输出:

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
 */

基本上,我不明白 IO.shift(全球)或 IO.async 是如何工作的。

比如为什么我调用了"forkAsync"之后,如果不调用"IO.shift(Global)",后面的同步IO对象是运行 in "pool-1-thread-1" .另外,这个例子中的 forkAsync 和 forkSync 有什么区别?它们都在 ExecutionContext.global 中启动,然后在 "pool.pool-1-thread-1" 中执行一个 Runnable。

forkAsync 和 forkSync 是在做完全相同的事情还是 forkAsync 在做不同的事情?如果他们在做同样的事情,那么在 IO.async 中包装代码有什么意义?如果他们做的不是同一件事,他们有什么不同?

For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1".

更重要的问题是您为什么期望它在全球范围内评估 "subsequent synchronous IO objects"?

IO 内部没有线程池的概念,它不知道 global,所以它不能转移回你的默认线程池,因此你需要确实触发手动换档。

升级到最新版本 1.0.0 并且您在 ContextShift 中还有 evalOn,它将在指定的线程池上执行 IO 操作,然后返回到你的"global",我想这就是你想要的。

Also, what is the difference between forkAsync and forkSync in this example?

您的 forkSync 会触发 Runnable 的执行,但不会等待其完成。这是一场火灾,忘记了。这意味着后续的链式操作不会进行背压。

一些建议:

  1. 升级到最新版本(1.0.0)
  2. 阅读文档:https://typelevel.org/cats-effect/datatypes/io.html