如何将挂起函数转换为 RX Single(或 Completable)?

How to convert a suspend function to an RX Single (or Completable)?

我们正在将我们的项目从 RX 重构为 Kotlin 协程,但不是一蹴而就的,因此我们的项目需要同时使用这两种协程。

现在我们有很多像这样使用 RX single 作为 return 类型的方法,因为它们很重,很长 运行 操作,比如 API 调用。

fun foo(): Single<String> // Heavy, long running opertation

我们希望它是这样的:

suspend fun foo(): String // The same heavy, long running opertation

在我们使用这种方法的地方,我们希望仍然使用 RX。

我们一直在这样做:

foo()
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }

现在我应该如何将我的挂起乐趣转换成我可以使用的 Single?

这是个好主意吗?

Single.fromCallable {
    runBlocking {
        foo() // This is now a suspend fun
    }
}
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }

我没有给它很多时间来推理它,但我认为你创建它的方式可能会在挂起函数的持续时间内使用一个额外的线程。

有一个官方 kotlinx-coroutines-rx3 库可以进行转换。 rxSingle 函数在这里是相关的。在后台,它将以挂起的方式调用挂起函数,而不会阻塞其他线程。

我自己还没有用过,所以你可能想自己测试一下,但我认为你不需要紧接着使用 subscribeOn,因为实际工作是在协程。你的 Foo 挂起函数,因为它是一个挂起函数,如果它执行阻塞 IO,应该已经在内部使用 Dispatchers.IO

rxSingle { foo() }
    .map { ... }
    .subscribe { ... }