使用 compose() 时使用 RxJav/RxKotlin2 自定义 ObservableTransformer

Customize ObservableTransformer with RxJav/RxKotlin2 when using with compose()

我尝试编写一个与 compose() 一起使用的转换函数,以减少样板代码。就像这样很简单:

    fun <R> withSchedulers(): ObservableTransformer<R, R> {
        return ObservableTransformer {
            it.subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
        }
    }

所以每次我想在 ioThread 上订阅任何内容并在 mainThread 上收听结果时,只需几行代码:

    Observable.just(1)
        .compose(MyUtilClass.withSchedulers())
        .subscribe()

但是不只有Observable,我们还有SingleCompletableMaybeFlowable。因此,每次我想将它们与我的 withSchedulers() 函数组合时,我都必须将其转换为新类型(我不希望如此)。

例如,

Completable.fromAction { 
        Log.d("nhp", "hello world")
    }//.compose(MyUtilClass.withSchedulers()) <-- This is not compiled
            .toObservable() <--- I have to transform it into Observable
            .compose(MyUtilClass.withSchedulers())
            .subscribe()

所以我的问题是,有没有办法编写上述函数以与 compose() 一起用于任何类型的 ObservableSingleCompletable、。 ..)?或者我们必须编写使用 ObservableTransformerSingleTransformer、....?

的不同函数

我使用 reified 类型创建了一个辅助方法:

inline fun <reified T> withSchedulers(): T {
    when (T::class) {
        ObservableTransformer::class  -> return ObservableTransformer<Unit, Unit> {
            it.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        } as T
        SingleTransformer::class      -> return SingleTransformer<Unit, Unit> {
            it.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        } as T
        CompletableTransformer::class -> return CompletableTransformer {
            it.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        } as T
    }
    throw IllegalArgumentException("not a valid Transformer type")
}

示例:

    Observable.just("1", "2")
            .compose(withSchedulers<ObservableTransformer<String, String>>())
            .subscribe(System.out::println)

    Single.just(3)
            .compose(withSchedulers<SingleTransformer<Int, Int>>())
            .subscribe(Consumer { println(it) })

    Completable.defer { Completable.complete()  }
            .compose(withSchedulers<CompletableTransformer>())
            .subscribe { println("completed") }

输出:

1
2
3
completed

可能还有其他方法,但想到了这个。