使用 Kt Flow 和 Retrofit 的用例或交互器

UseCases or Interactors with Kt Flow and Retrofit

上下文

我开始从事一个新项目,我决定从 RxJava 转移到 Kotlin Coroutines。我正在使用 MVVM clean 架构,这意味着我的 ViewModelsUseCases class 通信,而这些 UseCases classes 使用一个或多个 Repositories 从网络获取数据。

让我举个例子。假设我们有一个应该显示用户个人资料信息的屏幕。所以我们有 UserProfileViewModel:

@HiltViewModel
class UserProfileViewModel @Inject constructor(
    private val getUserProfileUseCase: GetUserProfileUseCase
) : ViewModel() {
    sealed class State {
        data SuccessfullyFetchedUser(
            user: ExampleUser
        ) : State()
    }
    // ...
    val state = SingleLiveEvent<UserProfileViewModel.State>()
    // ...
    fun fetchUserProfile() {
        viewModelScope.launch {
            // ⚠️ We trigger the use case to fetch the user profile info
            getUserProfileUseCase()
                .collect {
                    when (it) {
                        is GetUserProfileUseCase.Result.UserProfileFetched -> {
                            state.postValue(State.SuccessfullyFetchedUser(it.user))
                        }
                        is GetUserProfileUseCase.Result.ErrorFetchingUserProfile -> {
                            // ...
                        }
                    }
                }
        }
    }
}

GetUserProfileUseCase 用例如下所示:

interface GetUserProfileUseCase {
    sealed class Result {
        object ErrorFetchingUserProfile : Result()
        data class UserProfileFetched(
            val user: ExampleUser
        ) : Result()
    }

    suspend operator fun invoke(email: String): Flow<Result>
}

class GetUserProfileUseCaseImpl(
    private val userRepository: UserRepository
) : GetUserProfileUseCase {
    override suspend fun invoke(email: String): Flow<GetUserProfileUseCase.Result> {
        // ⚠️ Hit the repository to fetch the info. Notice that if we have more 
        // complex scenarios, we might require zipping repository calls together, or
        // flatmap responses.
        return userRepository.getUserProfile().flatMapMerge { 
            when (it) {
                is ResultData.Success -> {
                    flow { emit(GetUserProfileUseCase.Result.UserProfileFetched(it.data.toUserExampleModel())) }
                }
                is ResultData.Error -> {
                    flow { emit(GetUserProfileUseCase.Result.ErrorFetchingUserProfile) }
                }
            }
        }
    }
}

UserRepository 存储库如下所示:

interface UserRepository {
    fun getUserProfile(): Flow<ResultData<ApiUserProfileResponse>>
}

class UserRepositoryImpl(
    private val retrofitApi: RetrofitApi
) : UserRepository {
    override fun getUserProfile(): Flow<ResultData<ApiUserProfileResponse>> {
        return flow {
            val response = retrofitApi.getUserProfileFromApi()
            if (response.isSuccessful) {
                emit(ResultData.Success(response.body()!!))
            } else {
                emit(ResultData.Error(RetrofitNetworkError(response.code())))
            }
        }
    }
}

最后,RetrofitApi 和对后端建模的响应 class API 响应如下所示:

data class ApiUserProfileResponse(
    @SerializedName("user_name") val userName: String
    // ...
)

interface RetrofitApi {
    @GET("api/user/profile")
    suspend fun getUserProfileFromApi(): Response<ApiUserProfileResponse>
}

到目前为止一切正常,但我已经开始 运行 在实现更复杂的功能时遇到一些问题。

例如,有一个用例,当用户首次登录时,我需要 (1) post 到 POST /send_email_link 端点,这端点将检查我在正文中发送的电子邮件是否已经存在,如果不存在,它将 return 一个 404 错误代码,并且 (2)如果一切顺利,我应该点击一个 POST /peek 端点,它将 return 一些关于用户帐户的信息。

这是我目前为此实现的 UserAccountVerificationUseCase:

interface UserAccountVerificationUseCase {
    sealed class Result {
        object ErrorVerifyingUserEmail : Result()
        object ErrorEmailDoesNotExist : Result()
        data class UserEmailVerifiedSuccessfully(
            val canSignIn: Boolean
        ) : Result()
    }

    suspend operator fun invoke(email: String): Flow<Result>
}

