LiveData to Observable conversion for use with RxJava

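Using LiveDataReactiveStreams, a LiveData can be converted to a Publisher. However, to use RxJava2 operators such as withLatestFrom, I need an Observable rather than a Publisher. Is there any way to convert a Publisher to an Observable other than this (outdated) library, RxJava<->ReactiveStreams?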

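So I managed to do something like the following. But I don't know where to put this code in the Android project structure, because toPublisher needs an Activity instance (it takes a LifecycleOwner as its first argument).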

// Convert each LiveData to a Publisher; toPublisher needs a LifecycleOwner (here the Activity).
val rxDataPublisher: Publisher<DataSnapshot> = LiveDataReactiveStreams.toPublisher(this@LocationActivity, fireBaseLiveData)

val rxLocationPublisher: Publisher<Location> = LiveDataReactiveStreams.toPublisher(this@LocationActivity, locationLiveData)

// Pair every DataSnapshot with the most recent Location.
val rxFlowable: Flowable<Pair<Location, DataSnapshot>> = Flowable.fromPublisher(rxDataPublisher)
                .withLatestFrom(rxLocationPublisher, BiFunction { data, location -> Pair(location, data) })

// Convert the combined stream back to LiveData.
val flowableToLiveData = LiveDataReactiveStreams.fromPublisher(rxFlowable)

// Flowable to LiveData

val flowable = Flowable.just(1, 2, 3)
val liveData = LiveDataReactiveStreams.fromPublisher<Int>(flowable)

// From LiveData to Publisher (a Flowable is also a Publisher)

LiveDataReactiveStreams.toPublisher(this, liveData).subscribe(object : Subscriber<Int> {
    override fun onNext(t: Int?) {}

    override fun onComplete() {}

    // Request items here; a Reactive Streams Publisher emits nothing until demand is signalled.
    override fun onSubscribe(s: Subscription?) { s?.request(Long.MAX_VALUE) }

    override fun onError(t: Throwable?) {}
})
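
If an Observable is really needed instead of a Flowable, RxJava 2 itself provides Observable.fromPublisher and Flowable.toObservable, so no extra bridging library should be required. Below is a minimal sketch of both routes. The function name combineWithLatest and the String/Int element types are placeholders made up for illustration; in the question they would be DataSnapshot and Location, and the two arguments would be the Publishers returned by LiveDataReactiveStreams.toPublisher.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import org.reactivestreams.Publisher

// Hypothetical helper: pairs each data item with the latest location value.
fun combineWithLatest(
    data: Publisher<String>,
    location: Publisher<Int>
): Observable<Pair<Int, String>> {
    // Route 1: wrap the Publisher directly as an Observable.
    val dataObservable: Observable<String> = Observable.fromPublisher(data)
    // Route 2: go through Flowable first, then drop backpressure support.
    val locationObservable: Observable<Int> = Flowable.fromPublisher(location).toObservable()

    return dataObservable.withLatestFrom(
        locationObservable,
        BiFunction<String, Int, Pair<Int, String>> { d, loc -> Pair(loc, d) }
    )
}

fun main() {
    // Flowable.just stands in for the Publishers obtained from LiveData.
    combineWithLatest(Flowable.just("a", "b"), Flowable.just(1))
        .subscribe { println(it) }
}
```

Keep in mind that an Observable has no backpressure, so if the source can emit faster than it is consumed, staying with Flowable (as in the question's own code) may be the safer choice.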