我怎样才能使用 Rx Java 达到这个要求
How can I achieve this requirement using Rx Java
我有一个包含(良好、非关键、关键)值的状态(枚举)
所以要求是:
- 应该在状态进入非临界状态时触发。
- 应该在状态进入临界状态时触发。
- 当状态保持在临界状态 15 秒时应该触发。
输入:
publishSubject.onNext("Good")
publishSubject.onNext("Critcal")
publishSubject.onNext("Critcal")
publishSubject.onNext("NonCritical")
publishSubject.onNext("Critacal")
publishSubject.onNext("Critical")
publishSubject.onNext("Good")
and so on...
参考代码结构:
var publishSubject = PublishSubject.create<State>()
publishSubject.onNext(stateObject)
publishSubject
/* Business Logic Required Here ?? */
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
AppLogger.printLog("Trigger Success --> ")
}
请帮忙,
提前致谢,
您的要求的前两部分应该合二为一。您要求在 NonCritical
和 Critical
事件上触发链,因此应该 而不是 为 Good
事件触发链。同样,如果状态与前一个事件不同,您只需要触发一个事件。对于这两个 .filter
事件应该足够了:
var lastKnownState: State = null
publishSubject
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.filter(this::checkStateDiffers) // Check we have a new state
.filter { state -> state != State.Good } // Check event is good
.subscribe {
AppLogger.printLog("Trigger Success --> ")
}
...
private fun checkStateDiffers(val state: State): Boolean {
val isDifferent = state != lastKnownState
if (isDifferent) lastKnownState = state // Update known state if changed
return isDifferent
}
超时要求有点棘手。 RxJava 的 timeout()
运算符提供了在一段时间内没有收到任何新内容时发出错误的选项。但是,我假设您希望在收到超时后继续监听事件。同样,如果我们只发送另一个 Critical
事件,它将被第一个 filter
丢弃。所以在这种情况下,我会推荐第二个一次性的,它只负责监听这个超时。
Disposable timeoutDisp = publishSubject
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.timeout(15, TimeUnit.SECONDS)
.onErrorResumeNext(State.Timeout)
.filter { state -> state == State.Timeout }
.filter { state -> lastKnownState == State.Critical }
.subscribe {
AppLogger.printLog("Timeout Success --> ")
}
同时调整 checkStateDiffers()
以不在第一个链中保存此 Timeout
状态。
private fun checkStateDiffers(val state: State): Boolean {
if (state == State.Timeout) return true
var isDifferent = state != lastKnownState
if (isDifferent) lastKnownState = state // Update known state if changed
return isDifferent
}
您可以使用 distinctUntilChanged()
来抑制不改变状态的事件。使用 filter()
.
过滤掉正常事件
使用 switchMap()
运算符在状态更改时创建新订阅。当状态为 "critical" 时,使用 interval()
运算符等待 15 秒。如果状态在这 15 秒内发生变化,switchMap()
将取消订阅并重新订阅一个新的可观察对象。
publishSubject
.distinctUntilChanged()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.filter( state -> state != State.Normal )
.switchMap( state -> {
if (state == State.Critical) {
return Observable.interval(0, 15, TimeUnit.SECONDS) // Note 1
.map(v -> State.Critical); // Note 2
}
return Observable.just( State.Noncritical );
})
.subscribe( ... );
interval()
被赋予初始值 0
,使其立即发出一个值。 15
秒后,将发出下一个值,依此类推。
map()
运算符将 interval()
发出的 Long
转换为
我有一个包含(良好、非关键、关键)值的状态(枚举)
所以要求是:
- 应该在状态进入非临界状态时触发。
- 应该在状态进入临界状态时触发。
- 当状态保持在临界状态 15 秒时应该触发。
输入:
publishSubject.onNext("Good")
publishSubject.onNext("Critcal")
publishSubject.onNext("Critcal")
publishSubject.onNext("NonCritical")
publishSubject.onNext("Critacal")
publishSubject.onNext("Critical")
publishSubject.onNext("Good")
and so on...
参考代码结构:
var publishSubject = PublishSubject.create<State>()
publishSubject.onNext(stateObject)
publishSubject
/* Business Logic Required Here ?? */
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
AppLogger.printLog("Trigger Success --> ")
}
请帮忙, 提前致谢,
您的要求的前两部分应该合二为一。您要求在 NonCritical
和 Critical
事件上触发链,因此应该 而不是 为 Good
事件触发链。同样,如果状态与前一个事件不同,您只需要触发一个事件。对于这两个 .filter
事件应该足够了:
var lastKnownState: State = null
publishSubject
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.filter(this::checkStateDiffers) // Check we have a new state
.filter { state -> state != State.Good } // Check event is good
.subscribe {
AppLogger.printLog("Trigger Success --> ")
}
...
private fun checkStateDiffers(val state: State): Boolean {
val isDifferent = state != lastKnownState
if (isDifferent) lastKnownState = state // Update known state if changed
return isDifferent
}
超时要求有点棘手。 RxJava 的 timeout()
运算符提供了在一段时间内没有收到任何新内容时发出错误的选项。但是,我假设您希望在收到超时后继续监听事件。同样,如果我们只发送另一个 Critical
事件,它将被第一个 filter
丢弃。所以在这种情况下,我会推荐第二个一次性的,它只负责监听这个超时。
Disposable timeoutDisp = publishSubject
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.timeout(15, TimeUnit.SECONDS)
.onErrorResumeNext(State.Timeout)
.filter { state -> state == State.Timeout }
.filter { state -> lastKnownState == State.Critical }
.subscribe {
AppLogger.printLog("Timeout Success --> ")
}
同时调整 checkStateDiffers()
以不在第一个链中保存此 Timeout
状态。
private fun checkStateDiffers(val state: State): Boolean {
if (state == State.Timeout) return true
var isDifferent = state != lastKnownState
if (isDifferent) lastKnownState = state // Update known state if changed
return isDifferent
}
您可以使用 distinctUntilChanged()
来抑制不改变状态的事件。使用 filter()
.
使用 switchMap()
运算符在状态更改时创建新订阅。当状态为 "critical" 时,使用 interval()
运算符等待 15 秒。如果状态在这 15 秒内发生变化,switchMap()
将取消订阅并重新订阅一个新的可观察对象。
publishSubject
.distinctUntilChanged()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.filter( state -> state != State.Normal )
.switchMap( state -> {
if (state == State.Critical) {
return Observable.interval(0, 15, TimeUnit.SECONDS) // Note 1
.map(v -> State.Critical); // Note 2
}
return Observable.just( State.Noncritical );
})
.subscribe( ... );
interval()
被赋予初始值0
,使其立即发出一个值。15
秒后,将发出下一个值,依此类推。map()
运算符将interval()
发出的Long
转换为