Kotlin Flow:永远不会收集 emitAll

Kotlin Flow: emitAll is never collected

我正在尝试为 networkBoundResource 的 kotlin 版本编写单元测试,可以在 with several features

上找到

这是我的版本,其中包含针对以下问题的标记注释。

inline fun <ResultType, RequestType> networkBoundResource(
    ...
    coroutineDispatcher: CoroutineDispatcher
) = flow {

    emit(Resource.loading(null)) // emit works!

    val data = queryDatabase().firstOrNull()

    val flow = if (shouldFetch(data)) {
        
        emit(Resource.loading(data)) // emit works!

        try {
            saveFetchResult(fetch())
            query().map { Resource.success(it) }
            
        } catch (throwable: Throwable) {
            onFetchFailed(throwable)
            query().map { Resource.error(throwable.toString(), it) }
            
        }
    } else {
        query().map { Resource.success(it) }
    }

    emitAll(flow) // emitAll does not work!

}.catch { exception ->
    emit(Resource.error("An error occurred while fetching data! $exception", null))

}.flowOn(coroutineDispatcher)

这是我对此代码的单元测试之一。对代码进行了一些编辑以关注我的问题:


@get:Rule
val testCoroutineRule = TestCoroutineRule()

private val coroutineDispatcher = TestCoroutineDispatcher()

@Test
fun networkBoundResource_noCachedData_shouldMakeNetworkCallAndStoreUserInDatabase() = testCoroutineRule.runBlockingTest {
    ...

    // When getAuthToken is called
    val result = networkBoundResource(..., coroutineDispatcher).toList()

    result.forEach {
        println(it)
    }    
}

问题是 println(it) 只打印 Resource.loading(null) 排放量。但是如果你看一下 flow {} 块的最后一行,你会发现应该有另一个 val flow 的发射。但是这种排放永远不会出现在我的单元测试中。为什么?

我不太确定完整的行为,但本质上你想要获得一个资源,而当前的流量都集中在 FlowCollector<T> 中,这使得推理和测试变得更加困难。

我以前从未使用过或看过 Google 代码,老实说,我只看了一眼。我的主要收获是它的封装很差,似乎打破了关注点的分离——它管理资源状态,并一对一地处理所有 io 工作 class。我更希望有 2 个不同的 classes 来分离该逻辑并允许更容易的测试。

作为简单的伪代码我会做这样的事情:

class ResourceRepository {

    suspend fun get(r : Request) : Resource {
        // abstract implementation details network request and io 
        // - this function should only fulfill the request 
        // can now be mocked for testing
        delay(3_000)
        return Resource.success(Any())
    }
}

data class Request(val a : String)

sealed class Resource {

    companion object {
        val loading : Resource get() = Loading
        fun success(a : Any) : Resource = Success(a)
        fun error(t: Throwable) : Resource = Error(t)
    }

    object Loading : Resource()

    data class Success(val a : Any) : Resource()

    data class Error(val t : Throwable) : Resource()
}

fun resourceFromRequest(r : Request) : Flow<Resource> =
    flow { emit(resourceRepository.get(r)) }
        .onStart { emit(Resource.loading) }
        .catch { emit(Resource.error(it)) }

这使您可以极大地简化 resourceFromRequest() 函数的实际测试,因为您只需模拟存储库和一种方法。这允许您抽象和处理其他地方的网络和 io 工作,独立地再次可以单独测试。

正如@MarkKeen 所建议的,我现在创建了自己的实现并且它运行良好。与 SO 上的代码相比,这个版本现在注入了 coroutineDispatcher 以便于测试,它让流程负责错误处理,它不包含嵌套流程并且更易于阅读和理解。将更新的数据存储到数据库中仍然存在副作用,但我现在太累了,无法解决这个问题。

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.*

inline fun <ResultType, RequestType> networkBoundResource(
    crossinline query: () -> Flow<ResultType?>,
    crossinline fetch: suspend () -> RequestType,
    crossinline saveFetchResult: suspend (RequestType) -> Unit,
    crossinline shouldFetch: (ResultType?) -> Boolean = { true },
    coroutineDispatcher: CoroutineDispatcher
) = flow<Resource<ResultType>> {
    
    // check for data in database
    val data = query().firstOrNull()
    
    if (data != null) {
        // data is not null -> update loading status
        emit(Resource.loading(data))
    }
    
    if (shouldFetch(data)) {
        // Need to fetch data -> call backend
        val fetchResult = fetch()
        // got data from backend, store it in database
        saveFetchResult(fetchResult)        
    }

    // load updated data from database (must not return null anymore)
    val updatedData = query().first()

    // emit updated data
    emit(Resource.success(updatedData))    
    
}.onStart { 
    emit(Resource.loading(null))
    
}.catch { exception ->
    emit(Resource.error("An error occurred while fetching data! $exception", null))
    
}.flowOn(coroutineDispatcher)

此内联乐趣的一个可能的单元测试,用于 AuthRepsitory:

@ExperimentalCoroutinesApi
class AuthRepositoryTest {

    companion object {
        const val FAKE_ID_TOKEN = "FAkE_ID_TOKEN"
    }

    @get:Rule
    val testCoroutineRule = TestCoroutineRule()

    private val coroutineDispatcher = TestCoroutineDispatcher()

    private val userDaoFake = spyk<UserDaoFake>()

    private val mockApiService = mockk<MyApi>()

    private val sut = AuthRepository(
        userDaoFake, mockApiService, coroutineDispatcher
    )

    @Before
    fun beforeEachTest() {
        userDaoFake.clear()
    }

    @Test
    fun getAuthToken_noCachedData_shouldMakeNetworkCallAndStoreUserInDatabase() = testCoroutineRule.runBlockingTest {
        // Given an empty database
        coEvery { mockApiService.getUser(any()) } returns NetworkResponse.Success(UserFakes.getNetworkUser(), null, HttpURLConnection.HTTP_OK)

        // When getAuthToken is called
        val result = sut.getAuthToken(FAKE_ID_TOKEN).toList()

        coVerifyOrder {
            // Then first try to fetch data from the DB
            userDaoFake.get()

            // Then fetch the User from the API
            mockApiService.getUser(FAKE_ID_TOKEN)

            // Then insert the user into the DB
            userDaoFake.insert(any())

            // Finally return the inserted user from the DB
            userDaoFake.get()
        }
        
        assertThat(result).containsExactly(
            Resource.loading(null),
            Resource.success(UserFakes.getAppUser())
        ).inOrder()
    }
}