违反流异常透明度:检测到来自另一个协程的发射
Flow exception transparency is violated : Emission from another coroutine is detected
我正在尝试在我的项目中实现 NetworkBoundResource class,这就是我正在尝试的。一切正常,获得响应、缓存,但当我在 flowBuilder 中发出值时,它崩溃并显示此错误。
我得到的错误:
Emission from another coroutine is detected.
Child of ProducerCoroutine{Active}@df26eb9, expected child of FlowCoroutine{Active}@a0bb2fe.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow')' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
NetworkBoundResource class:
abstract class NetworkBoundResource<ResultType, RequestType> {
fun invoke(): Flow<Resource<ResultType>> = flow {
val rawData = loadFromDb()
if (shouldFetch(rawData)) {
fetchDataFromServer()
.onStart { emit(Resource.loading(rawData)) } // emit() causing issue
.catch { emit(Resource.error(it, null)) } // emit() causing issue
.collectLatest { }
}
}
// Save API response result into the database
protected abstract suspend fun cacheInDb(items: RequestType)
// Need to fetch data from server or not.
protected abstract fun shouldFetch(data: ResultType?): Boolean
// Show cached data from the database.
protected abstract suspend fun loadFromDb(): ResultType
// Fetch the data from server.
protected abstract suspend fun fetchDataFromServer(): Flow<ApiResponse<List<Category>>>
// when the fetch fails.
protected open fun onFetchFailed() {}
}
存储库class:
fun getCategories(): Flow<Resource<List<Category>>> {
return object : NetworkBoundResource<List<Category>, List<Category>>() {
override suspend fun cacheInDb(items: List<Category>) {
withContext(Dispatchers.IO) { database.getCategories().insert(items) }
}
override fun shouldFetch(data: List<Category>?): Boolean {
return true
}
override suspend fun loadFromDb(): List<Category> {
return withContext(Dispatchers.IO) { database.getCategories().read() }
}
override suspend fun fetchDataFromServer(): Flow<ApiResponse<List<Category>>> {
return flow { emit(RetrofitModule.getCategories()) }
}
}.invoke()
}
我的视图模型类:
init {
viewModelScope.launch {
repository.getCategories().collectLatest {
if(it.data!=null){
_categories.value = it.data
Log.d("appDebug", " ViewModel : $it")
}
}
}
}
如异常所述,冷流不允许并发 emit()
。
您有两个选择:
- 用
channelFlow { }
替换 flow { }
并用 send()
发送值(在你的情况下可能更容易)
- 确保没有
emit()
被同时调用
我正在尝试在我的项目中实现 NetworkBoundResource class,这就是我正在尝试的。一切正常,获得响应、缓存,但当我在 flowBuilder 中发出值时,它崩溃并显示此错误。
我得到的错误:
Emission from another coroutine is detected.
Child of ProducerCoroutine{Active}@df26eb9, expected child of FlowCoroutine{Active}@a0bb2fe.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow')' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
NetworkBoundResource class:
abstract class NetworkBoundResource<ResultType, RequestType> {
fun invoke(): Flow<Resource<ResultType>> = flow {
val rawData = loadFromDb()
if (shouldFetch(rawData)) {
fetchDataFromServer()
.onStart { emit(Resource.loading(rawData)) } // emit() causing issue
.catch { emit(Resource.error(it, null)) } // emit() causing issue
.collectLatest { }
}
}
// Save API response result into the database
protected abstract suspend fun cacheInDb(items: RequestType)
// Need to fetch data from server or not.
protected abstract fun shouldFetch(data: ResultType?): Boolean
// Show cached data from the database.
protected abstract suspend fun loadFromDb(): ResultType
// Fetch the data from server.
protected abstract suspend fun fetchDataFromServer(): Flow<ApiResponse<List<Category>>>
// when the fetch fails.
protected open fun onFetchFailed() {}
}
存储库class:
fun getCategories(): Flow<Resource<List<Category>>> {
return object : NetworkBoundResource<List<Category>, List<Category>>() {
override suspend fun cacheInDb(items: List<Category>) {
withContext(Dispatchers.IO) { database.getCategories().insert(items) }
}
override fun shouldFetch(data: List<Category>?): Boolean {
return true
}
override suspend fun loadFromDb(): List<Category> {
return withContext(Dispatchers.IO) { database.getCategories().read() }
}
override suspend fun fetchDataFromServer(): Flow<ApiResponse<List<Category>>> {
return flow { emit(RetrofitModule.getCategories()) }
}
}.invoke()
}
我的视图模型类:
init {
viewModelScope.launch {
repository.getCategories().collectLatest {
if(it.data!=null){
_categories.value = it.data
Log.d("appDebug", " ViewModel : $it")
}
}
}
}
如异常所述,冷流不允许并发 emit()
。
您有两个选择:
- 用
channelFlow { }
替换flow { }
并用send()
发送值(在你的情况下可能更容易) - 确保没有
emit()
被同时调用