为什么这段代码没有并发执行并且 delay() 没有像预期的那样挂起协程?

Why doesn't this code execute concurrently and delay() doesn't suspend the coroutine as expected?

以下代码生成范围为 [1,10] 的流。然后它调用一个挂起函数,该函数为每个数字启动一个异步协程并将其值一式三份。

    @OptIn(FlowPreview::class)
    @Test
    fun someTest() {
        val t1 = System.currentTimeMillis()
        runTest {
            val range = (1..10).asFlow()
            val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
            println(concat)
        }
        val t2 = System.currentTimeMillis()
        println("Elapsed time ${t2 -t1}")
    }

    private suspend fun triplicate(i: Int): Flow<Int> {
        return coroutineScope {
            val x = async {
                val t1 = System.currentTimeMillis()
                println("pre sleep ${Thread.currentThread().name}")
                delay(10000L)
                val t2 = System.currentTimeMillis()
                println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
                listOf(i, i, i).asFlow()
            }
            x.await()
        }
    }

1) delay() 不工作:

执行显示此日志:

pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 4ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 1ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 1ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 1ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 0ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 1ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 1ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 1ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 1ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 1ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 179

每个协程只休眠几毫秒,而不是预期的 10000 毫秒 (delay(10000L))。为什么?

  1. 为 Thread.sleep 更改 delay() 我可以中断协程,但无法使异步协程同时执行。 完成需要 100 秒(10 个元素 * 10 秒休眠)而不是仅 10 秒(10 个元素并行休眠 10 秒)。为什么?
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 10002ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 10002ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 10004ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 10002ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 100179

更新:

    @OptIn(FlowPreview::class)
    @Test
    fun someTest() {
        runBlocking {
            val t1 = System.currentTimeMillis()
            val range = (1..10).asFlow()
            val concat = range.flatMapMerge{ triplicate(it) }.toList(mutableListOf())
            println(concat)
            val t2 = System.currentTimeMillis()
            println("Elapsed time ${t2 - t1}")
            println(t2 - t1)
        }
    }

    private suspend fun triplicate(i: Int): Flow<Int> {
        return coroutineScope {
            val x = async {
                val t1 = System.currentTimeMillis()
                println("pre sleep ${Thread.currentThread().name}")
                delay(10000L)
                val t2 = System.currentTimeMillis()
                println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
                listOf(i, i, i).asFlow()
            }
            x.await()
        }
    }

我已将 runTest 更改为 runBlocking,现在 delay 使线程休眠。但是我无法并行化 triplicate 调用。怎么了?

pre sleep Test worker @coroutine#2
post sleep Test worker @coroutine#2 10005ms
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#4
post sleep Test worker @coroutine#4 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#6
post sleep Test worker @coroutine#6 10006ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10002ms
pre sleep Test worker @coroutine#8
post sleep Test worker @coroutine#8 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#10
post sleep Test worker @coroutine#10 10004ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 10084
100084
  1. 因为这正是runTest()所做的:它创建了一个非常特殊的协程环境,其中模拟了时间流。它保持正确的操作顺序,但一切都会立即发生。我们不希望单元测试 运行 等待几秒甚至几小时。

    如果您希望正常等待,请使用 runBlocking() 而不是 runTest()

  2. 无法运行并发的三个原因

    • runTest() 默认使用单线程。如果您需要使用更多线程,则提供协程调度程序,例如:runTest(Dispatchers.Default) { ... }.

    • 当 运行在协程中运行时,您几乎不应该阻塞线程。正如您在示例中所见,它使协同程序无响应。即使你使用 Dispatchers.Default 然后假设你有 4 个 CPU 核心,它仍然需要 30 秒才能完成,因为你一次只能执行 4 个睡眠。

    • triplicate()函数真的乱七八糟。通过使用 async(),然后立即使用 await(),您仍然可以几乎同步地执行代码。 triplicate() returns 在等待 10s 之后,因为 flatMapMerge() “顺序调用转换”,你真的一次执行一个 triplicate() .

      我猜您的意图 (?) 是立即 return 流并在 10 秒后发出项目。那么flatMapMerge()可以获取多个这样的流并并发收集:

private fun triplicate(i: Int): Flow<Int> = flow {
    val t1 = System.currentTimeMillis()
    println("pre sleep ${Thread.currentThread().name}")
    delay(10000L)
    val t2 = System.currentTimeMillis()
    println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
    emit(i)
    emit(i)
    emit(i)
}