scala.concurrent.blocking - 它实际上做了什么?

scala.concurrent.blocking - what does it actually do?

我花了一段时间学习 Scala 执行上下文、底层线程模型和并发性等主题。你能解释一下 scala.concurrent.blocking "adjust the runtime behavior""may improve performance or avoid deadlocks" 是如何按照 scaladoc?

the documentation 中,它作为一种等待 api 的方法出现,但未实现 Awaitable。 (也许还应该包装长 运行 计算?)。

它的实际作用是什么?

Following through the source 不会轻易泄露其秘密。

blocking 旨在向 ExecutionContext 暗示所包含的代码正在阻塞并可能导致线程饥饿。这将使线程池有机会产生新线程以防止饥饿。这就是"adjust the runtime behavior"的意思。但这并不神奇,并且不适用于每个 ExecutionContext.

考虑这个例子:

import scala.concurrent._
val ec = scala.concurrent.ExecutionContext.Implicits.global

(0 to 100) foreach { n =>
    Future {
        println("starting Future: " + n)
        blocking { Thread.sleep(3000) }
        println("ending Future: " + n)
    }(ec)
}

这是使用默认全局 ExecutionContext。 运行 代码原样,您会注意到 100 个 Future 都立即执行,但是如果您删除 blocking,它们一次只会执行几个。默认 ExecutionContext 将通过生成新线程对阻塞调用(标记为如此)做出反应,因此不会因 运行ning Futures.

而超载

现在看这个有 4 个线程的固定池的例子:

import java.util.concurrent.Executors
val executorService = Executors.newFixedThreadPool(4)
val ec = ExecutionContext.fromExecutorService(executorService)

(0 to 100) foreach { n =>
    Future {
        println("starting Future: " + n)
        blocking { Thread.sleep(3000) }
        println("ending Future: " + n)
    }(ec)
}

这个 ExecutionContext 不是为处理生成新线程而构建的,所以即使我的阻塞代码被 blocking 包围,你可以看到它仍然最多只执行 4 Futures一次。这就是我们说 "may improve performance or avoid deadlocks" 的原因——不能保证。正如我们在后者ExecutionContext中看到的那样,根本无法保证。

它是如何工作的?作为链接,blocking 执行此代码:

BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)

BlockContext.current 从当前线程中检索 BlockContext,已见 hereBlockContext 通常只是混合了 BlockContext 特征的 Thread。如源代码所示,它要么存储在 ThreadLocal 中,要么在那里找不到, 它是当前线程的模式匹配。如果当前线程不是 BlockContext,则使用 DefaultBlockContext

接下来,blockOn 在当前 BlockContext 上调用。 blockOnBlockContext 中的抽象方法,因此它的实现取决于 ExecutionContext 如何处理它。如果我们查看 implementation for DefaultBlockContext(当当前线程不是 BlockContext 时),我们会看到 blockOn 实际上什么也不做。所以在非 BlockContext 中使用 blocking 意味着根本没有做任何特别的事情,代码是 运行 原样,没有副作用。

BlockContext 线程怎么样?例如,在 global 上下文中,看到 hereblockOn 做的更多。深入挖掘,您可以看到它在幕后使用 ForkJoinPool,在同一代码段中定义的 DefaultThreadFactory 用于在 ForkJoinPool 中生成新线程。如果 BlockContext(线程)不执行 blockOnForkJoinPool 不知道您正在阻塞,也不会尝试生成更多线程作为响应。

Scala 的 Await 也使用 blocking 实现。