猫效应和异步 IO 细节

Cats-effect and asynchronous IO specifics

这几天我一直在研究猫效应和 IO。而且我觉得我对这种效果有一些误解,或者只是我错过了它的意义。

  1. 首先 - 如果 IO 可以替代 Scala 的 Future,我们如何创建异步 IO 任务?使用 IO.shift?使用 IO.asyncIO.delay 是同步还是异步?我们可以用这样的代码 Async[F].delay(...) 来创建一个通用的异步任务吗?或者当我们用 unsafeToAsyncunsafeToFuture?
  2. 调用 IO 时发生异步
  3. 猫效应中的异步和并发有什么意义?为什么要分开?
  4. IO是绿色线程吗?如果是,为什么 cats-effect 中有一个 Fiber 对象?据我了解,Fiber 是绿色线程,但文档声称我们可以将 IOs 视为绿色线程。

我希望能对此做出一些澄清,因为我未能理解这些方面的猫效应文档,而且互联网也没有太大帮助...

if IO can replace Scala's Future, how can we create an async IO task

首先,我们需要澄清什么是异步任务。通常 async 表示 "does not block the OS thread",但由于您提到 Future,它有点模糊。说,如果我写:

Future { (1 to 1000000).foreach(println) }

它不会是 async,因为它是一个阻塞循环和阻塞输出,但它可能会在不同的 OS 线程上执行,由隐式管理执行上下文。等效的猫效应代码为:

for {
  _ <- IO.shift
  _ <- IO.delay { (1 to 1000000).foreach(println) }
} yield ()

(这不是较短的版本)

所以,

  • IO.shift 用于改变线程/线程池。 Future 对每个操作都执行此操作,但在性能方面并非免费。
  • IO.delay { ... } (a.k.a. IO { ... }) NOT 做任何异步并做 NOT 切换线程。它用于从同步副作用 APIs
  • 创建简单的 IO

现在,让我们回到真正的异步。这里要理解的是:

每个异步计算都可以表示为一个接受回调的函数。

无论您使用的是 API returns Future 还是 Java 的 CompletableFuture,或者像 NIO CompletionHandler 这样的东西,这一切都可以转换为回调。这就是 IO.async 的用途:您可以将任何采用回调 的 函数转换为 IO。如果是这样的话:

for {
  _ <- IO.async { ... }
  _ <- IO(println("Done"))
} yield ()

Done 只会在(并且如果)... 中的计算回调时打印。您可以将其视为阻塞绿色线程,而不是 OS 线程。

所以,

  • IO.async 用于将任何 已经异步的 计算转换为 IO.
  • IO.delay 用于将任何 完全同步的 计算转换为 IO.
  • 具有真正异步计算的代码表现得像阻塞绿色线程。

使用 Futures 最接近的类比是创建一个 scala.concurrent.Promise 并返回 p.future


Or async happens when we call IO with unsafeToAsync or unsafeToFuture?

有点。对于 IO 什么都不会发生 ,除非您调用其中之一(或使用 IOApp)。但是 IO 不保证您会在不同的 OS 线程上执行,甚至是异步执行,除非您使用 IO.shiftIO.async.

明确要求

您可以保证线程随时切换,例如(IO.shift *> myIO).unsafeRunAsyncAndForget()。这是可能的,因为 myIO 在被要求之前不会被执行,无论您是 val myIO 还是 def myIO.

但是,您无法神奇地将阻塞操作转换为非阻塞操作。 FutureIO.

都不可能做到这一点

What's the point of Async and Concurrent in cats-effect? Why they are separated?

AsyncConcurrent(以及 Sync)是 class 类型。它们的设计是为了让程序员可以避免被锁定到 cats.effect.IO,并且可以给你 API 支持你选择的任何东西,比如 monix Task 或 Scalaz 8 ZIO,甚至是 monad 转换器类型,比如 OptionT[Task, *something*]。诸如 fs2、monix 和 http4s 之类的库利用它们来为您提供更多的使用选择。

ConcurrentAsync 之上添加了额外的东西,其中最重要的是 .cancelable.start。这些与 Future 没有直接类比,因为它根本不支持取消。

.cancelable.async 的一个版本,它还允许您指定一些逻辑来取消您正在包装的操作。一个常见的例子是网络请求——如果您不再对结果感兴趣,您可以中止它们而不用等待服务器响应,并且不要浪费任何套接字或处理时间来读取响应。你可能永远不会直接使用它,但它有它的位置。

但如果不能取消,可取消操作有什么用呢?此处的关键观察是您无法从自身内部取消操作。必须由其他人做出该决定,并且这将 同时 与操作本身一起发生(这是类型 class 得名的地方)。这就是 .start 的用武之地。简而言之,

.start 是绿色线程的明确分支。

执行 someIO.start 类似于执行 val t = new Thread(someRunnable); t.start(),只不过它现在是绿色的。 Fiber 本质上是 Thread API 的精简版:你可以做 .join,就像 Thread#join(),但它不会阻止 [=181] =]线程;和 .cancel,这是 .interrupt().

的安全版本

请注意,还有其他方法可以分叉绿色线程。比如做并行操作:

val ids: List[Int] = List.range(1, 1000)
def processId(id: Int): IO[Unit] = ???
val processAll: IO[Unit] = ids.parTraverse_(processId)

会将所有 ID 分叉处理到绿色线程,然后将它们全部加入。或者使用 .race:

val fetchFromS3: IO[String] = ???
val fetchFromOtherNode: IO[String] = ???

val fetchWhateverIsFaster = IO.race(fetchFromS3, fetchFromOtherNode).map(_.merge)

将并行执行提取,为您提供第一个完成的结果并自动取消较慢的提取。因此,.start 和使用 Fiber 并不是分叉更多绿色线程的唯一方法,只是最明确的方法。答案是:

Is IO a green thread? If yes, why is there a Fiber object in cats-effect? As I understand the Fiber is the green thread, but docs claim we can think of IOs as green threads.

  • IO 就像一个绿色线程,这意味着您可以并行地拥有很多 运行 而不会增加 OS 线程的开销,并且 for 中的代码-comprehension 的行为就好像它正在阻塞要计算的结果。

  • Fiber是一个控制绿色线程显式分叉(等待完成或取消)的工具。