对于 MongoDB 的多个并行请求,应选择 Kotlin 协程中的哪个调度程序?

Which dispatcher in Kotlin coroutines should be chosen for multiple parallel requests to MongoDB?

有一个参数列表,每个参数都是 MongoDB 查询的输入。 查询可能不同,但为了简单起见,我们只保留一个封装在 callMongoReactive.

fun callMongoReactive(p: Param): Mono<Result>{
    // ...
}

fun queryInParallel(parameters: List<Param>): List<Result> =
    parameters
        .map { async { mongo.findSomething(it).awaitSingle() } }
        .awaitAll()

假设参数列表大小不大于 20。

使这些异步请求运行同时并行的协程调度程序使用的最佳策略是什么?

是否应该创建自定义调度程序(具有专用线程池)或者是否有针对此类情况的标准方法?

如果您想要一个非常笼统的答案,我会说您可能不想关心这个级别。你不必。

在这种特定情况下,原来的 API 是反应式的(立即 returns Mono),这意味着实际工作不会在调用 [= 的线程上执行12=]/findSomething,但是 mongoDB 决定使用什么线程池来支持这个 Mono。这意味着在这种情况下调度程序的选择并不重要。

所以特别是在这种情况下,我会选择最简单的选项:不要选择。使用 coroutineScope 并公开一个 suspend 函数,以便调用者决定协程上下文(包括调度程序):

suspend fun queryInParallel(parameters: List<Param>): List<Result> = coroutineScope {
    parameters
        .map { async { mongo.findSomething(it).awaitSingle() } }
        .awaitAll()
}

这是“工作的并行分解”的常用成语。我会说这是最常见的。

What is the optimal strategy of coroutine dispatchers usage that makes these async requests run in parallel simultaneously?

值得注意的是,上面的成语表示并发,但是async的主体是否会在中运行并行与否取决于调用者选择的调度程序。

(重申一下,调度程序只影响那些异步的主体,在这种特定情况下,它们不会使用线程那么多,因为它们无论如何都会调用 non-blocking 方法,所以它真的不会'没关系。)

现在,在 确实 重要的情况下,任何由 1 个以上线程支持的调度程序都将允许并行处理。有几个现有的调度程序可能很有用,无需创建您自己的线程池。 Dispatchers.Default 的线程数与其 运行 所在机器的内核数相适应,因此非常适合 CPU-bound 工作。 Dispatchers.IO 根据需要缩放线程数,如果您有很多阻塞 IO 并想避免饥饿,这很有用。

您还可以在任何调度程序上使用 limitedParallelism 以仅使用有限数量的线程来查看它,这在某些您不想创建额外线程池的情况下可能很有用,但您确实希望限制可用线程的数量超过 built-in 调度程序提供的数量。

如果您想隔离系统的某些部分以防某个子系统行为不当和线程不足,那么创建自定义线程池可能会很有趣。不过,它确实有内存开销,因为您正在创建更多线程。