在无限流 RxJava 上重复使用条件

Repeat using condition on infinite stream RxJava

我有以下无限流:

Observable<MyEvent> eventStream;

此流上发布了许多事件,应用停止时流也停止。

我有 2 个事件需要监听:

MyEvent.START & MyEvent.STOP

当发出 START 事件时,我 运行 一个可以发出 STOP 的函数。如果在特定时间内我没有收到 STOP 事件,我想重新运行 这个功能。如果在第三次 运行 之后我仍然没有 STOP 事件,它将停止重试。

所以我需要这样的东西:

eventStream.filter(e -> e == START || e == STOP)
   .repeat(3)
   .takeUntil(/* while i dont get STOP*/)
   .delay(2, TimeUnit.SECONDS)
   .subscribe(this::onStart, this::onError)
   
/*
* By the way the first time i dont want a delay before running the function
*/

但是retry只有在出现错误信号时才起作用,而repeat需要完成信号。

我知道可以做到,但我不知道如何实现(以 rx 方式)。

例如:

如果eventStream很热,你可以把它用于takeUntil到:

eventStream.filter(e -> e == START)
.flatMap(e -> Observable.just(e)
             .observeOn(Schedulers.computation())
             .doOnNext(e -> { onStart(e); })
             .delay(2, TimeUnit.SECONDS)
             .repeat(3)
             .takeUntil(eventStream.filter(e -> e == STOP))
)
.subscribe(/* ... */);