后台收集无限Flow的方法单元测试永远运行
Unit test of method collecting infinite Flow in background runs forever
我正在努力测试 and/or 实现一个在后台监听无限流的方法。具体来说,我想到的用例是一些具有本地和远程数据源的数据的存储库。本地数据源是唯一的真实来源。存储库随时间公开变化流,这实际上只是本地数据源随时间变化的流。但是,通过后台收集远程源流,远程的变化也会反映到本地。
我正在尝试测试后者,但由于远程源流是无限的,因此使用 运行Blocking 进行测试会使它永远 运行。但是,如果我对协程和范围也有其他误解,我不会感到惊讶(相反,如果我没有误解,我会感到惊讶)。我将如何着手让我的测试不会永远等待?我是否在存储库的 observerValues
方法中犯了一些基本错误,导致它无法像我想的那样工作?
以下是我目前在一个尽可能小的非工作示例中的困惑尝试:
package com.example.data.repositories
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever
@DisplayName("FooRepository")
internal class FooRepositoryTest {
private val mockLocalDataSource: FooLocalDataSource = mock()
private val mockRemoteDataSource: FooRemoteDataSource = mock()
private var testRepository: FooRepository =
FooRepository(mockLocalDataSource, mockRemoteDataSource)
@BeforeEach
fun setUp() {
Mockito.reset(mockLocalDataSource)
Mockito.reset(mockRemoteDataSource)
testRepository = FooRepository(mockLocalDataSource, mockRemoteDataSource)
}
@Nested
@DisplayName("observeReferencePoints()")
inner class ObserveReferencePointsTest {
@ExperimentalCoroutinesApi
@Test
fun `acts on value emitted from remote flow`() = runBlocking {
val intCaptor = argumentCaptor<Int>()
val valuesActedOn = mutableListOf<Int>()
whenever(mockLocalDataSource.observeValues()).thenReturn(infiniteFlow(1, 60000, 0))
whenever(mockLocalDataSource.doSomething(intCaptor.capture())).then {
valuesActedOn.add(intCaptor.firstValue)
}
whenever(mockRemoteDataSource.observeValues()).thenReturn(infiniteFlow(7, 0, 60000))
val testFlow = testRepository.observeValues()
val collectedValues = mutableListOf<Int>()
val collectionJob = launch {
testFlow.collect { collectedValues.add(it) }
}
delay(100)
collectionJob.cancel()
assertEquals(listOf<Int>(), collectedValues)
assertEquals(listOf(7), valuesActedOn)
}
}
}
class FooRepository(
private val localDataSource: FooLocalDataSource,
private val remoteDataSource: FooRemoteDataSource,
) {
suspend fun observeValues(): Flow<Int> = coroutineScope {
val localFlow = localDataSource.observeValues()
launch {
remoteDataSource.observeValues().collect {
localDataSource.doSomething(it)
}
}
localFlow
}
}
class FooLocalDataSource {
private val values = mutableListOf<Int>()
fun observeValues(): Flow<Int> = infiniteFlow(5, 10000, 10000)
suspend fun doSomething(value: Int) {
delay(50)
values.add(value)
}
}
class FooRemoteDataSource {
fun observeValues(): Flow<Int> = infiniteFlow(3, 15000, 15000)
}
fun <T> infiniteFlow(item: T, delayBefore: Long, delayAfter: Long): Flow<T> = flow {
while (true) {
delay(delayBefore)
emit(item)
delay(delayAfter)
}
}
suspend fun xxx(): Flow<T>
是一个很好的指标,表明实施存在缺陷。这是因为 Flow<T>
被设计为冷流,将有关执行的细节留给用户。
您需要注意的一件事是 coroutineScope
不会 return 直到其中启动的所有协程都完成。
这意味着 observeValues()
方法不会 return 直到 remoteDataSource.observeValues().collect(...)
returns.
正确的实现应该是这样的:
fun observeValues(): Flow<Int> = flow {
coroutineScope { // allow parallel processing of remoteDataSource
val localFlow = localDataSource.observeValues()
launch {
remoteDataSource.observeValues().collect {
localDataSource.doSomething(it)
}
}
emitAll(localFlow) // Forward emitted data to outer flow
}
}