class UserAccountVerificationUseCaseImpl(
    private val userRepository: UserRepository
) : UserAccountVerificationUseCase {
    override suspend fun invoke(email: String): Flow<UserAccountVerificationUseCase.Result> {
        return userRepository.postSendEmailLink().flatMapMerge { 
            when (it) {
                is ResultData.Success -> {
                    userRepository.postPeek().flatMapMerge { 
                        when (it) {
                            is ResultData.Success -> {
                                val canSignIn = it.data?.userName == "Something"
                                flow { emit(UserAccountVerificationUseCase.Result.UserEmailVerifiedSuccessfully(canSignIn)) }
                            } else {
                                flow { emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail) }
                            }
                        }
                    }
                }
                is ResultData.Error -> {
                    if (it.exception is RetrofitNetworkError) {
                        if (it.exception.errorCode == 404) {
                            flow { emit(UserAccountVerificationUseCase.Result.ErrorEmailDoesNotExist) }
                        } else {
                            flow { emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail) }
                        }
                    } else {
                        flow { emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail) }
                    }
                }
            }
        }
    }
}

问题

以上解决方案按预期工作,如果第一个 API 调用 POST /send_email_link 曾经 return 是 404,用例将按预期运行并且 return ErrorEmailDoesNotExist 响应,以便 ViewModel 可以将其传递回 UI 并显示预期的用户体验。

如您所见,问题是该解决方案需要大量样板代码,我认为使用 Kotlin Coroutines 会使事情比使用 RxJava 更简单,但事实并非如此结果还是那样。我很确定这是因为我遗漏了什么或者我还没有完全学会如何正确使用 Flow。

到目前为止我尝试了什么

我试图改变我从存储库中发出元素的方式,来自:

...
    override fun getUserProfile(): Flow<ResultData<ApiUserProfileResponse>> {
        return flow {
            val response = retrofitApi.getUserProfileFromApi()
            if (response.isSuccessful) {
                emit(ResultData.Success(response.body()!!))
            } else {
                emit(ResultData.Error(RetrofitNetworkError(response.code())))
            }
        }
    }
...

像这样:

...
    override fun getUserProfile(): Flow<ResultData<ApiUserProfileResponse>> {
        return flow {
            val response = retrofitApi.getUserProfileFromApi()
            if (response.isSuccessful) {
                emit(ResultData.Success(response.body()!!))
            } else {
                error(RetrofitNetworkError(response.code()))
            }
        }
    }
..

所以我可以像使用 RxJava 的 onErrorResume():

一样使用 catch() 函数
class UserAccountVerificationUseCaseImpl(
    private val userRepository: UserRepository
) : UserAccountVerificationUseCase {
    override suspend fun invoke(email: String): Flow<UserAccountVerificationUseCase.Result> {
        return userRepository.postSendEmailLink()
            .catch { e ->
                if (e is RetrofitNetworkError) {
                    if (e.errorCode == 404) {
                        flow { emit(UserAccountVerificationUseCase.Result.ErrorEmailDoesNotExist) }
                    } else {
                        flow { emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail) }
                    }
                } else {
                    flow { emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail) }
                }
            }
            .flatMapMerge {
                userRepository.postPeek().flatMapMerge {
                    when (it) {
                        is ResultData.Success -> {
                            val canSignIn = it.data?.userName == "Something"
                            flow { emit(UserAccountVerificationUseCase.Result.UserEmailVerifiedSuccessfully(canSignIn)) }
                        } else -> {
                            flow { emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail) }
                        }
                    }
                }
            }
        }
    }
}

确实 减少了一些样板代码,但我无法让它工作,因为一旦我尝试 运行 用例像这样我开始收到错误消息说我不应该在 catch().

中发出项目

即使我能让它正常工作,这里的样板代码仍然太多。我虽然用 Kotlin Coroutines 做这样的事情意味着有更简单、更易读的用例。类似于:

...
class UserAccountVerificationUseCaseImpl(
    private val userRepository: AuthRepository
) : UserAccountVerificationUseCase {
    override suspend fun invoke(email: String): Flow<UserAccountVerificationUseCase.Result> {
        return flow {
            coroutineScope {
                val sendLinksResponse = userRepository.postSendEmailLink()
                if (sendLinksResponse is ResultData.Success) {
                    val peekAccount = userRepository.postPeek()
                    if (peekAccount is ResultData.Success) {
                        emit(UserAccountVerificationUseCase.Result.UserEmailVerifiedSuccessfully())
                    } else {
                        emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail)
                    }
                } else {
                    if (sendLinksResponse is ResultData.Error) {
                        if (sendLinksResponse.error == 404) {
                            emit(UserAccountVerificationUseCase.Result.ErrorEmailDoesNotExist)
                        } else {
                            emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail)
                        }
                    } else {
                        emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail)
                    }
                }
            }
        }
    }
}
...

这就是我对使用 Kotlin 协程的想象。放弃 RxJava 的 zip()contact()delayError()onErrorResume() 和所有这些 Observable 函数,以支持更具可读性的东西。

问题

如何减少样板代码的数量并使我的用例看起来更像协程?

备注

我知道有些人只是直接从 ViewModel 层调用存储库,但我喜欢在中间有这个 UseCase 层,这样我就可以包含与切换流和处理相关的所有代码这里有错误。

欢迎任何反馈!谢谢!

