ReactiveX 在超时后发出空值或标记值
ReactiveX emit null or sentinel value after timeout
寻找一种干净的方法来将源 Observable
转换为在一段时间内未发出项目后发出单个 null
(或标记值)。
例如,如果源 observable 发出 1, 2, 3
然后在发出 4, 5, 6
之前停止发出 10 秒我希望发出的项目是 1, 2, 3, null, 4, 5, 6
.
该用例用于在 UI 中显示值,如果最后发出的值是 stale/old,则显示的值应变成破折号 -
或 N/A
.
我查看了 timeout
运算符,但它会在发生超时时终止 Observable
,这是不希望的。
使用 RxJava。
您可以通过稍微复杂的 publish-amb-timer 设置来实现:
PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler s = Schedulers.test();
TestSubscriber<Integer> ts = new TestSubscriber<>();
ps.publish(o ->
o.take(1).ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer)null))
.repeat().takeUntil(o.ignoreElements())
).subscribe(ts);
ps.onNext(1);
ps.onNext(2);
ps.onNext(3);
s.advanceTimeBy(15, TimeUnit.SECONDS);
ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();
ts.assertValues(1, 2, 3, null, 4, 5, 6);
发生的事情是源已发布,因此您可以从中一个接一个地获取项目或计时器事件,确保最快的获胜并使用下一个值重复它,所有这些都无需重新订阅原始源时间.
编辑 修复了上游完成 repeat() 进入无限循环的情况。
基于 and an answer in a similar question,替代实现:
单个标记值(根据 OP)
如果您正在寻找单个值来指示两次发射之间的时间间隔:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
连续标记值
如果您希望在源可观察对象在一段时间内未发出后连续接收值:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
区别在于 timeout
可观察到的以及它是否重复出现。
您可以根据需要将 -1
替换为 null
。
以上所有内容均使用 Java 1.8.0_72
.
使用 RxJava 1.0.17 进行测试
寻找一种干净的方法来将源 Observable
转换为在一段时间内未发出项目后发出单个 null
(或标记值)。
例如,如果源 observable 发出 1, 2, 3
然后在发出 4, 5, 6
之前停止发出 10 秒我希望发出的项目是 1, 2, 3, null, 4, 5, 6
.
该用例用于在 UI 中显示值,如果最后发出的值是 stale/old,则显示的值应变成破折号 -
或 N/A
.
我查看了 timeout
运算符,但它会在发生超时时终止 Observable
,这是不希望的。
使用 RxJava。
您可以通过稍微复杂的 publish-amb-timer 设置来实现:
PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler s = Schedulers.test();
TestSubscriber<Integer> ts = new TestSubscriber<>();
ps.publish(o ->
o.take(1).ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer)null))
.repeat().takeUntil(o.ignoreElements())
).subscribe(ts);
ps.onNext(1);
ps.onNext(2);
ps.onNext(3);
s.advanceTimeBy(15, TimeUnit.SECONDS);
ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();
ts.assertValues(1, 2, 3, null, 4, 5, 6);
发生的事情是源已发布,因此您可以从中一个接一个地获取项目或计时器事件,确保最快的获胜并使用下一个值重复它,所有这些都无需重新订阅原始源时间.
编辑 修复了上游完成 repeat() 进入无限循环的情况。
基于
单个标记值(根据 OP)
如果您正在寻找单个值来指示两次发射之间的时间间隔:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
连续标记值
如果您希望在源可观察对象在一段时间内未发出后连续接收值:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
区别在于 timeout
可观察到的以及它是否重复出现。
您可以根据需要将 -1
替换为 null
。
以上所有内容均使用 Java 1.8.0_72
.