Kotlin:有没有工具可以让我在执行挂起函数时控制并行度?

Kotlin: Is there a tool that allows me to control parallelism when executing suspend functions?

我正在尝试多次执行某个挂起函数,同时执行的挂起函数不会超过 N 个。

对于熟悉 Akka 和 Scala Streaming 库的人来说,mapAsync

我使用一个输入通道(如在 kotlin 通道中)和 N 个输出通道进行了自己的实现。但是看起来很麻烦,效率也不高。

我目前使用的代码有点像这样:

val inChannel = Channel<T>()
val outChannels = (0..n).map{
  Channel<T>()
}
launch{
   var i = 0
   for(t in inChannel){
     
     outChannels[i].offer(t)
     i = ((i+1)%n)
   }
}
outChannels.forEach{outChannel ->
  launch{
     for(t in outChannel){
        fn(t)
     }
  }
}

当然它有错误管理和一切,但仍然...

编辑:我做了以下测试,但失败了。

test("Parallelism is correctly capped") {
            val scope = CoroutineScope(Dispatchers.Default.limitedParallelism(3))
            var num = 0
            (1..100).map {
                scope.launch {
                    num ++
                    println("started $it")
                    delay(Long.MAX_VALUE)
                }
            }

            delay(500)
            assertEquals(3,num)

        }

您可以在 Dispatcher 上使用 limitedParallelism 函数(在 v1.6.0 中试验),并使用返回的调度程序调用您的异步函数。函数 returns 是对原始调度程序的一个视图,它将并行度限制在您提供的限制范围内。你可以这样使用它:

val limit = 2 // Or some other number

val dispatcher = Dispatchers.Default
val limitedDispatcher = dispatcher.limitedParallelism(limit)

for (n in 0..100) {
    scope.launch(limitedDispatcher) {
        executeTask(n)
    }
}

您提出的问题需要@marstran 的回答。如果您想要的是在任何给定时间(并行)不超过 N 个协程正在被主动执行,那么 limitedParallelism 是可行的方法:

val maxThreads: Int = TODO("some max number of threads")
val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)

elements.forEach { elt ->
    scope.launch(limitedDispatcher) {
        doSomething(elt)
    }
}

现在,如果您想要甚至限制 并发性 ,以便最多 N 个协程同时 运行(可能交错),无论线程如何,您可以使用 Semaphore 代替:

val maxConcurrency: Int = TODO("some max number of concurrency coroutines")
val semaphore = Semaphore(maxConcurrency)

elements.forEach { elt ->
    scope.async {
        semaphore.withPermit {
            doSomething(elt)
        }
    }
}

您也可以结合使用这两种方法。

其他回答已经说明了,看你需要限制并行还是并发。如果你需要限制并发,那么你可以像你原来的解决方案那样做,但只有一个通道:

val channel = Channel<T>()
repeat(n) {
    launch {
        for(t in channel){
            fn(t)
        }
    }
}

另请注意,您的示例中的 offer() 不保证任务将永远执行。如果轮循中的下一个消费者仍然忙于前一个任务,则新任务将被忽略。