AsyncSubject 可以处理 RxJava 中的 SingleLiveEvent 情况吗?
Can an AsyncSubject handle the SingleLiveEvent case in RxJava?
问题
一次性事件的反应式编程模式的问题在于,在最初的一次性事件发生后,它们可能会重新发送给订阅者。
对于 LiveData the SingleLiveEvent provides a solution using an EventObserver
也可以应用于 Kotlin Flow。
问题
能否创建一个 AsyncSubject
observable 来处理 RxJava 中的 SingleLiveEvent 事件?主要问题似乎是,在调用 onComplete
之后,是否有办法手动 "re-opened" 重新发送数据?
可能的解决方案
AsyncSubject
似乎是 RxJava 的潜在解决方案,无需创建 EventObserver
,因为 documentation 声明它将 仅在序列为完成。
实施 - 加载状态样本
加载布尔值从 ViewModel 方法 initFeed
和视图效果状态发出到视图,在本例中是一个片段。加载布尔值在片段初始化和 ViewModel 通过 onNext
发送 true
时按预期工作,并在成功或错误尝试时以 onComplete
完成。
但是,当例如滑动刷新启动相同的 initFeed
方法时,重新发出值的尝试失败。同一个对象调用了onComplete
后好像不能再用onNext
了
SomeViewEffect.kt
data class _FeedViewEffect(
val _isLoading: AsyncSubject<Boolean> = AsyncSubject.create(),
)
data class FeedViewEffect(private val _viewEffect: _FeedViewEffect) {
val isLoading: AsyncSubject<Boolean> = _viewEffect._isLoading
}
SomeViewModel.kt
private fun initFeed(toRetry: Boolean) {
val disposable = feedRepository.initFeed(pagedListBoundaryCallback(toRetry))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
when (results.status) {
LOADING -> {
Log.v(LOG_TAG, "initFeed ${LOADING.name}")
_viewEffect._isLoading.onNext(true)
}
SUCCESS -> {
Log.v(LOG_TAG, "initFeed ${SUCCESS.name}")
_viewEffect._isLoading.onNext(false)
_viewEffect._isLoading.onComplete()
_viewState._feed.onNext(results.data)
}
ERROR -> {
Log.v(LOG_TAG, "initFeed ${ERROR.name}")
_viewEffect._isLoading.onNext(false)
_viewEffect._isLoading.onComplete()
_viewEffect._isError.onNext(true)
}
}
}
disposables.add(disposable)
}
SomeFragment.kt
private fun initViewEffects() {
val isLoadingDisposable = viewModel.viewEffect.isLoading
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError { Log.v(LOG_TAG, "Error loading isLoading") }
.subscribe { isLoading ->
if (isLoading) progressBar.visibility = VISIBLE
else {
progressBar.visibility = GONE
swipeToRefresh.isRefreshing = false
}
}
compositeDisposable.addAll(isLoadingDisposable, isErrorDisposable)
}
不是很清楚为什么你需要 AsyncSubject 只发出最后一个事件。对于这种情况,您是否尝试使用 Behavior or Publish 处理器?
使用事件包装器
AsyncSubject
似乎不是处理从 Observable 到 Subscriber 的一次性事件排放的合适解决方案。在 onComplete
被调用后,AsyncSubject 不能 "re-open" 发出未来的一次性事件。
使用 Event
, as outlined in LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case) 等事件包装器是最好的方法。
FeedViewEffect.kt
data class _FeedViewEffect(
val _isLoading: BehaviorSubject<Event<Boolean>> = BehaviorSubject.create()
)
data class FeedViewEffect(private val _viewEffect: _FeedViewEffect) {
val isLoading: BehaviorSubject<Event<Boolean>> = _viewEffect._isLoading
}
FeedViewModel.kt
private fun initFeed(toRetry: Boolean) {
val disposable = feedRepository.initFeed(pagedListBoundaryCallback(toRetry))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
when (results.status) {
LOADING -> _viewEffect._isLoading.onNext(Event(true))
SUCCESS -> _viewEffect._isLoading.onNext(Event(false))
ERROR -> _viewEffect._isLoading.onNext(Event(false))
}
}
disposables.add(disposable)
}
FeedFragment.kt
@ExperimentalCoroutinesApi
private fun initViewEffects() {
val isLoadingDisposable = viewModel.viewEffect.isLoading
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError { Log.v(LOG_TAG, "Error loading isLoading") }
.subscribe { isLoading ->
if (isLoading.getContentIfNotHandled() == true) {
progressBar.visibility = VISIBLE
} else {
progressBar.visibility = GONE
swipeToRefresh.isRefreshing = false
}
}
compositeDisposable.addAll(isLoadingDisposable)
}
问题
一次性事件的反应式编程模式的问题在于,在最初的一次性事件发生后,它们可能会重新发送给订阅者。
对于 LiveData the SingleLiveEvent provides a solution using an EventObserver
也可以应用于 Kotlin Flow。
问题
能否创建一个 AsyncSubject
observable 来处理 RxJava 中的 SingleLiveEvent 事件?主要问题似乎是,在调用 onComplete
之后,是否有办法手动 "re-opened" 重新发送数据?
可能的解决方案
AsyncSubject
似乎是 RxJava 的潜在解决方案,无需创建 EventObserver
,因为 documentation 声明它将 仅在序列为完成。
实施 - 加载状态样本
加载布尔值从 ViewModel 方法 initFeed
和视图效果状态发出到视图,在本例中是一个片段。加载布尔值在片段初始化和 ViewModel 通过 onNext
发送 true
时按预期工作,并在成功或错误尝试时以 onComplete
完成。
但是,当例如滑动刷新启动相同的 initFeed
方法时,重新发出值的尝试失败。同一个对象调用了onComplete
后好像不能再用onNext
了
SomeViewEffect.kt
data class _FeedViewEffect(
val _isLoading: AsyncSubject<Boolean> = AsyncSubject.create(),
)
data class FeedViewEffect(private val _viewEffect: _FeedViewEffect) {
val isLoading: AsyncSubject<Boolean> = _viewEffect._isLoading
}
SomeViewModel.kt
private fun initFeed(toRetry: Boolean) {
val disposable = feedRepository.initFeed(pagedListBoundaryCallback(toRetry))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
when (results.status) {
LOADING -> {
Log.v(LOG_TAG, "initFeed ${LOADING.name}")
_viewEffect._isLoading.onNext(true)
}
SUCCESS -> {
Log.v(LOG_TAG, "initFeed ${SUCCESS.name}")
_viewEffect._isLoading.onNext(false)
_viewEffect._isLoading.onComplete()
_viewState._feed.onNext(results.data)
}
ERROR -> {
Log.v(LOG_TAG, "initFeed ${ERROR.name}")
_viewEffect._isLoading.onNext(false)
_viewEffect._isLoading.onComplete()
_viewEffect._isError.onNext(true)
}
}
}
disposables.add(disposable)
}
SomeFragment.kt
private fun initViewEffects() {
val isLoadingDisposable = viewModel.viewEffect.isLoading
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError { Log.v(LOG_TAG, "Error loading isLoading") }
.subscribe { isLoading ->
if (isLoading) progressBar.visibility = VISIBLE
else {
progressBar.visibility = GONE
swipeToRefresh.isRefreshing = false
}
}
compositeDisposable.addAll(isLoadingDisposable, isErrorDisposable)
}
不是很清楚为什么你需要 AsyncSubject 只发出最后一个事件。对于这种情况,您是否尝试使用 Behavior or Publish 处理器?
使用事件包装器
AsyncSubject
似乎不是处理从 Observable 到 Subscriber 的一次性事件排放的合适解决方案。在 onComplete
被调用后,AsyncSubject 不能 "re-open" 发出未来的一次性事件。
使用 Event
, as outlined in LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case) 等事件包装器是最好的方法。
FeedViewEffect.kt
data class _FeedViewEffect(
val _isLoading: BehaviorSubject<Event<Boolean>> = BehaviorSubject.create()
)
data class FeedViewEffect(private val _viewEffect: _FeedViewEffect) {
val isLoading: BehaviorSubject<Event<Boolean>> = _viewEffect._isLoading
}
FeedViewModel.kt
private fun initFeed(toRetry: Boolean) {
val disposable = feedRepository.initFeed(pagedListBoundaryCallback(toRetry))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
when (results.status) {
LOADING -> _viewEffect._isLoading.onNext(Event(true))
SUCCESS -> _viewEffect._isLoading.onNext(Event(false))
ERROR -> _viewEffect._isLoading.onNext(Event(false))
}
}
disposables.add(disposable)
}
FeedFragment.kt
@ExperimentalCoroutinesApi
private fun initViewEffects() {
val isLoadingDisposable = viewModel.viewEffect.isLoading
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError { Log.v(LOG_TAG, "Error loading isLoading") }
.subscribe { isLoading ->
if (isLoading.getContentIfNotHandled() == true) {
progressBar.visibility = VISIBLE
} else {
progressBar.visibility = GONE
swipeToRefresh.isRefreshing = false
}
}
compositeDisposable.addAll(isLoadingDisposable)
}