Kotlin 流程顺序异步处理
Kotlin flow sequential asynchronous processing
我有一个 flow
(MutableSharedFlow
,如果它是相关的)并且我有一个可能很昂贵的操作,我想异步执行,同时仍然保持顺序。我使用 CompletableFuture
:
实现了我想要的
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
fun process(flow: Flow<String>) = flow
.map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
.buffer(threadPoolSize)
.map { it.get() } // block current thread
.flowOn(threadPool.asCoroutineDispatcher())
多亏了卸载到线程池、固定大小 buffer
和 线程阻塞 CompletableFuture#get
的组合,此代码符合我的预期 - 高达 threadPoolSize
事件被并行处理,并按照接收到的顺序发送到流中。
当我用 kotlinx.coroutines.future
的扩展函数 CompletableFuture#await
替换 CompletableFuture#get
并使用 flow
或 async
而不是 CompletableFuture#supplyAsync
时,消息不再并行处理:
fun process(flow: Flow<String>) = flow
.map {
runBlocking {
future { expensiveHandle(it) } // same behaviour with async {...}
}
}
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(threadPool.asCoroutineDispatcher())
我可以使用 coroutines/suspending 函数编写等效代码吗?
不要映射作为参数传递的流,而是尝试返回一个使用 callbackFlow
构建器构建的新流并在其中收集流,这样您就可以启动多个协同程序来调用 expensiveHandle(it)
并发送它们的尽快各自的结果。
fun process(flow: Flow<String>) = callbackFlow {
flow.collect {
launch {
send(expensiveHandle(it))
}
}
}.flowOn(threadPool.asCoroutineDispatcher())
所以问题不是 future
本身,而是周围的 runBlocking
。当使用自定义 CoroutineScope
和线程池作为底层调度程序时,代码按预期工作(注意 get
到 await
的更改,而且我使用 async
而不是future
因为它在核心协程库中):
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val scope = CoroutineScope(dispatcher)
fun process(flow: Flow<String>) = flow
.map { scope.async(expensiveHandle(it)) }
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(dispatcher)
async
和future
是CoroutineScope
的扩展函数。所以,你需要一些
CoroutineScope
给他们打电话。
runBlocking
给出一些 CoroutineScope
,但它是一个阻塞调用,所以它在 suspend
中的用法函数 is prohibited.
您可以使用 GlobalScope.async
,但它也是 not recommended 并且执行将由 Dispatchers.Default
分派,而不是像 [=21] 的原始示例中那样由 threadPool.asCoroutineDispatcher()
分派=].
coroutineScope
和withContext
函数会提供CoroutineScope
,它从外层作用域继承了coroutineContext
,所以流处理会被挂起,立即执行expensiveHandle(it)
协程。
您需要使用工厂函数创建 CoroutineScope
,这样协程上下文就不会混合:
fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
val dispatcher = threadPool.asCoroutineDispatcher()
return flow
.map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
.buffer(threadPool.poolSize)
.map { it.await() }
.flowOn(dispatcher)
}
我有一个 flow
(MutableSharedFlow
,如果它是相关的)并且我有一个可能很昂贵的操作,我想异步执行,同时仍然保持顺序。我使用 CompletableFuture
:
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
fun process(flow: Flow<String>) = flow
.map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
.buffer(threadPoolSize)
.map { it.get() } // block current thread
.flowOn(threadPool.asCoroutineDispatcher())
多亏了卸载到线程池、固定大小 buffer
和 线程阻塞 CompletableFuture#get
的组合,此代码符合我的预期 - 高达 threadPoolSize
事件被并行处理,并按照接收到的顺序发送到流中。
当我用 kotlinx.coroutines.future
的扩展函数 CompletableFuture#await
替换 CompletableFuture#get
并使用 flow
或 async
而不是 CompletableFuture#supplyAsync
时,消息不再并行处理:
fun process(flow: Flow<String>) = flow
.map {
runBlocking {
future { expensiveHandle(it) } // same behaviour with async {...}
}
}
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(threadPool.asCoroutineDispatcher())
我可以使用 coroutines/suspending 函数编写等效代码吗?
不要映射作为参数传递的流,而是尝试返回一个使用 callbackFlow
构建器构建的新流并在其中收集流,这样您就可以启动多个协同程序来调用 expensiveHandle(it)
并发送它们的尽快各自的结果。
fun process(flow: Flow<String>) = callbackFlow {
flow.collect {
launch {
send(expensiveHandle(it))
}
}
}.flowOn(threadPool.asCoroutineDispatcher())
所以问题不是 future
本身,而是周围的 runBlocking
。当使用自定义 CoroutineScope
和线程池作为底层调度程序时,代码按预期工作(注意 get
到 await
的更改,而且我使用 async
而不是future
因为它在核心协程库中):
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val scope = CoroutineScope(dispatcher)
fun process(flow: Flow<String>) = flow
.map { scope.async(expensiveHandle(it)) }
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(dispatcher)
async
和future
是CoroutineScope
的扩展函数。所以,你需要一些
CoroutineScope
给他们打电话。
runBlocking
给出一些 CoroutineScope
,但它是一个阻塞调用,所以它在 suspend
中的用法函数 is prohibited.
您可以使用 GlobalScope.async
,但它也是 not recommended 并且执行将由 Dispatchers.Default
分派,而不是像 [=21] 的原始示例中那样由 threadPool.asCoroutineDispatcher()
分派=].
coroutineScope
和withContext
函数会提供CoroutineScope
,它从外层作用域继承了coroutineContext
,所以流处理会被挂起,立即执行expensiveHandle(it)
协程。
您需要使用工厂函数创建 CoroutineScope
,这样协程上下文就不会混合:
fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
val dispatcher = threadPool.asCoroutineDispatcher()
return flow
.map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
.buffer(threadPool.poolSize)
.map { it.await() }
.flowOn(dispatcher)
}