Kotlin:有没有工具可以让我在执行挂起函数时控制并行度?
Kotlin: Is there a tool that allows me to control parallelism when executing suspend functions?
我正在尝试多次执行某个挂起函数,同时执行的挂起函数不会超过 N 个。
对于熟悉 Akka 和 Scala Streaming 库的人来说,mapAsync
。
我使用一个输入通道(如在 kotlin 通道中)和 N 个输出通道进行了自己的实现。但是看起来很麻烦,效率也不高。
我目前使用的代码有点像这样:
val inChannel = Channel<T>()
val outChannels = (0..n).map{
Channel<T>()
}
launch{
var i = 0
for(t in inChannel){
outChannels[i].offer(t)
i = ((i+1)%n)
}
}
outChannels.forEach{outChannel ->
launch{
for(t in outChannel){
fn(t)
}
}
}
当然它有错误管理和一切,但仍然...
编辑:我做了以下测试,但失败了。
test("Parallelism is correctly capped") {
val scope = CoroutineScope(Dispatchers.Default.limitedParallelism(3))
var num = 0
(1..100).map {
scope.launch {
num ++
println("started $it")
delay(Long.MAX_VALUE)
}
}
delay(500)
assertEquals(3,num)
}
您可以在 Dispatcher
上使用 limitedParallelism
函数(在 v1.6.0 中试验),并使用返回的调度程序调用您的异步函数。函数 returns 是对原始调度程序的一个视图,它将并行度限制在您提供的限制范围内。你可以这样使用它:
val limit = 2 // Or some other number
val dispatcher = Dispatchers.Default
val limitedDispatcher = dispatcher.limitedParallelism(limit)
for (n in 0..100) {
scope.launch(limitedDispatcher) {
executeTask(n)
}
}
您提出的问题需要@marstran 的回答。如果您想要的是在任何给定时间(并行)不超过 N 个协程正在被主动执行,那么 limitedParallelism
是可行的方法:
val maxThreads: Int = TODO("some max number of threads")
val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
elements.forEach { elt ->
scope.launch(limitedDispatcher) {
doSomething(elt)
}
}
现在,如果您想要甚至限制 并发性 ,以便最多 N 个协程同时 运行(可能交错),无论线程如何,您可以使用 Semaphore 代替:
val maxConcurrency: Int = TODO("some max number of concurrency coroutines")
val semaphore = Semaphore(maxConcurrency)
elements.forEach { elt ->
scope.async {
semaphore.withPermit {
doSomething(elt)
}
}
}
您也可以结合使用这两种方法。
其他回答已经说明了,看你需要限制并行还是并发。如果你需要限制并发,那么你可以像你原来的解决方案那样做,但只有一个通道:
val channel = Channel<T>()
repeat(n) {
launch {
for(t in channel){
fn(t)
}
}
}
另请注意,您的示例中的 offer()
不保证任务将永远执行。如果轮循中的下一个消费者仍然忙于前一个任务,则新任务将被忽略。
我正在尝试多次执行某个挂起函数,同时执行的挂起函数不会超过 N 个。
对于熟悉 Akka 和 Scala Streaming 库的人来说,mapAsync
。
我使用一个输入通道(如在 kotlin 通道中)和 N 个输出通道进行了自己的实现。但是看起来很麻烦,效率也不高。
我目前使用的代码有点像这样:
val inChannel = Channel<T>()
val outChannels = (0..n).map{
Channel<T>()
}
launch{
var i = 0
for(t in inChannel){
outChannels[i].offer(t)
i = ((i+1)%n)
}
}
outChannels.forEach{outChannel ->
launch{
for(t in outChannel){
fn(t)
}
}
}
当然它有错误管理和一切,但仍然...
编辑:我做了以下测试,但失败了。
test("Parallelism is correctly capped") {
val scope = CoroutineScope(Dispatchers.Default.limitedParallelism(3))
var num = 0
(1..100).map {
scope.launch {
num ++
println("started $it")
delay(Long.MAX_VALUE)
}
}
delay(500)
assertEquals(3,num)
}
您可以在 Dispatcher
上使用 limitedParallelism
函数(在 v1.6.0 中试验),并使用返回的调度程序调用您的异步函数。函数 returns 是对原始调度程序的一个视图,它将并行度限制在您提供的限制范围内。你可以这样使用它:
val limit = 2 // Or some other number
val dispatcher = Dispatchers.Default
val limitedDispatcher = dispatcher.limitedParallelism(limit)
for (n in 0..100) {
scope.launch(limitedDispatcher) {
executeTask(n)
}
}
您提出的问题需要@marstran 的回答。如果您想要的是在任何给定时间(并行)不超过 N 个协程正在被主动执行,那么 limitedParallelism
是可行的方法:
val maxThreads: Int = TODO("some max number of threads")
val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
elements.forEach { elt ->
scope.launch(limitedDispatcher) {
doSomething(elt)
}
}
现在,如果您想要甚至限制 并发性 ,以便最多 N 个协程同时 运行(可能交错),无论线程如何,您可以使用 Semaphore 代替:
val maxConcurrency: Int = TODO("some max number of concurrency coroutines")
val semaphore = Semaphore(maxConcurrency)
elements.forEach { elt ->
scope.async {
semaphore.withPermit {
doSomething(elt)
}
}
}
您也可以结合使用这两种方法。
其他回答已经说明了,看你需要限制并行还是并发。如果你需要限制并发,那么你可以像你原来的解决方案那样做,但只有一个通道:
val channel = Channel<T>()
repeat(n) {
launch {
for(t in channel){
fn(t)
}
}
}
另请注意,您的示例中的 offer()
不保证任务将永远执行。如果轮循中的下一个消费者仍然忙于前一个任务,则新任务将被忽略。