使用 Kotlin 协程的多线程

Multithreading using Kotlin Coroutines

我正在试验 Kotlin Coroutines 并有以下代码:

fun main(args: Array<String>) = runBlocking {
    val cores = Runtime.getRuntime().availableProcessors()
    println("number of cores: $cores")

    val jobs = List(10) {
        async(CommonPool) {
            delay(100)
            println("async #$it on thread ${Thread.currentThread().name}")
        }
    }
    jobs.forEach { it.join() }
}

这是我的输出:

number of cores: 4
async number:0 on thread ForkJoinPool.commonPool-worker-2
async number:2 on thread ForkJoinPool.commonPool-worker-3
async number:3 on thread ForkJoinPool.commonPool-worker-3
async number:4 on thread ForkJoinPool.commonPool-worker-3
async number:5 on thread ForkJoinPool.commonPool-worker-3
async number:1 on thread ForkJoinPool.commonPool-worker-1
async number:7 on thread ForkJoinPool.commonPool-worker-3
async number:6 on thread ForkJoinPool.commonPool-worker-2
async number:9 on thread ForkJoinPool.commonPool-worker-3
async number:8 on thread ForkJoinPool.commonPool-worker-1

根据 Roman Elizarov 对另一个协程相关问题的

"The launch just creates new coroutine, while CommonPool dispatches coroutines to a ForkJoinPool.commonPool() which does use multiple threads and thus executes on multiple CPUs in this example."

根据Java 8 documentation:

"For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors."

为什么只使用了 3 个工作线程?即使我将异步任务的数量增加到 1000+,也有相同的 3 个工作线程。

我的配置: Mac/High 双核 Sierra cpu(Hyper-threading,因此有 4 个可见内核),Kotlin 1.2,kotlinx-coroutines-core:0.19.3 和 JVM 1.8

如果您查看 CommonPool 的实现,您会注意到它正在 java.util.concurrent.ForkJoinPool 或具有以下大小的线程池上工作:

(Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

使用 4 个可用处理器,这将导致 3 这回答了为什么您看到 3 个工作线程。

ForkJoinPool-size 可以确定如下(将相同):

ForkJoinPool.commonPool().parallelism

如果您使用协程版本>=1.0

,请参阅

从 Coroutines 1.0 开始,代码看起来会略有不同,因为 CommonPool 现在将替换为 Dispatchers.Default

fun main(args: Array<String>) = runBlocking {
    val cores = Runtime.getRuntime().availableProcessors()
    println("number of cores: $cores")

    val jobs = List(10) {
        async(Dispatchers.Default) {
            delay(100)
            println("async #$it on thread ${Thread.currentThread().name}")
        }
    }
    jobs.forEach { it.join() }
}

此外,您现在将获得以下内容:

It is backed by a shared pool of threads on JVM. By default, the maximal number of threads used by this dispatcher is equal to the number CPU cores, but is at least two.