当其中一个流发出特定值时,如何取消流的组合?

How to cancel a combine of flows when one of them emits a certain value?

我正在并行执行多个网络请求并使用 Stateflow 监控结果。 每个网络请求都在单独的 flow 中完成,我使用 combine 将最新状态推送到我的 Stateflow 上。这是我的代码:

回购 class:

fun networkRequest1(id: Int): Flow<Resource<List<Area>>> =
        flow {
            emit(Resource.Loading())
            try {
                val areas = retrofitInterface.getAreas(id)
                emit(Resource.Success(areas))
            } catch (throwable: Throwable) {
                emit(
                    Resource.Error()
                    )
                )
            }
        }

fun networkRequest2(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest3(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest4(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity

ViewModel class:

 val getDataCombinedStateFlow: StateFlow<Resource<HashMap<String, Resource<out List<Any>>>>?> =
        getDataTrigger.flatMapLatest {
            withContext(it) { 
                combine(
                    repo.networkRequest1(id: Int),
                    repo.networkRequest2(id: Int),
                    repo.networkRequest3(id: Int),
                    repo.networkRequest4(id: Int)
                ) { a,
                    b,
                    c,
                    d
                    ->
                    hashMapOf(
                        Pair("1", a),
                        Pair("2",b),
                        Pair("3", c),
                        Pair("4", d),
                    )
                }.flatMapLatest {
                    val progress = it
                    var isLoading = false
                    flow<Resource<HashMap<String, Resource<out List<Any>>>>?> {
                        emit(Resource.Loading())
                        progress.forEach { (t, u) ->
                            if (u is Resource.Error) {
                                emit(Resource.Error(error = u.error!!))
                               // I want to cancel here, as I no longer care if 1 request fails
                                return@flow
                            }
                            if (u is Resource.Loading) {
                                isLoading = true
                            }
                        }
                        if (isLoading) {
                            emit(Resource.Loading())
                            return@flow
                        }
                        if (!isLoading) {
                            emit(Resource.Success(progress))
                        }
                    }
                }
            }
        }.stateIn(viewModelScope, SharingStarted.Lazily, null)

查看class:

  viewLifecycleOwner.lifecycleScope.launchWhenCreated() {
            viewModel.getDataCombinedStateFlow.collect {
                val result = it ?: return@collect
                binding.loadingErrorState.apply {
                    if (result is Resource.Loading) {
                       //show smth
                    }

                    if (result is Resource.Error) {
                       //show error msg
                    }

                    if (result is Resource.Success) {
                       //done
                    }
                }
            }
        }

我希望能够在发出 Resource.Error 后取消所有工作,因为我不想再等待或为其他 API 调用的响应做任何相关工作,以防万一其中失败了。

我怎样才能做到这一点?

我尝试取消收集,但构建 Stateflow 的流程继续工作并发出结果。我知道他们不会被收集,但我仍然觉得这是一种资源浪费。

我认为整个情况很复杂,因为您有源流只是在其他情况下暂停具有加载状态的函数之前。因此,您必须合并它们并过滤掉各种加载状态,并且您的最终结果流不断重复发出加载状态,直到所有源都准备就绪。

如果您的网络操作有基本的暂停功能,例如:

suspend fun networkRequest1(id: Int): List<Area> =
        retrofitInterface.getAreas(id)

然后您的视图模型流程变得更简单。仅使用特定上下文来调用 flow 构建器函数是没有意义的,因此我省略了该部分。 (我也很困惑为什么你有一个 CoroutineContexts 流。)

我还认为,如果将请求调用分解为一个单独的函数,会更简洁。

private fun makeParallelRequests(id: Int): Map<String, Resource<out List<Any>> = coroutineScope {
    val results = listOf(
        async { networkRequest1(id) },
        async { networkRequest2(id) },
        async { networkRequest2(id) },
        async { networkRequest4(id) }
    ).awaitAll()
        .map { Resource.Success(it) }
    listOf("1", "2", "3", "4").zip(results).toMap()
}

val dataCombinedStateFlow: StateFlow<Resource<Map<String, Resource<out List<Any>>>>?> =
    getDataTrigger.flatMapLatest {
        flow {
            emit(Resource.Loading())
            try {
                val result = makeParallelRequests(id)
                emit(Resource.Success(result))
            catch (e: Throwable) {
                emit(Resource.Error(e))
            }
        }
    }

我同意@Tenfour04 的观点,那些嵌套流程过于复杂,有几种方法可以简化它(@Tenfour04 的解决方案是一个很好的方法)。

如果您不想重写所有内容,那么您可以修复破坏结构化并发的那一行:

.stateIn(viewModelScope, SharingStarted.Lazily, null)

有了这个,整个 ViewModel 流程在 ViewModel 的范围内启动,而视图从一个单独的范围(viewLifecycleOwner.lifecycleScope 将是片段/Activity 范围)开始收集。

如果你想从视图中取消流,你需要使用相同的范围或公开一个取消函数来取消 ViewModel 的范围。

如果您想取消来自 ViewModel 本身的流(在 return@flow 语句中),那么您只需添加:

viewModelScope.cancel()