Kotlin Flow:永远不会收集 emitAll
Kotlin Flow: emitAll is never collected
我正在尝试为 networkBoundResource
的 kotlin 版本编写单元测试,可以在 with several features
上找到
这是我的版本,其中包含针对以下问题的标记注释。
inline fun <ResultType, RequestType> networkBoundResource(
...
coroutineDispatcher: CoroutineDispatcher
) = flow {
emit(Resource.loading(null)) // emit works!
val data = queryDatabase().firstOrNull()
val flow = if (shouldFetch(data)) {
emit(Resource.loading(data)) // emit works!
try {
saveFetchResult(fetch())
query().map { Resource.success(it) }
} catch (throwable: Throwable) {
onFetchFailed(throwable)
query().map { Resource.error(throwable.toString(), it) }
}
} else {
query().map { Resource.success(it) }
}
emitAll(flow) // emitAll does not work!
}.catch { exception ->
emit(Resource.error("An error occurred while fetching data! $exception", null))
}.flowOn(coroutineDispatcher)
这是我对此代码的单元测试之一。对代码进行了一些编辑以关注我的问题:
@get:Rule
val testCoroutineRule = TestCoroutineRule()
private val coroutineDispatcher = TestCoroutineDispatcher()
@Test
fun networkBoundResource_noCachedData_shouldMakeNetworkCallAndStoreUserInDatabase() = testCoroutineRule.runBlockingTest {
...
// When getAuthToken is called
val result = networkBoundResource(..., coroutineDispatcher).toList()
result.forEach {
println(it)
}
}
问题是 println(it)
只打印 Resource.loading(null)
排放量。但是如果你看一下 flow {}
块的最后一行,你会发现应该有另一个 val flow
的发射。但是这种排放永远不会出现在我的单元测试中。为什么?
我不太确定完整的行为,但本质上你想要获得一个资源,而当前的流量都集中在 FlowCollector<T>
中,这使得推理和测试变得更加困难。
我以前从未使用过或看过 Google 代码,老实说,我只看了一眼。我的主要收获是它的封装很差,似乎打破了关注点的分离——它管理资源状态,并一对一地处理所有 io 工作 class。我更希望有 2 个不同的 classes 来分离该逻辑并允许更容易的测试。
作为简单的伪代码我会做这样的事情:
class ResourceRepository {
suspend fun get(r : Request) : Resource {
// abstract implementation details network request and io
// - this function should only fulfill the request
// can now be mocked for testing
delay(3_000)
return Resource.success(Any())
}
}
data class Request(val a : String)
sealed class Resource {
companion object {
val loading : Resource get() = Loading
fun success(a : Any) : Resource = Success(a)
fun error(t: Throwable) : Resource = Error(t)
}
object Loading : Resource()
data class Success(val a : Any) : Resource()
data class Error(val t : Throwable) : Resource()
}
fun resourceFromRequest(r : Request) : Flow<Resource> =
flow { emit(resourceRepository.get(r)) }
.onStart { emit(Resource.loading) }
.catch { emit(Resource.error(it)) }
这使您可以极大地简化 resourceFromRequest()
函数的实际测试,因为您只需模拟存储库和一种方法。这允许您抽象和处理其他地方的网络和 io 工作,独立地再次可以单独测试。
正如@MarkKeen 所建议的,我现在创建了自己的实现并且它运行良好。与 SO 上的代码相比,这个版本现在注入了 coroutineDispatcher 以便于测试,它让流程负责错误处理,它不包含嵌套流程并且更易于阅读和理解。将更新的数据存储到数据库中仍然存在副作用,但我现在太累了,无法解决这个问题。
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.*
inline fun <ResultType, RequestType> networkBoundResource(
crossinline query: () -> Flow<ResultType?>,
crossinline fetch: suspend () -> RequestType,
crossinline saveFetchResult: suspend (RequestType) -> Unit,
crossinline shouldFetch: (ResultType?) -> Boolean = { true },
coroutineDispatcher: CoroutineDispatcher
) = flow<Resource<ResultType>> {
// check for data in database
val data = query().firstOrNull()
if (data != null) {
// data is not null -> update loading status
emit(Resource.loading(data))
}
if (shouldFetch(data)) {
// Need to fetch data -> call backend
val fetchResult = fetch()
// got data from backend, store it in database
saveFetchResult(fetchResult)
}
// load updated data from database (must not return null anymore)
val updatedData = query().first()
// emit updated data
emit(Resource.success(updatedData))
}.onStart {
emit(Resource.loading(null))
}.catch { exception ->
emit(Resource.error("An error occurred while fetching data! $exception", null))
}.flowOn(coroutineDispatcher)
此内联乐趣的一个可能的单元测试,用于 AuthRepsitory
:
@ExperimentalCoroutinesApi
class AuthRepositoryTest {
companion object {
const val FAKE_ID_TOKEN = "FAkE_ID_TOKEN"
}
@get:Rule
val testCoroutineRule = TestCoroutineRule()
private val coroutineDispatcher = TestCoroutineDispatcher()
private val userDaoFake = spyk<UserDaoFake>()
private val mockApiService = mockk<MyApi>()
private val sut = AuthRepository(
userDaoFake, mockApiService, coroutineDispatcher
)
@Before
fun beforeEachTest() {
userDaoFake.clear()
}
@Test
fun getAuthToken_noCachedData_shouldMakeNetworkCallAndStoreUserInDatabase() = testCoroutineRule.runBlockingTest {
// Given an empty database
coEvery { mockApiService.getUser(any()) } returns NetworkResponse.Success(UserFakes.getNetworkUser(), null, HttpURLConnection.HTTP_OK)
// When getAuthToken is called
val result = sut.getAuthToken(FAKE_ID_TOKEN).toList()
coVerifyOrder {
// Then first try to fetch data from the DB
userDaoFake.get()
// Then fetch the User from the API
mockApiService.getUser(FAKE_ID_TOKEN)
// Then insert the user into the DB
userDaoFake.insert(any())
// Finally return the inserted user from the DB
userDaoFake.get()
}
assertThat(result).containsExactly(
Resource.loading(null),
Resource.success(UserFakes.getAppUser())
).inOrder()
}
}
我正在尝试为 networkBoundResource
的 kotlin 版本编写单元测试,可以在
这是我的版本,其中包含针对以下问题的标记注释。
inline fun <ResultType, RequestType> networkBoundResource(
...
coroutineDispatcher: CoroutineDispatcher
) = flow {
emit(Resource.loading(null)) // emit works!
val data = queryDatabase().firstOrNull()
val flow = if (shouldFetch(data)) {
emit(Resource.loading(data)) // emit works!
try {
saveFetchResult(fetch())
query().map { Resource.success(it) }
} catch (throwable: Throwable) {
onFetchFailed(throwable)
query().map { Resource.error(throwable.toString(), it) }
}
} else {
query().map { Resource.success(it) }
}
emitAll(flow) // emitAll does not work!
}.catch { exception ->
emit(Resource.error("An error occurred while fetching data! $exception", null))
}.flowOn(coroutineDispatcher)
这是我对此代码的单元测试之一。对代码进行了一些编辑以关注我的问题:
@get:Rule
val testCoroutineRule = TestCoroutineRule()
private val coroutineDispatcher = TestCoroutineDispatcher()
@Test
fun networkBoundResource_noCachedData_shouldMakeNetworkCallAndStoreUserInDatabase() = testCoroutineRule.runBlockingTest {
...
// When getAuthToken is called
val result = networkBoundResource(..., coroutineDispatcher).toList()
result.forEach {
println(it)
}
}
问题是 println(it)
只打印 Resource.loading(null)
排放量。但是如果你看一下 flow {}
块的最后一行,你会发现应该有另一个 val flow
的发射。但是这种排放永远不会出现在我的单元测试中。为什么?
我不太确定完整的行为,但本质上你想要获得一个资源,而当前的流量都集中在 FlowCollector<T>
中,这使得推理和测试变得更加困难。
我以前从未使用过或看过 Google 代码,老实说,我只看了一眼。我的主要收获是它的封装很差,似乎打破了关注点的分离——它管理资源状态,并一对一地处理所有 io 工作 class。我更希望有 2 个不同的 classes 来分离该逻辑并允许更容易的测试。
作为简单的伪代码我会做这样的事情:
class ResourceRepository {
suspend fun get(r : Request) : Resource {
// abstract implementation details network request and io
// - this function should only fulfill the request
// can now be mocked for testing
delay(3_000)
return Resource.success(Any())
}
}
data class Request(val a : String)
sealed class Resource {
companion object {
val loading : Resource get() = Loading
fun success(a : Any) : Resource = Success(a)
fun error(t: Throwable) : Resource = Error(t)
}
object Loading : Resource()
data class Success(val a : Any) : Resource()
data class Error(val t : Throwable) : Resource()
}
fun resourceFromRequest(r : Request) : Flow<Resource> =
flow { emit(resourceRepository.get(r)) }
.onStart { emit(Resource.loading) }
.catch { emit(Resource.error(it)) }
这使您可以极大地简化 resourceFromRequest()
函数的实际测试,因为您只需模拟存储库和一种方法。这允许您抽象和处理其他地方的网络和 io 工作,独立地再次可以单独测试。
正如@MarkKeen 所建议的,我现在创建了自己的实现并且它运行良好。与 SO 上的代码相比,这个版本现在注入了 coroutineDispatcher 以便于测试,它让流程负责错误处理,它不包含嵌套流程并且更易于阅读和理解。将更新的数据存储到数据库中仍然存在副作用,但我现在太累了,无法解决这个问题。
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.*
inline fun <ResultType, RequestType> networkBoundResource(
crossinline query: () -> Flow<ResultType?>,
crossinline fetch: suspend () -> RequestType,
crossinline saveFetchResult: suspend (RequestType) -> Unit,
crossinline shouldFetch: (ResultType?) -> Boolean = { true },
coroutineDispatcher: CoroutineDispatcher
) = flow<Resource<ResultType>> {
// check for data in database
val data = query().firstOrNull()
if (data != null) {
// data is not null -> update loading status
emit(Resource.loading(data))
}
if (shouldFetch(data)) {
// Need to fetch data -> call backend
val fetchResult = fetch()
// got data from backend, store it in database
saveFetchResult(fetchResult)
}
// load updated data from database (must not return null anymore)
val updatedData = query().first()
// emit updated data
emit(Resource.success(updatedData))
}.onStart {
emit(Resource.loading(null))
}.catch { exception ->
emit(Resource.error("An error occurred while fetching data! $exception", null))
}.flowOn(coroutineDispatcher)
此内联乐趣的一个可能的单元测试,用于 AuthRepsitory
:
@ExperimentalCoroutinesApi
class AuthRepositoryTest {
companion object {
const val FAKE_ID_TOKEN = "FAkE_ID_TOKEN"
}
@get:Rule
val testCoroutineRule = TestCoroutineRule()
private val coroutineDispatcher = TestCoroutineDispatcher()
private val userDaoFake = spyk<UserDaoFake>()
private val mockApiService = mockk<MyApi>()
private val sut = AuthRepository(
userDaoFake, mockApiService, coroutineDispatcher
)
@Before
fun beforeEachTest() {
userDaoFake.clear()
}
@Test
fun getAuthToken_noCachedData_shouldMakeNetworkCallAndStoreUserInDatabase() = testCoroutineRule.runBlockingTest {
// Given an empty database
coEvery { mockApiService.getUser(any()) } returns NetworkResponse.Success(UserFakes.getNetworkUser(), null, HttpURLConnection.HTTP_OK)
// When getAuthToken is called
val result = sut.getAuthToken(FAKE_ID_TOKEN).toList()
coVerifyOrder {
// Then first try to fetch data from the DB
userDaoFake.get()
// Then fetch the User from the API
mockApiService.getUser(FAKE_ID_TOKEN)
// Then insert the user into the DB
userDaoFake.insert(any())
// Finally return the inserted user from the DB
userDaoFake.get()
}
assertThat(result).containsExactly(
Resource.loading(null),
Resource.success(UserFakes.getAppUser())
).inOrder()
}
}