如何在 Kotlin 中为 RxJava 函数创建一个安全的调用包装器?

How to create a safe call wrapper for RxJava functions in Kotlin?

下面粘贴的是来自 Google 的 Plaid 应用程序的一段代码,它包装了挂起函数并安全地执行网络操作。需要进行哪些更改才能使其与 RxJava 函数而不是协程一起工作并等待网络结果,我们将不胜感激。

/**
 * Wrap a suspending API [call] in try/catch. In case an exception is thrown, a [Result.Error] is
 * created based on the [errorMessage].
 */
suspend fun <T : Any> safeApiCall(call: suspend () -> Result<T>, errorMessage: String): Result<T> {
    return try {
        call()
    } catch (e: Exception) {
        // An exception was thrown when calling the API so we're converting this to an IOException
        Result.Error(IOException(errorMessage, e))
    }
}

ProductHuntRemoteDataSource.kt中的用法示例:

class ProductHuntRemoteDataSource @Inject constructor(private val service: ProductHuntService) {

    /**
     * Load Product Hunt data for a specific page.
     */
    suspend fun loadData(page: Int) = safeApiCall(
            call = { requestData(page) },
            errorMessage = "Error loading ProductHunt data"
    )

    private suspend fun requestData(page: Int): Result<GetPostsResponse> {
        val response = service.getPostsAsync(page)
        if (response.isSuccessful) {
            val body = response.body()
            if (body != null) {
                return Result.Success(body)
            }
        }
        return Result.Error(IOException("Error loading ProductHunt data " +
                "${response.code()} ${response.message()}"))
    }
}

您可以通过使用 awaitFirst()await()(对于 Single)扩展函数将您的 observables 转换为挂起函数,从而轻松地使用 RxJava 进行这项工作。您需要使用 RxJava 扩展。

编辑:

你可以在你的 observables 上写一个扩展函数来做一些类似但不完全相同的事情。

这是一个可能的解决方案

fun <T> Observable<Result<T>>.safeCall(errorMessage: String, block: (Result<T>) -> Unit): Disposable =
    subscribe({ response: Result<T> ->
        if (response.isSuccess && response.body != null) {
            block(Result.Success(response.body))
        } else {
            block(Result.Error(IOException(errorMessage)))
        }
    }, { throwable ->
        block(Result.Error(IOException(errorMessage, throwable)))
    })

您可以像这样在可观察对象上调用它

val disposable = myObservable.safeCall("Error message") { result ->
    // TODO: handle result
}