使用 Kotlin 中不同线程的结果更新列表

Update list with the results of different threads in Kotlin

我想用不同线程的结果更新列表。

mainFunction(): List<A> {
  var x: List<A> = listOf<A>()
  val job = ArrayList<Job>()
  val ans = mainScope.async {
            var i = 0
            for (j in (0..5)) {
                job.add(
                    launch {
                        val res = async {
                            func1()
                        }
                        x += res.await()
                    }
                )
            }
         job.joinAll()
        }
  ans.await()
  return x
}
fun func1(): List<A> {
 //Perform some operation to get list abc
 var abc: List<A> = listOf<A>()
 delay(1000)
 return abc
}

列表“x”未正确更新。 有时,它会附加“res”。有时它不会。 有没有线程安全的方法来修改这样的列表?

新实施:

mainFunction(): List<A> {
 var x: List<A> = listOf<A>()
 val ans = mainScope.async {
   List(6) {
     async{
      func1()
     }
   }.awaitAll()
 }
 print(ans)
 for (item in ans) {
  x+= item as List<A> // item is of type Kotlin.Unit
 }
}

如果你只想并发执行一些任务并在最后获得所有完成结果的列表,你可以这样做:

val jobs = (0..5).map { mainScope.async { func1() } }
val results = jobs.awaitAll()

简答

这是您正在做的事情的一个更简单的版本,它避免了您可能 运行 遇到的同步问题:

suspend fun mainFunction(): List<A> {
    return coroutineScope {
        List(6) { async { func1() } }.awaitAll()
    }
}

如果你想解压这个,你可以阅读长答案。我将解释原始代码中的不同内容,这些内容并不是真正惯用的并且可以替换。

长答案

问题中的代码中有多个非惯用的东西,所以我会尝试解决每个问题。

基于 0 的范围的索引 for 循环

如果你只是想多次重复一个操作,使用repeat(6)而不是for (j in 0..5)更简单。它更容易阅读,尤其是当你不需要索引变量时:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val job = ArrayList<Job>()
    val ans = mainScope.async {
        repeat(6) {
            job.add(
                launch {
                    val res = async {
                        func1()
                    }
                    x += res.await()
                }
            )
        }
        job.joinAll()
    }
    ans.await()
    return x
}

使用循环创建列表

如果您想要在该循环之外创建一个列表,您还可以使用 List(size) { computeElement() } 而不是 repeat(或 for),它利用了 List factory function:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val ans = mainScope.async {
        val jobs = List(6) {
            launch {
                val res = async {
                    func1()
                }
                x += res.await()
            }
        }
        jobs.joinAll()
    }
    ans.await()
    return x
}

额外的异步

这里不需要用额外的异步包装你的启动,你可以直接在 launches 上使用你的范围:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            val res = async {
                func1()
            }
            x += res.await()
        }
    }
    jobs.joinAll()
    return x
}

异步 + 立即等待

使用 async { someFun() } 然后立即 await-ing 这个 Deferred 结果等同于直接调用 someFun() (除非你使用不同的范围或上下文,你在这里不是为了最内在的逻辑而做的)。

所以你可以替换最里面的部分:

val res = async {
    func1()
}
x += res.await()

x += func1(),得到:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            x += func1()
        }
    }
    jobs.joinAll()
    return x
}

启动与异步

如果你想要结果,通常使用async而不是launch更实用。当您使用 launch 时,您必须手动将结果存储在某个地方(这会使您 运行 陷入像现在这样的同步问题)。使用 async,你会得到一个 Deferred<T> 值,然后你可以 await(),当你有一个 Deferred 的列表时,当你 await them all 时没有同步问题.

所以前面代码的总体思路是不好的做法,可能会咬你,因为它需要手动同步。您可以将其替换为:

suspend fun mainFunction(): List<A> {
    val deferredValues = List(6) {
        mainScope.async {
            func1()
        }
    }
    val x = deferredValues.awaitAll()
    return x
}

或更简单:

suspend fun mainFunction(): List<A> {
    return List(6) {
        mainScope.async {
            func1()
        }
    }.awaitAll()
}

手动连接与 coroutineScope

手动 join() 作业通常有异味。如果你想等待一些协程完成,更习惯的做法是在 a coroutineScope { ... } block 内启动所有这些协程,这将暂停直到所有子协程完成。

这里我们已经用 await 调用的 async 替换了我们 join() 的所有 launch,所以这不再适用,因为我们仍然需要 await() 延迟值才能得到结果。然而,由于我们已经处于挂起函数中,我们仍然可以使用 coroutineScope 而不是像 mainScope 这样的外部范围来确保我们不会泄漏任何协程:

suspend fun mainFunction(): List<A> {
    return coroutineScope {
        List(6) { async { func1() } }.awaitAll()
    }
}