后台收集无限Flow的方法单元测试永远运行

Unit test of method collecting infinite Flow in background runs forever

我正在努力测试 and/or 实现一个在后台监听无限流的方法。具体来说,我想到的用例是一些具有本地和远程数据源的数据的存储库。本地数据源是唯一的真实来源。存储库随时间公开变化流,这实际上只是本地数据源随时间变化的流。但是,通过后台收集远程源流,远程的变化也会反映到本地。

我正在尝试测试后者,但由于远程源流是无限的,因此使用 运行Blocking 进行测试会使它永远 运行。但是,如果我对协程和范围也有其他误解,我不会感到惊讶(相反,如果我没有误解,我会感到惊讶)。我将如何着手让我的测试不会永远等待?我是否在存储库的 observerValues 方法中犯了一些基本错误,导致它无法像我想的那样工作?

以下是我目前在一个尽可能小的非工作示例中的困惑尝试:

package com.example.data.repositories

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever

@DisplayName("FooRepository")
internal class FooRepositoryTest {
    private val mockLocalDataSource: FooLocalDataSource = mock()
    private val mockRemoteDataSource: FooRemoteDataSource = mock()
    private var testRepository: FooRepository =
        FooRepository(mockLocalDataSource, mockRemoteDataSource)

    @BeforeEach
    fun setUp() {
        Mockito.reset(mockLocalDataSource)
        Mockito.reset(mockRemoteDataSource)
        testRepository = FooRepository(mockLocalDataSource, mockRemoteDataSource)
    }

    @Nested
    @DisplayName("observeReferencePoints()")
    inner class ObserveReferencePointsTest {
        @ExperimentalCoroutinesApi
        @Test
        fun `acts on value emitted from remote flow`() = runBlocking {
            val intCaptor = argumentCaptor<Int>()
            val valuesActedOn = mutableListOf<Int>()
            whenever(mockLocalDataSource.observeValues()).thenReturn(infiniteFlow(1, 60000, 0))
            whenever(mockLocalDataSource.doSomething(intCaptor.capture())).then {
                valuesActedOn.add(intCaptor.firstValue)
            }
            whenever(mockRemoteDataSource.observeValues()).thenReturn(infiniteFlow(7, 0, 60000))

            val testFlow = testRepository.observeValues()

            val collectedValues = mutableListOf<Int>()
            val collectionJob = launch {
                testFlow.collect { collectedValues.add(it) }
            }
            delay(100)
            collectionJob.cancel()

            assertEquals(listOf<Int>(), collectedValues)
            assertEquals(listOf(7), valuesActedOn)
        }
    }
}

class FooRepository(
    private val localDataSource: FooLocalDataSource,
    private val remoteDataSource: FooRemoteDataSource,
) {
    suspend fun observeValues(): Flow<Int> = coroutineScope {
        val localFlow = localDataSource.observeValues()
        launch {
            remoteDataSource.observeValues().collect {
                localDataSource.doSomething(it)
            }
        }
        localFlow
    }
}

class FooLocalDataSource {
    private val values = mutableListOf<Int>()
    fun observeValues(): Flow<Int> = infiniteFlow(5, 10000, 10000)

    suspend fun doSomething(value: Int) {
        delay(50)
        values.add(value)
    }
}

class FooRemoteDataSource {
    fun observeValues(): Flow<Int> = infiniteFlow(3, 15000, 15000)
}

fun <T> infiniteFlow(item: T, delayBefore: Long, delayAfter: Long): Flow<T> = flow {
    while (true) {
        delay(delayBefore)
        emit(item)
        delay(delayAfter)
    }
}

suspend fun xxx(): Flow<T> 是一个很好的指标,表明实施存在缺陷。这是因为 Flow<T> 被设计为冷流,将有关执行的细节留给用户。

您需要注意的一件事是 coroutineScope 不会 return 直到其中启动的所有协程都完成。

这意味着 observeValues() 方法不会 return 直到 remoteDataSource.observeValues().collect(...) returns.

正确的实现应该是这样的:

    fun observeValues(): Flow<Int> = flow {
        coroutineScope { // allow parallel processing of remoteDataSource
            val localFlow = localDataSource.observeValues()
            launch {
                remoteDataSource.observeValues().collect {
                    localDataSource.doSomething(it)
                }
            }
            emitAll(localFlow) // Forward emitted data to outer flow
        }
    }