将 LiveData 转换为 RxJava 可观察对象

Convert LiveData to RxJava observable

我有 2 个视图:带金额的 EditText(不应为空)和协议复选框(应选中)。我还有 2 个 MutableLive 数据变量,它们表示我的 ViewModel 中此视图的状态。

我想在 Observables 中组合这 2 个变量并使用 Observable.combineLattest 到 enable/disable 我的 "Send" 按钮。

我找到了调用 android.arch.lifecycle:reactivestreams 并将我的 LiveData 转换为 Publishers 的库,但我无法在 Observable.combineLattest 中使用它们,因为 org.reactivestreams 因为 Publisher 是 org.reactivestreams接口和 Observable.combineLattest 接受可观察源。

我看了一些文章,但他们都提到了这个库。

目前我有这样的代码:

override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        ...

        dispossable = Observable.combineLatest(
                LiveDataReactiveStreams.toPublisher(this, vm.amount),
                LiveDataReactiveStreams.toPublisher(this, vm.isAgreementChecked),
                BiFunction<String, Boolean, Boolean> { amount, isChecked ->
                    amount.isNotEmpty() && isChecked
                })
    }

有没有人知道将 LiveData 转换为 Observable 的好方法。 提前致谢。

正如 Blackbelt 所说的那样——我可以使用来自 LiveData 的 Flowable,而不是将 Observable 用于 combineLatest(以及其他操作符,如 Zip、Debounce 等):

LiveDataReactiveStreams.toPublisher(/*lifecycle*/, /*observable Field*/).

所以我目前的解决方案是这样的:

disposable = Flowable.combineLatest(
    LiveDataReactiveStreams.toPublisher(this, vm.amount),
    LiveDataReactiveStreams.toPublisher(this, vm.isAgreementChecked),
    BiFunction<String, Boolean, Boolean> { amount, isChecked ->
        amount.isNotEmpty() && isChecked
    }).subscribe { isDataValid ->
        vm.setIsDataValid(isDataValid)
    }

再次感谢:)

另一种解决方案可能是(我需要这个,因为 WorkManager 只有 returns LiveData):

fun getWorkData(): Flowable<List<WorkInfo>> {
    val workDataLiveData = WorkManager.getInstance().getWorkInfosByTagLiveData("TAG")
    Flowable.create({emitter -> 
        val observer = Observer> { emitter.onNext(it) }
        val disposable = disposeInUiThread { workDataLiveData.removeObserver(observer) }
        emitter.setDisposable(disposable)
        workDataLiveData.observeForever(observer)
    }, BackpressureStrategy.LATEST)
}

private fun disposeInUiThread(action: Action): Disposable {
        return Disposables.fromAction {
            if (Looper.getMainLooper() == Looper.myLooper()) {
                action.run()
            } else {
                val inner = AndroidSchedulers.mainThread().createWorker()
                inner.schedule {
                    try {
                        action.run()
                    } catch (e: Exception) {
                        Timber.e(e, "Could not unregister receiver in UI Thread")
                    }

                    inner.dispose()
                }
            }
        }
    }