Kotlin Coroutines - 如何阻止 await/join 所有作业?

Kotlin Coroutines - How to block to await/join all jobs?

我是 Kotlin/Coroutines 的新手,所以希望我只是想念 something/don 没有完全理解如何为我要解决的问题构建代码。

本质上,我正在获取一个字符串列表,对于列表中的每个项目,我想将其发送到另一个方法来完成工作(进行网络调用和 return 基于响应的数据) . (编辑:)我希望所有调用同时启动,并阻塞直到所有调用都done/the响应被执行,然后return一个新列表每个回复的信息。

我可能还不完全理解何时使用 launch/async,但我已经尝试在启动(使用 joinAll)和异步(使用 await ).

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData

我期望发生的情况是,如果我的 lstInputs 大小为 120,当所有作业都加入后,我的 lstOfReturnData 大小也应该为 120。

实际发生的是不一致的结果。我将 运行 它一次,我在最终列表中得到 118,再次 运行 它,它是 120,再次 运行 它,它是 117,等等。在 networkCallToGetData() 方法,我正在处理任何异常,至少 return 每个请求都有一些东西,无论网络调用是否失败。

任何人都可以帮助解释为什么我得到不一致的结果,以及我需要做什么来确保我正确地阻止并且在继续之前加入所有作业?

Runblocking 应该意味着您不必调用加入。 从 runblocking 范围内启动协程应该为你做这件事。 你试过了吗:

fun processData(lstInputs: List<String>): List<response> {

val lstOfReturnData = mutableListOf<response>()

runBlocking {
    lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
   } 
}

return lstofReturnData

mutableListOf() 创建一个 ArrayList,它不是线程安全的。
尝试改用 ConcurrentLinkedQueue

还有,你用的是稳定版的Kotlin/Kotlinx.coroutine(不是旧的实验版)?在稳定版中,随着结构化并发的引入,就不用写jobs.joinAll anymore了。 launchrunBlocking 的扩展函数,它将在 runBlocking 范围内启动新协程,并且 runBlocking 范围将自动等待所有启动的作业完成。所以上面的代码可以缩短为

val lstOfReturnData = ConcurrentLinkedQueue<response>()
runBlocking {
        lstInputs.forEach {
            launch(Dispatches.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
}
return lstOfReturnData

runBlocking 中断地阻塞当前线程直到它完成。我想这不是你想要的。如果我想错了,你想阻塞当前线程,那么你可以摆脱协程,只在当前线程中进行网络调用:

val lstOfReturnData = mutableListOf<response>()
lstInputs.forEach {
    lstOfReturnData.add(networkCallToGetData(it))
} 

但如果这不是您的意图,您可以执行以下操作:

class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
    : CoroutineScope {

    // creating local scope for coroutines
    private var job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = uiContext + job

    // call this to cancel job when you don't need it anymore
    fun detach() {
        job.cancel()
    }

    fun processData(lstInputs: List<String>) {

        launch {
            val deferredList = lstInputs.map { 
                async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
            }
            val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread

            // use lstOfReturnData in Main Thread, e.g. update UI
        }
    }
}