协程:列表 运行 中的延迟操作顺序。
Coroutine: Deferred operations in a List run sequentially.
我有一个 List
参数用于执行下载。
我将该列表的元素映射到执行下载的 Deferred
中;然后,列表的 forEach
元素,我调用 await
,但显然下载是按顺序执行的。
这是我的功能:
suspend fun syncFiles() = coroutineScope {
remoteRepository.requiredFiles()
.filter { localRepository.needToDownload( it.name, it.md5 ) }
.map { async { downloader( it ) } }
.forEach { deferredResult ->
when ( val result = deferredResult.await() ) {
is DownloadResult.Layout -> localRepository.storeLayout( result.content )
is DownloadResult.StringR -> localRepository.storeFile( result )
}
}
}
这是我的测试:
private val useCase = SyncUseCaseImpl.Factory(
mockk { // downloader
coEvery { this@mockk.invoke( any() ) } coAnswers { delay(1000 );any() }
},
...
).newInstance()
@Test
fun `syncFiles downloadConcurrently`() = runBlocking {
val requiredFilesCount = useCase.remoteRepository.requiredFiles().size
assert( requiredFilesCount ).isEqualTo( 3 )
val time = measureTimeMillis {
useCase.syncFiles()
}
assert( time ).isBetween( 1000, 1100 )
}
这是我的结果:expected to be between:<1000L> and <1100L> but was:<3081L>
我觉得很奇怪,因为这 2 个虚拟测试正确完成,也许我遗漏了什么 (?)
@Test // OK
fun test() = runBlocking {
val a = async { delay(1000 ) }
val b = async { delay(1000 ) }
val c = async { delay(1000 ) }
val time = measureTimeMillis {
a.await()
b.await()
c.await()
}
assert( time ).isBetween( 1000, 1100 )
}
@Test // OK
fun test() = runBlocking {
val wasteTime: suspend () -> Unit = { delay(1000 ) }
suspend fun wasteTimeConcurrently() = listOf( wasteTime, wasteTime, wasteTime )
.map { async { it() } }
.forEach { it.await() }
val time = measureTimeMillis {
wasteTimeConcurrently()
}
assert( time ).isBetween( 1000, 1100 )
}
问题出在mockk
如果您查看 coAnswer
函数的代码,您会发现 (API.kt + InternalPlatformDsl.kt):
infix fun coAnswers(answer: suspend MockKAnswerScope<T, B>.(Call) -> T) = answers {
InternalPlatformDsl.runCoroutine {
answer(it)
}
}
runCoroutine
看起来像这样。
actual fun <T> runCoroutine(block: suspend () -> T): T {
return runBlocking {
block()
}
}
如您所见,coAnswer
是一个非挂起函数,并使用 runBlocking
启动一个新协程。
让我们看一个例子:
val mock = mockk<Downloader> {
coEvery {
this@mockk.download()
} coAnswers {
delay(1000)
}
}
val a = async {
mock.download()
}
当mockk
执行coAnswer
块(delay()
)时,它启动一个人工协程范围,执行给定的块并等待(阻塞当前线程:runBlocking
) 直到这个块完成。所以 delay(1000)
完成后答案块只有 returns。
表示 coAnswer
中的所有协程 运行 按顺序执行。
如果作业阻塞整个线程,例如 IO 绑定任务阻塞整个线程执行,从而阻塞该线程上的所有其他协程,则可能会发生这种情况。如果您使用的是 Kotlin JVM,请尝试调用 async(IO) { }
到 运行 IO 调度程序下的协程,以便协程环境现在知道该作业将阻塞整个线程并相应地运行。
我有一个 List
参数用于执行下载。
我将该列表的元素映射到执行下载的 Deferred
中;然后,列表的 forEach
元素,我调用 await
,但显然下载是按顺序执行的。
这是我的功能:
suspend fun syncFiles() = coroutineScope {
remoteRepository.requiredFiles()
.filter { localRepository.needToDownload( it.name, it.md5 ) }
.map { async { downloader( it ) } }
.forEach { deferredResult ->
when ( val result = deferredResult.await() ) {
is DownloadResult.Layout -> localRepository.storeLayout( result.content )
is DownloadResult.StringR -> localRepository.storeFile( result )
}
}
}
这是我的测试:
private val useCase = SyncUseCaseImpl.Factory(
mockk { // downloader
coEvery { this@mockk.invoke( any() ) } coAnswers { delay(1000 );any() }
},
...
).newInstance()
@Test
fun `syncFiles downloadConcurrently`() = runBlocking {
val requiredFilesCount = useCase.remoteRepository.requiredFiles().size
assert( requiredFilesCount ).isEqualTo( 3 )
val time = measureTimeMillis {
useCase.syncFiles()
}
assert( time ).isBetween( 1000, 1100 )
}
这是我的结果:expected to be between:<1000L> and <1100L> but was:<3081L>
我觉得很奇怪,因为这 2 个虚拟测试正确完成,也许我遗漏了什么 (?)
@Test // OK
fun test() = runBlocking {
val a = async { delay(1000 ) }
val b = async { delay(1000 ) }
val c = async { delay(1000 ) }
val time = measureTimeMillis {
a.await()
b.await()
c.await()
}
assert( time ).isBetween( 1000, 1100 )
}
@Test // OK
fun test() = runBlocking {
val wasteTime: suspend () -> Unit = { delay(1000 ) }
suspend fun wasteTimeConcurrently() = listOf( wasteTime, wasteTime, wasteTime )
.map { async { it() } }
.forEach { it.await() }
val time = measureTimeMillis {
wasteTimeConcurrently()
}
assert( time ).isBetween( 1000, 1100 )
}
问题出在mockk
如果您查看 coAnswer
函数的代码,您会发现 (API.kt + InternalPlatformDsl.kt):
infix fun coAnswers(answer: suspend MockKAnswerScope<T, B>.(Call) -> T) = answers {
InternalPlatformDsl.runCoroutine {
answer(it)
}
}
runCoroutine
看起来像这样。
actual fun <T> runCoroutine(block: suspend () -> T): T {
return runBlocking {
block()
}
}
如您所见,coAnswer
是一个非挂起函数,并使用 runBlocking
启动一个新协程。
让我们看一个例子:
val mock = mockk<Downloader> {
coEvery {
this@mockk.download()
} coAnswers {
delay(1000)
}
}
val a = async {
mock.download()
}
当mockk
执行coAnswer
块(delay()
)时,它启动一个人工协程范围,执行给定的块并等待(阻塞当前线程:runBlocking
) 直到这个块完成。所以 delay(1000)
完成后答案块只有 returns。
表示 coAnswer
中的所有协程 运行 按顺序执行。
如果作业阻塞整个线程,例如 IO 绑定任务阻塞整个线程执行,从而阻塞该线程上的所有其他协程,则可能会发生这种情况。如果您使用的是 Kotlin JVM,请尝试调用 async(IO) { }
到 运行 IO 调度程序下的协程,以便协程环境现在知道该作业将阻塞整个线程并相应地运行。