我怎样才能使用 Rx Java 达到这个要求

How can I achieve this requirement using Rx Java

我有一个包含(良好、非关键、关键)值的状态(枚举)

所以要求是:

  1. 应该在状态进入非临界状态时触发。
  2. 应该在状态进入临界状态时触发。
  3. 当状态保持在临界状态 15 秒时应该触发。

输入:

publishSubject.onNext("Good")
publishSubject.onNext("Critcal") 
publishSubject.onNext("Critcal") 
publishSubject.onNext("NonCritical")  
publishSubject.onNext("Critacal") 
publishSubject.onNext("Critical") 
publishSubject.onNext("Good")
and so on...

参考代码结构:

    var publishSubject = PublishSubject.create<State>()
    publishSubject.onNext(stateObject)


    publishSubject
            /* Business Logic Required Here ?? */
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                AppLogger.printLog("Trigger Success --> ")
            }

请帮忙, 提前致谢,

您的要求的前两部分应该合二为一。您要求在 NonCriticalCritical 事件上触发链,因此应该 而不是 Good 事件触发链。同样,如果状态与前一个事件不同,您只需要触发一个事件。对于这两个 .filter 事件应该足够了:

var lastKnownState: State = null

publishSubject
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(this::checkStateDiffers)       // Check we have a new state
        .filter { state -> state != State.Good }        // Check event is good
        .subscribe {
            AppLogger.printLog("Trigger Success --> ")
        }

...

private fun checkStateDiffers(val state: State): Boolean {
     val isDifferent = state != lastKnownState
     if (isDifferent) lastKnownState = state       // Update known state if changed
     return isDifferent
}

超时要求有点棘手。 RxJava 的 timeout() 运算符提供了在一段时间内没有收到任何新内容时发出错误的选项。但是,我假设您希望在收到超时后继续监听事件。同样,如果我们只发送另一个 Critical 事件,它将被第一个 filter 丢弃。所以在这种情况下,我会推荐第二个一次性的,它只负责监听这个超时。

Disposable timeoutDisp = publishSubject
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(15, TimeUnit.SECONDS)
        .onErrorResumeNext(State.Timeout)
        .filter { state -> state == State.Timeout }
        .filter { state -> lastKnownState == State.Critical }
        .subscribe {
            AppLogger.printLog("Timeout Success --> ")
        }

同时调整 checkStateDiffers() 以不在第一个链中保存此 Timeout 状态。

private fun checkStateDiffers(val state: State): Boolean {
     if (state == State.Timeout) return true

     var isDifferent = state != lastKnownState
     if (isDifferent) lastKnownState = state       // Update known state if changed
     return isDifferent
}

您可以使用 distinctUntilChanged() 来抑制不改变状态的事件。使用 filter().

过滤掉正常事件

使用 switchMap() 运算符在状态更改时创建新订阅。当状态为 "critical" 时,使用 interval() 运算符等待 15 秒。如果状态在这 15 秒内发生变化,switchMap() 将取消订阅并重新订阅一个新的可观察对象。

publishSubject
  .distinctUntilChanged()
  .subscribeOn(Schedulers.computation())
  .observeOn(AndroidSchedulers.mainThread())
  .filter( state -> state != State.Normal )
  .switchMap( state -> {
                   if (state == State.Critical) {
                     return Observable.interval(0, 15, TimeUnit.SECONDS) // Note 1
                        .map(v -> State.Critical); // Note 2
                   }
                   return Observable.just( State.Noncritical );
                 })
  .subscribe( ... );
  1. interval() 被赋予初始值 0,使其立即发出一个值。 15 秒后,将发出下一个值,依此类推。
  2. map() 运算符将 interval() 发出的 Long 转换为