AsyncSubject 可以处理 RxJava 中的 SingleLiveEvent 情况吗?

Can an AsyncSubject handle the SingleLiveEvent case in RxJava?

问题

一次性事件的反应式编程模式的问题在于,在最初的一次性事件发生后,它们可能会重新发送给订阅者。

对于 LiveData the SingleLiveEvent provides a solution using an EventObserver 也可以应用于 Kotlin Flow。

问题

能否创建一个 AsyncSubject observable 来处理 RxJava 中的 SingleLiveEvent 事件?主要问题似乎是,在调用 onComplete 之后,是否有办法手动 "re-opened" 重新发送数据?

可能的解决方案

AsyncSubject 似乎是 RxJava 的潜在解决方案,无需创建 EventObserver,因为 documentation 声明它将 仅在序列为完成

实施 - 加载状态样本

加载布尔值从 ViewModel 方法 initFeed 和视图效果状态发出到视图,在本例中是一个片段。加载布尔值在片段初始化和 ViewModel 通过 onNext 发送 true 时按预期工作,并在成功或错误尝试时以 onComplete 完成。

但是,当例如滑动刷新启动相同的 initFeed 方法时,重新发出值的尝试失败。同一个对象调用了onComplete后好像不能再用onNext

SomeViewEffect.kt

data class _FeedViewEffect(
    val _isLoading: AsyncSubject<Boolean> = AsyncSubject.create(),
)

data class FeedViewEffect(private val _viewEffect: _FeedViewEffect) {
    val isLoading: AsyncSubject<Boolean> = _viewEffect._isLoading
}

SomeViewModel.kt

private fun initFeed(toRetry: Boolean) {
        val disposable = feedRepository.initFeed(pagedListBoundaryCallback(toRetry))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { results ->
                when (results.status) {
                    LOADING -> {
                        Log.v(LOG_TAG, "initFeed ${LOADING.name}")
                        _viewEffect._isLoading.onNext(true)
                    }
                    SUCCESS -> {
                        Log.v(LOG_TAG, "initFeed ${SUCCESS.name}")
                        _viewEffect._isLoading.onNext(false)
                        _viewEffect._isLoading.onComplete()
                        _viewState._feed.onNext(results.data)
                    }
                    ERROR -> {
                        Log.v(LOG_TAG, "initFeed ${ERROR.name}")
                        _viewEffect._isLoading.onNext(false)
                        _viewEffect._isLoading.onComplete()
                        _viewEffect._isError.onNext(true)
                    }
                }
            }
        disposables.add(disposable)
    }

SomeFragment.kt

private fun initViewEffects() {
        val isLoadingDisposable = viewModel.viewEffect.isLoading
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError { Log.v(LOG_TAG, "Error loading isLoading") }
            .subscribe { isLoading ->
                if (isLoading) progressBar.visibility = VISIBLE
                else {
                    progressBar.visibility = GONE
                    swipeToRefresh.isRefreshing = false
                }
            }
        compositeDisposable.addAll(isLoadingDisposable, isErrorDisposable)
    }

不是很清楚为什么你需要 AsyncSubject 只发出最后一个事件。对于这种情况,您是否尝试使用 Behavior or Publish 处理器?

使用事件包装器

AsyncSubject 似乎不是处理从 Observable 到 Subscriber 的一次性事件排放的合适解决方案。在 onComplete 被调用后,AsyncSubject 不能 "re-open" 发出未来的一次性事件。

使用 Event, as outlined in LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case) 等事件包装器是最好的方法。

FeedViewEffect.kt

data class _FeedViewEffect(
    val _isLoading: BehaviorSubject<Event<Boolean>> = BehaviorSubject.create()
)

data class FeedViewEffect(private val _viewEffect: _FeedViewEffect) {
    val isLoading: BehaviorSubject<Event<Boolean>> = _viewEffect._isLoading
}

FeedViewModel.kt

private fun initFeed(toRetry: Boolean) {
        val disposable = feedRepository.initFeed(pagedListBoundaryCallback(toRetry))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { results ->
                when (results.status) {
                    LOADING -> _viewEffect._isLoading.onNext(Event(true))
                    SUCCESS -> _viewEffect._isLoading.onNext(Event(false))
                    ERROR -> _viewEffect._isLoading.onNext(Event(false))
                }
            }
        disposables.add(disposable)
    }

FeedFragment.kt

 @ExperimentalCoroutinesApi
    private fun initViewEffects() {
        val isLoadingDisposable = viewModel.viewEffect.isLoading
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError { Log.v(LOG_TAG, "Error loading isLoading") }
            .subscribe { isLoading ->
                if (isLoading.getContentIfNotHandled() == true) {
                    progressBar.visibility = VISIBLE
                } else {
                    progressBar.visibility = GONE
                    swipeToRefresh.isRefreshing = false
                }
            }
        compositeDisposable.addAll(isLoadingDisposable)
    }