当其中一个流发出特定值时,如何取消流的组合?
How to cancel a combine of flows when one of them emits a certain value?
我正在并行执行多个网络请求并使用 Stateflow
监控结果。
每个网络请求都在单独的 flow
中完成,我使用 combine
将最新状态推送到我的 Stateflow
上。这是我的代码:
回购 class:
fun networkRequest1(id: Int): Flow<Resource<List<Area>>> =
flow {
emit(Resource.Loading())
try {
val areas = retrofitInterface.getAreas(id)
emit(Resource.Success(areas))
} catch (throwable: Throwable) {
emit(
Resource.Error()
)
)
}
}
fun networkRequest2(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest3(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest4(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
ViewModel class:
val getDataCombinedStateFlow: StateFlow<Resource<HashMap<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
withContext(it) {
combine(
repo.networkRequest1(id: Int),
repo.networkRequest2(id: Int),
repo.networkRequest3(id: Int),
repo.networkRequest4(id: Int)
) { a,
b,
c,
d
->
hashMapOf(
Pair("1", a),
Pair("2",b),
Pair("3", c),
Pair("4", d),
)
}.flatMapLatest {
val progress = it
var isLoading = false
flow<Resource<HashMap<String, Resource<out List<Any>>>>?> {
emit(Resource.Loading())
progress.forEach { (t, u) ->
if (u is Resource.Error) {
emit(Resource.Error(error = u.error!!))
// I want to cancel here, as I no longer care if 1 request fails
return@flow
}
if (u is Resource.Loading) {
isLoading = true
}
}
if (isLoading) {
emit(Resource.Loading())
return@flow
}
if (!isLoading) {
emit(Resource.Success(progress))
}
}
}
}
}.stateIn(viewModelScope, SharingStarted.Lazily, null)
查看class:
viewLifecycleOwner.lifecycleScope.launchWhenCreated() {
viewModel.getDataCombinedStateFlow.collect {
val result = it ?: return@collect
binding.loadingErrorState.apply {
if (result is Resource.Loading) {
//show smth
}
if (result is Resource.Error) {
//show error msg
}
if (result is Resource.Success) {
//done
}
}
}
}
我希望能够在发出 Resource.Error
后取消所有工作,因为我不想再等待或为其他 API 调用的响应做任何相关工作,以防万一其中失败了。
我怎样才能做到这一点?
我尝试取消收集,但构建 Stateflow 的流程继续工作并发出结果。我知道他们不会被收集,但我仍然觉得这是一种资源浪费。
我认为整个情况很复杂,因为您有源流只是在其他情况下暂停具有加载状态的函数之前。因此,您必须合并它们并过滤掉各种加载状态,并且您的最终结果流不断重复发出加载状态,直到所有源都准备就绪。
如果您的网络操作有基本的暂停功能,例如:
suspend fun networkRequest1(id: Int): List<Area> =
retrofitInterface.getAreas(id)
然后您的视图模型流程变得更简单。仅使用特定上下文来调用 flow
构建器函数是没有意义的,因此我省略了该部分。 (我也很困惑为什么你有一个 CoroutineContexts 流。)
我还认为,如果将请求调用分解为一个单独的函数,会更简洁。
private fun makeParallelRequests(id: Int): Map<String, Resource<out List<Any>> = coroutineScope {
val results = listOf(
async { networkRequest1(id) },
async { networkRequest2(id) },
async { networkRequest2(id) },
async { networkRequest4(id) }
).awaitAll()
.map { Resource.Success(it) }
listOf("1", "2", "3", "4").zip(results).toMap()
}
val dataCombinedStateFlow: StateFlow<Resource<Map<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
flow {
emit(Resource.Loading())
try {
val result = makeParallelRequests(id)
emit(Resource.Success(result))
catch (e: Throwable) {
emit(Resource.Error(e))
}
}
}
我同意@Tenfour04 的观点,那些嵌套流程过于复杂,有几种方法可以简化它(@Tenfour04 的解决方案是一个很好的方法)。
如果您不想重写所有内容,那么您可以修复破坏结构化并发的那一行:
.stateIn(viewModelScope, SharingStarted.Lazily, null)
有了这个,整个 ViewModel 流程在 ViewModel 的范围内启动,而视图从一个单独的范围(viewLifecycleOwner.lifecycleScope
将是片段/Activity 范围)开始收集。
如果你想从视图中取消流,你需要使用相同的范围或公开一个取消函数来取消 ViewModel 的范围。
如果您想取消来自 ViewModel 本身的流(在 return@flow
语句中),那么您只需添加:
viewModelScope.cancel()
我正在并行执行多个网络请求并使用 Stateflow
监控结果。
每个网络请求都在单独的 flow
中完成,我使用 combine
将最新状态推送到我的 Stateflow
上。这是我的代码:
回购 class:
fun networkRequest1(id: Int): Flow<Resource<List<Area>>> =
flow {
emit(Resource.Loading())
try {
val areas = retrofitInterface.getAreas(id)
emit(Resource.Success(areas))
} catch (throwable: Throwable) {
emit(
Resource.Error()
)
)
}
}
fun networkRequest2(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest3(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest4(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
ViewModel class:
val getDataCombinedStateFlow: StateFlow<Resource<HashMap<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
withContext(it) {
combine(
repo.networkRequest1(id: Int),
repo.networkRequest2(id: Int),
repo.networkRequest3(id: Int),
repo.networkRequest4(id: Int)
) { a,
b,
c,
d
->
hashMapOf(
Pair("1", a),
Pair("2",b),
Pair("3", c),
Pair("4", d),
)
}.flatMapLatest {
val progress = it
var isLoading = false
flow<Resource<HashMap<String, Resource<out List<Any>>>>?> {
emit(Resource.Loading())
progress.forEach { (t, u) ->
if (u is Resource.Error) {
emit(Resource.Error(error = u.error!!))
// I want to cancel here, as I no longer care if 1 request fails
return@flow
}
if (u is Resource.Loading) {
isLoading = true
}
}
if (isLoading) {
emit(Resource.Loading())
return@flow
}
if (!isLoading) {
emit(Resource.Success(progress))
}
}
}
}
}.stateIn(viewModelScope, SharingStarted.Lazily, null)
查看class:
viewLifecycleOwner.lifecycleScope.launchWhenCreated() {
viewModel.getDataCombinedStateFlow.collect {
val result = it ?: return@collect
binding.loadingErrorState.apply {
if (result is Resource.Loading) {
//show smth
}
if (result is Resource.Error) {
//show error msg
}
if (result is Resource.Success) {
//done
}
}
}
}
我希望能够在发出 Resource.Error
后取消所有工作,因为我不想再等待或为其他 API 调用的响应做任何相关工作,以防万一其中失败了。
我怎样才能做到这一点?
我尝试取消收集,但构建 Stateflow 的流程继续工作并发出结果。我知道他们不会被收集,但我仍然觉得这是一种资源浪费。
我认为整个情况很复杂,因为您有源流只是在其他情况下暂停具有加载状态的函数之前。因此,您必须合并它们并过滤掉各种加载状态,并且您的最终结果流不断重复发出加载状态,直到所有源都准备就绪。
如果您的网络操作有基本的暂停功能,例如:
suspend fun networkRequest1(id: Int): List<Area> =
retrofitInterface.getAreas(id)
然后您的视图模型流程变得更简单。仅使用特定上下文来调用 flow
构建器函数是没有意义的,因此我省略了该部分。 (我也很困惑为什么你有一个 CoroutineContexts 流。)
我还认为,如果将请求调用分解为一个单独的函数,会更简洁。
private fun makeParallelRequests(id: Int): Map<String, Resource<out List<Any>> = coroutineScope {
val results = listOf(
async { networkRequest1(id) },
async { networkRequest2(id) },
async { networkRequest2(id) },
async { networkRequest4(id) }
).awaitAll()
.map { Resource.Success(it) }
listOf("1", "2", "3", "4").zip(results).toMap()
}
val dataCombinedStateFlow: StateFlow<Resource<Map<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
flow {
emit(Resource.Loading())
try {
val result = makeParallelRequests(id)
emit(Resource.Success(result))
catch (e: Throwable) {
emit(Resource.Error(e))
}
}
}
我同意@Tenfour04 的观点,那些嵌套流程过于复杂,有几种方法可以简化它(@Tenfour04 的解决方案是一个很好的方法)。
如果您不想重写所有内容,那么您可以修复破坏结构化并发的那一行:
.stateIn(viewModelScope, SharingStarted.Lazily, null)
有了这个,整个 ViewModel 流程在 ViewModel 的范围内启动,而视图从一个单独的范围(viewLifecycleOwner.lifecycleScope
将是片段/Activity 范围)开始收集。
如果你想从视图中取消流,你需要使用相同的范围或公开一个取消函数来取消 ViewModel 的范围。
如果您想取消来自 ViewModel 本身的流(在 return@flow
语句中),那么您只需添加:
viewModelScope.cancel()