ReactiveX 在超时后发出空值或标记值

ReactiveX emit null or sentinel value after timeout

寻找一种干净的方法来将源 Observable 转换为在一段时间内未发出项目后发出单个 null(或标记值)。

例如,如果源 observable 发出 1, 2, 3 然后在发出 4, 5, 6 之前停止发出 10 秒我希望发出的项目是 1, 2, 3, null, 4, 5, 6.

该用例用于在 UI 中显示值,如果最后发出的值是 stale/old,则显示的值应变成破折号 -N/A .

我查看了 timeout 运算符,但它会在发生超时时终止 Observable,这是不希望的。

使用 RxJava。

您可以通过稍微复杂的 publish-amb-timer 设置来实现:

PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler s = Schedulers.test();
TestSubscriber<Integer> ts = new TestSubscriber<>();

ps.publish(o -> 
    o.take(1).ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer)null))
    .repeat().takeUntil(o.ignoreElements())
).subscribe(ts);

ps.onNext(1);
ps.onNext(2);
ps.onNext(3);

s.advanceTimeBy(15, TimeUnit.SECONDS);

ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();

ts.assertValues(1, 2, 3, null, 4, 5, 6);

发生的事情是源已发布,因此您可以从中一个接一个地获取项目或计时器事件,确保最快的获胜并使用下一个值重复它,所有这些都无需重新订阅原始源时间.

编辑 修复了上游完成 repeat() 进入无限循环的情况。

基于 and an answer in a similar question,替代实现:

单个标记值(根据 OP)

如果您正在寻找单个值来指示两次发射之间的时间间隔:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
    .concatWith(Observable.never())
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));

连续标记值

如果您希望在源可观察对象在一段时间内未发出后连续接收值:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
    .map(x -> -1)
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

区别在于 timeout 可观察到的以及它是否重复出现。

您可以根据需要将 -1 替换为 null

以上所有内容均使用 Java 1.8.0_72.

使用 RxJava 1.0.17 进行测试