Kotlin 流程顺序异步处理

Kotlin flow sequential asynchronous processing

我有一个 flowMutableSharedFlow,如果它是相关的)并且我有一个可能很昂贵的操作,我想异步执行,同时仍然保持顺序。我使用 CompletableFuture:

实现了我想要的
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)

fun process(flow: Flow<String>) = flow
    .map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
    .buffer(threadPoolSize)
    .map { it.get() } // block current thread
    .flowOn(threadPool.asCoroutineDispatcher())

多亏了卸载到线程池、固定大小 buffer 线程阻塞 CompletableFuture#get 的组合,此代码符合我的预期 - 高达 threadPoolSize 事件被并行处理,并按照接收到的顺序发送到流中。

当我用 kotlinx.coroutines.future 的扩展函数 CompletableFuture#await 替换 CompletableFuture#get 并使用 flowasync 而不是 CompletableFuture#supplyAsync 时,消息不再并行处理:

fun process(flow: Flow<String>) = flow
    .map { 
        runBlocking {
            future { expensiveHandle(it) } // same behaviour with async {...}
        }
    }
    .buffer(threadPoolSize)
    .map { it.await() }
    .flowOn(threadPool.asCoroutineDispatcher())

我可以使用 coroutines/suspending 函数编写等效代码吗?

不要映射作为参数传递的流,而是尝试返回一个使用 callbackFlow 构建器构建的新流并在其中收集流,这样您就可以启动多个协同程序来调用 expensiveHandle(it) 并发送它们的尽快各自的结果。

fun process(flow: Flow<String>) = callbackFlow {
        flow.collect {
            launch {
                send(expensiveHandle(it))
            }
        }
    }.flowOn(threadPool.asCoroutineDispatcher())

所以问题不是 future 本身,而是周围的 runBlocking。当使用自定义 CoroutineScope 和线程池作为底层调度程序时,代码按预期工作(注意 getawait 的更改,而且我使用 async 而不是future 因为它在核心协程库中):

private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val scope = CoroutineScope(dispatcher)

fun process(flow: Flow<String>) = flow
    .map { scope.async(expensiveHandle(it)) }
    .buffer(threadPoolSize)
    .map { it.await() }
    .flowOn(dispatcher)

asyncfutureCoroutineScope的扩展函数。所以,你需要一些 CoroutineScope 给他们打电话。

runBlocking 给出一些 CoroutineScope,但它是一个阻塞调用,所以它在 suspend 中的用法函数 is prohibited.

您可以使用 GlobalScope.async,但它也是 not recommended 并且执行将由 Dispatchers.Default 分派,而不是像 [=21] 的原始示例中那样由 threadPool.asCoroutineDispatcher() 分派=].

coroutineScopewithContext函数会提供CoroutineScope,它从外层作用域继承了coroutineContext,所以流处理会被挂起,立即执行expensiveHandle(it)协程。

您需要使用工厂函数创建 CoroutineScope,这样协程上下文就不会混合:

fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
    val dispatcher = threadPool.asCoroutineDispatcher()
    return flow
        .map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
        .buffer(threadPool.poolSize)
        .map { it.await() }
        .flowOn(dispatcher)
}