编辑#1

根据 @Joffrey 的回复,我更改了代码,使其像这样工作:

Retrofit API 层保留 returning 挂起功能。

data class ApiUserProfileResponse(
    @SerializedName("user_name") val userName: String
    // ...
)

interface RetrofitApi {
    @GET("api/user/profile")
    suspend fun getUserProfileFromApi(): Response<ApiUserProfileResponse>
}

存储库现在 return 是一个可挂起的函数,我已经删除了 Flow 包装器:

interface UserRepository {
    suspend fun getUserProfile(): ResultData<ApiUserProfileResponse>
}

class UserRepositoryImpl(
    private val retrofitApi: RetrofitApi
) : UserRepository {
    override suspend fun getUserProfile(): ResultData<ApiUserProfileResponse> {
        val response = retrofitApi.getUserProfileFromApi()
        return if (response.isSuccessful) {
            ResultData.Success(response.body()!!)
        } else {
            ResultData.Error(RetrofitNetworkError(response.code()))
        }
    }
}

用例 保持 returning a Flow 因为我可能还会在此处插入对 Room DB 的调用:

interface GetUserProfileUseCase {
    sealed class Result {
        object ErrorFetchingUserProfile : Result()
        data class UserProfileFetched(
            val user: ExampleUser
        ) : Result()
    }

    suspend operator fun invoke(email: String): Flow<Result>
}

class GetUserProfileUseCaseImpl(
    private val userRepository: UserRepository
) : GetUserProfileUseCase {
    override suspend fun invoke(email: String): Flow<GetUserProfileUseCase.Result> {
        return flow {
            val userProfileResponse = userRepository.getUserProfile()
            when (userProfileResponse) {
                is ResultData.Success -> {
                    emit(GetUserProfileUseCase.Result.UserProfileFetched(it.toUserModel()))
                }
                is ResultData.Error -> {
                    emit(GetUserProfileUseCase.Result.ErrorFetchingUserProfile)
                }
            }
        }
    }
}

这看起来干净多了。现在,将同样的事情应用到 UserAccountVerificationUseCase:

interface UserAccountVerificationUseCase {
    sealed class Result {
        object ErrorVerifyingUserEmail : Result()
        object ErrorEmailDoesNotExist : Result()
        data class UserEmailVerifiedSuccessfully(
            val canSignIn: Boolean
        ) : Result()
    }

    suspend operator fun invoke(email: String): Flow<Result>
}

class UserAccountVerificationUseCaseImpl(
    private val userRepository: UserRepository
) : UserAccountVerificationUseCase {
    override suspend fun invoke(email: String): Flow<UserAccountVerificationUseCase.Result> {
        return flow { 
            val sendEmailLinkResponse = userRepository.postSendEmailLink()
            when (sendEmailLinkResponse) {
                is ResultData.Success -> {
                    val peekResponse = userRepository.postPeek()
                    when (peekResponse) {
                        is ResultData.Success -> {
                            val canSignIn = peekResponse.data?.userName == "Something"
                            emit(UserAccountVerificationUseCase.Result.UserEmailVerifiedSuccessfully(canSignIn)
                        }
                        else -> {
                            emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail)
                        }
                    }
                }
                is ResultData.Error -> {
                    if (sendEmailLinkResponse.isNetworkError(404)) {
                        emit(UserAccountVerificationUseCase.Result.ErrorEmailDoesNotExist)
                    } else {
                        emit(UserAccountVerificationUseCase.Result.ErrorVerifyingUserEmail)
                    }
                }
            }
        }
    }
}

这看起来干净多了,而且效果很好。我仍然想知道这里是否还有改进的余地。

我在这里看到的最明显的问题是您对单个值使用 Flow 而不是 suspend 函数。

协程通过使用 return 纯值或抛出异常的挂起函数使单值用例变得更加简单。你当然也可以让它们 return Result-like 类 来封装错误而不是实际使用异常,但重要的部分是使用 suspend 函数你暴露了一个看似同步(因此方便)API,同时仍受益于异步运行时。

在提供的示例中,您没有在任何地方订阅更新,所有流实际上只提供一个元素并完成,因此没有真正的理由使用流,并且它使代码复杂化。这也让习惯协程的人更难阅读,因为它看起来像多个值即将到来,并且可能 collect 是无限的,但事实并非如此。

每次你写 flow { emit(x) } 它应该只是 x.

根据上述内容,您有时会使用 flatMapMerge 并在 lambda 中使用单个元素创建流。除非您正在寻找计算的并行化,否则您应该直接选择 .map { ... } 。所以替换这个:

val resultingFlow = sourceFlow.flatMapMerge {
    if (something) {
        flow { emit(x) }
    } else {
        flow { emit(y) }
    }
}

有了这个:

val resultingFlow = sourceFlow.map { if (something) x else y }