在无限流 RxJava 上重复使用条件
Repeat using condition on infinite stream RxJava
我有以下无限流:
Observable<MyEvent> eventStream;
此流上发布了许多事件,应用停止时流也停止。
我有 2 个事件需要监听:
MyEvent.START & MyEvent.STOP
当发出 START 事件时,我 运行 一个可以发出 STOP 的函数。如果在特定时间内我没有收到 STOP 事件,我想重新运行 这个功能。如果在第三次 运行 之后我仍然没有 STOP 事件,它将停止重试。
所以我需要这样的东西:
eventStream.filter(e -> e == START || e == STOP)
.repeat(3)
.takeUntil(/* while i dont get STOP*/)
.delay(2, TimeUnit.SECONDS)
.subscribe(this::onStart, this::onError)
/*
* By the way the first time i dont want a delay before running the function
*/
但是retry只有在出现错误信号时才起作用,而repeat需要完成信号。
我知道可以做到,但我不知道如何实现(以 rx 方式)。
例如:
如果eventStream
很热,你可以把它用于takeUntil
到:
eventStream.filter(e -> e == START)
.flatMap(e -> Observable.just(e)
.observeOn(Schedulers.computation())
.doOnNext(e -> { onStart(e); })
.delay(2, TimeUnit.SECONDS)
.repeat(3)
.takeUntil(eventStream.filter(e -> e == STOP))
)
.subscribe(/* ... */);
我有以下无限流:
Observable<MyEvent> eventStream;
此流上发布了许多事件,应用停止时流也停止。
我有 2 个事件需要监听:
MyEvent.START & MyEvent.STOP
当发出 START 事件时,我 运行 一个可以发出 STOP 的函数。如果在特定时间内我没有收到 STOP 事件,我想重新运行 这个功能。如果在第三次 运行 之后我仍然没有 STOP 事件,它将停止重试。
所以我需要这样的东西:
eventStream.filter(e -> e == START || e == STOP)
.repeat(3)
.takeUntil(/* while i dont get STOP*/)
.delay(2, TimeUnit.SECONDS)
.subscribe(this::onStart, this::onError)
/*
* By the way the first time i dont want a delay before running the function
*/
但是retry只有在出现错误信号时才起作用,而repeat需要完成信号。
我知道可以做到,但我不知道如何实现(以 rx 方式)。
例如:
如果eventStream
很热,你可以把它用于takeUntil
到:
eventStream.filter(e -> e == START)
.flatMap(e -> Observable.just(e)
.observeOn(Schedulers.computation())
.doOnNext(e -> { onStart(e); })
.delay(2, TimeUnit.SECONDS)
.repeat(3)
.takeUntil(eventStream.filter(e -> e == STOP))
)
.subscribe(/* ... */);