如何根据发出的值链接具有不同 return 类型的流并收集它们的结果?

How to chain flows with different return types based on emitted values and collect their results?

我有一种情况,我必须一个接一个地执行 3 个网络请求,收集它们的结果(不同类型)。

以下是代码的相关部分:

Resource.kt

sealed class Resource<T>(val data: T? = null, val message: String? = null) {
    class Loading<T>(data: T? = null): Resource<T>(data)
    class Success<T>(data: T?): Resource<T>(data)
    class Error<T>(message: String, data: T? = null): Resource<T>(data, message)
}

Repository.kt

override fun getReportData(profileId: Int): Flow<Resource<ProfileReport>> =
        flow {
            emit(Resource.Loading<ProfileReport>())

            var report: ProfileReport? = null
            try {
                // Api is available as a retrofit implementation
                report = api.getReport(profileId).toProfileReport()
            } catch (e: HttpException) {
                emit(
                    Resource.Error<ProfileReport>(
                        message = "An unknown http exception occured"
                    )
                )
            }

            if (report!= null) {
                emit(Resource.Success<ProfileReport>(data = report))
            }
        }

假设我有 3 个这样的流程来获取我的存储库中的数据,并且它们具有不同的 return 类型(例如:ProfileReport、ProfileInfo、ProfileStatus)。

现在在我的视图模型中,我有一个函数可以执行这些流程并对发出的值执行操作,例如:

ViewModel.kt

fun getProfileData(profileId: Int) {
        getReportData(profileId)
            .onEach { result ->
                when (result) {
                    is Resource.Loading -> {
                        _loading.value = true
                    }
                    is Resource.Error -> {
                        _loading.value = false
                        // UI event to display error snackbar
                    }
                    is Resource.Success -> {
                        _loading.value = false
                        if (result.data != null) {
                            _report.value = _report.value.copy(
                                // Use result here
                            )
                        }
                    }
                }
            }.launchIn(viewModelScope)
    }

这对一个流程没问题,但我如何才能一个接一个地执行 3 个流程。 也就是说,执行第一个,如果成功,则执行第二个,依此类推,如果全部成功,则使用结果。 我是这样做的:

fun getProfileData(profileId: Int) {
        getReportData(profileId)
            .onEach { result1 ->
                when (result1) {
                    is Resource.Loading -> {/*do stuff*/}
                    is Resource.Error -> {/*do stuff*/}
                    is Resource.Success -> {
                        getProfileStatus(profileId)
                            .onEach { result2 ->
                                is Resource.Loading -> {/*do stuff*/}
                                is Resource.Error -> {/*do stuff*/}
                                is Resource.Success -> {
                                    getProfileInfo(profileId)
                                        .onEach { result3 ->
                                            is Resource.Loading -> {/*do stuff*/}
                                            is Resource.Error -> {/*do stuff*/}
                                            is Resource.Success -> {
                                                /*
                                                    Finally update viewmodel state
                                                    using result1, result2 and result3
                                                */
                                            }
                                        }.launchIn(viewModelScope)
                                }
                            }.launchIn(viewModelScope)
                        
                    }
                }
            }.launchIn(viewModelScope)
    }

但是,这感觉太麻烦了,可能有更好的方法来根据成功条件链接流程并在最后收集结果。我检查了一些使用 combine() 或 flatMapMerge() 的方法,但在这种情况下无法使用它们。

有办法实现吗?或者从设计的角度来看,这种方法本身可能是错误的?

我认为使用命令式协程比使用流可以更清晰地建模。由于您要重写函数,这取决于您是否能够修改超类型抽象函数签名。

此解决方案不使用 Resource.Loading,因此您应该删除它以使智能投射更容易。

suspend fun getReportData(profileId: Int): Resource<ProfileReport> =
    try {
        val report = api.getReport(profileId).toProfileReport()
        Resource.Success<ProfileReport>(data = report)
    } catch (e: HttpException) {
        Resource.Error<ProfileReport>(
            message = "An unknown http exception occured"
        )
    }

//.. similar for the other two functions that used to return flows.

fun getProfileData(profileId: Int) {
    viewModelScope.launch {
        // do stuff to indicate 1st loading state
        
        when(val result = getReportData(profileId)) {
            Resource.Error<ProfileReport> -> {
                // do stuff for error state
                return@launch
            }
            Resource.Success<ProfileReport> -> {
                // do stuff with result
            }
        }

        // Since we returned when there was error, we know first 
        // result was successful.

        // do stuff to indicate 2nd loading state

        when(val result = getProfileStatus(profileId)) {
            Resource.Error<ProfileStatus> -> {
                // do stuff for error state
                return@launch
            }
            Resource.Success<ProfileStatus> -> {
                // do stuff with result
            }
        }

        // do stuff to indicate 3rd loading state

        when(val result = getProfileInfo(profileId)) {
            Resource.Error<ProfileInfo> -> {
                // do stuff for error state
                return@launch
            }
            Resource.Success<ProfileInfo> -> {
                // do stuff with result
            }
        }

    }
}

如果您想保留当前的流,您可以通过这种方式收集流以避免深度嵌套。这是可行的,因为您的源流被设计为有限的(它们不会无限期地重复发出新值,但只有一个最终结果)。

fun getProfileData(profileId: Int) = viewModelScope.launch {
    var shouldBreak = false

    getReportData(profileId).collect { result ->
        when (result) { 
            is Resource.Loading -> { /*do stuff*/ }
            is Resource.Error -> {
                /*do stuff*/
                shouldBreak = true
            }
            is Resource.Success -> { /*do stuff*/ }
        }
    }

    if (shouldBreak) return@launch

    getProfileStatus(profileId).collect { result ->
        when (result) {
            is Resource.Loading -> { /*do stuff*/ }
            is Resource.Error -> {
                /*do stuff*/
                shouldBreak = true
            }
            is Resource.Success -> { /*do stuff*/ }
        }
    }

    if (shouldBreak) return@launch

    getProfileInfo(profileId).collect { result ->
        when (result) {
            is Resource.Loading -> { /*do stuff*/ }
            is Resource.Error -> { /*do stuff*/ }
            is Resource.Success -> { /*do stuff*/ }
        }
    }

}