为什么这段代码没有并发执行并且 delay() 没有像预期的那样挂起协程?
Why doesn't this code execute concurrently and delay() doesn't suspend the coroutine as expected?
以下代码生成范围为 [1,10] 的流。然后它调用一个挂起函数,该函数为每个数字启动一个异步协程并将其值一式三份。
@OptIn(FlowPreview::class)
@Test
fun someTest() {
val t1 = System.currentTimeMillis()
runTest {
val range = (1..10).asFlow()
val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
println(concat)
}
val t2 = System.currentTimeMillis()
println("Elapsed time ${t2 -t1}")
}
private suspend fun triplicate(i: Int): Flow<Int> {
return coroutineScope {
val x = async {
val t1 = System.currentTimeMillis()
println("pre sleep ${Thread.currentThread().name}")
delay(10000L)
val t2 = System.currentTimeMillis()
println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
listOf(i, i, i).asFlow()
}
x.await()
}
}
1) delay() 不工作:
执行显示此日志:
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 4ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 1ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 1ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 1ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 0ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 1ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 1ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 1ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 1ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 1ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 179
每个协程只休眠几毫秒,而不是预期的 10000 毫秒 (delay(10000L)
)。为什么?
- 为 Thread.sleep 更改 delay() 我可以中断协程,但无法使异步协程同时执行。 完成需要 100 秒(10 个元素 * 10 秒休眠)而不是仅 10 秒(10 个元素并行休眠 10 秒)。为什么?
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 10002ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 10002ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 10004ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 10002ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 100179
更新:
@OptIn(FlowPreview::class)
@Test
fun someTest() {
runBlocking {
val t1 = System.currentTimeMillis()
val range = (1..10).asFlow()
val concat = range.flatMapMerge{ triplicate(it) }.toList(mutableListOf())
println(concat)
val t2 = System.currentTimeMillis()
println("Elapsed time ${t2 - t1}")
println(t2 - t1)
}
}
private suspend fun triplicate(i: Int): Flow<Int> {
return coroutineScope {
val x = async {
val t1 = System.currentTimeMillis()
println("pre sleep ${Thread.currentThread().name}")
delay(10000L)
val t2 = System.currentTimeMillis()
println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
listOf(i, i, i).asFlow()
}
x.await()
}
}
我已将 runTest 更改为 runBlocking,现在 delay
使线程休眠。但是我无法并行化 triplicate
调用。怎么了?
pre sleep Test worker @coroutine#2
post sleep Test worker @coroutine#2 10005ms
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#4
post sleep Test worker @coroutine#4 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#6
post sleep Test worker @coroutine#6 10006ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10002ms
pre sleep Test worker @coroutine#8
post sleep Test worker @coroutine#8 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#10
post sleep Test worker @coroutine#10 10004ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 10084
100084
因为这正是runTest()所做的:它创建了一个非常特殊的协程环境,其中模拟了时间流。它保持正确的操作顺序,但一切都会立即发生。我们不希望单元测试 运行 等待几秒甚至几小时。
如果您希望正常等待,请使用 runBlocking() 而不是 runTest()
。
无法运行并发的三个原因
runTest()
默认使用单线程。如果您需要使用更多线程,则提供协程调度程序,例如:runTest(Dispatchers.Default) { ... }
.
当 运行在协程中运行时,您几乎不应该阻塞线程。正如您在示例中所见,它使协同程序无响应。即使你使用 Dispatchers.Default
然后假设你有 4 个 CPU 核心,它仍然需要 30 秒才能完成,因为你一次只能执行 4 个睡眠。
triplicate()
函数真的乱七八糟。通过使用 async()
,然后立即使用 await()
,您仍然可以几乎同步地执行代码。 triplicate()
returns 在等待 10s 之后,因为 flatMapMerge() “顺序调用转换”,你真的一次执行一个 triplicate()
.
我猜您的意图 (?) 是立即 return 流并在 10 秒后发出项目。那么flatMapMerge()
可以获取多个这样的流并并发收集:
private fun triplicate(i: Int): Flow<Int> = flow {
val t1 = System.currentTimeMillis()
println("pre sleep ${Thread.currentThread().name}")
delay(10000L)
val t2 = System.currentTimeMillis()
println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
emit(i)
emit(i)
emit(i)
}
以下代码生成范围为 [1,10] 的流。然后它调用一个挂起函数,该函数为每个数字启动一个异步协程并将其值一式三份。
@OptIn(FlowPreview::class)
@Test
fun someTest() {
val t1 = System.currentTimeMillis()
runTest {
val range = (1..10).asFlow()
val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
println(concat)
}
val t2 = System.currentTimeMillis()
println("Elapsed time ${t2 -t1}")
}
private suspend fun triplicate(i: Int): Flow<Int> {
return coroutineScope {
val x = async {
val t1 = System.currentTimeMillis()
println("pre sleep ${Thread.currentThread().name}")
delay(10000L)
val t2 = System.currentTimeMillis()
println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
listOf(i, i, i).asFlow()
}
x.await()
}
}
1) delay() 不工作:
执行显示此日志:
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 4ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 1ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 1ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 1ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 0ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 1ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 1ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 1ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 1ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 1ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 179
每个协程只休眠几毫秒,而不是预期的 10000 毫秒 (delay(10000L)
)。为什么?
- 为 Thread.sleep 更改 delay() 我可以中断协程,但无法使异步协程同时执行。 完成需要 100 秒(10 个元素 * 10 秒休眠)而不是仅 10 秒(10 个元素并行休眠 10 秒)。为什么?
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 10002ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 10002ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 10004ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 10002ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 100179
更新:
@OptIn(FlowPreview::class)
@Test
fun someTest() {
runBlocking {
val t1 = System.currentTimeMillis()
val range = (1..10).asFlow()
val concat = range.flatMapMerge{ triplicate(it) }.toList(mutableListOf())
println(concat)
val t2 = System.currentTimeMillis()
println("Elapsed time ${t2 - t1}")
println(t2 - t1)
}
}
private suspend fun triplicate(i: Int): Flow<Int> {
return coroutineScope {
val x = async {
val t1 = System.currentTimeMillis()
println("pre sleep ${Thread.currentThread().name}")
delay(10000L)
val t2 = System.currentTimeMillis()
println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
listOf(i, i, i).asFlow()
}
x.await()
}
}
我已将 runTest 更改为 runBlocking,现在 delay
使线程休眠。但是我无法并行化 triplicate
调用。怎么了?
pre sleep Test worker @coroutine#2
post sleep Test worker @coroutine#2 10005ms
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#4
post sleep Test worker @coroutine#4 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#6
post sleep Test worker @coroutine#6 10006ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10002ms
pre sleep Test worker @coroutine#8
post sleep Test worker @coroutine#8 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#10
post sleep Test worker @coroutine#10 10004ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 10084
100084
因为这正是runTest()所做的:它创建了一个非常特殊的协程环境,其中模拟了时间流。它保持正确的操作顺序,但一切都会立即发生。我们不希望单元测试 运行 等待几秒甚至几小时。
如果您希望正常等待,请使用 runBlocking() 而不是
runTest()
。无法运行并发的三个原因
runTest()
默认使用单线程。如果您需要使用更多线程,则提供协程调度程序,例如:runTest(Dispatchers.Default) { ... }
.当 运行在协程中运行时,您几乎不应该阻塞线程。正如您在示例中所见,它使协同程序无响应。即使你使用
Dispatchers.Default
然后假设你有 4 个 CPU 核心,它仍然需要 30 秒才能完成,因为你一次只能执行 4 个睡眠。triplicate()
函数真的乱七八糟。通过使用async()
,然后立即使用await()
,您仍然可以几乎同步地执行代码。triplicate()
returns 在等待 10s 之后,因为 flatMapMerge() “顺序调用转换”,你真的一次执行一个triplicate()
.我猜您的意图 (?) 是立即 return 流并在 10 秒后发出项目。那么
flatMapMerge()
可以获取多个这样的流并并发收集:
private fun triplicate(i: Int): Flow<Int> = flow {
val t1 = System.currentTimeMillis()
println("pre sleep ${Thread.currentThread().name}")
delay(10000L)
val t2 = System.currentTimeMillis()
println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
emit(i)
emit(i)
emit(i)
}