如何暂停 Observable 而不会丢失发射的项目?

How can an Observable be paused without losing the items emitted?

我有一个每秒发出滴答声的Observable

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

我想暂停这个 Observable,让它停止发出数字,然后按需恢复。

有一些陷阱:

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

有没有办法暂停 interval Observable?还是我应该实现自己的 'ticking' Observable 并提供一些背压支持?

有很多方法可以做到这一点。例如,您仍然可以使用 interval() 并维护两个附加状态:布尔标志 "paused" 和计数器。

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

然后你只需在某处调用 paused.set(true/false) 到 pause/resume

编辑 2016-06-04

上面的解决方案有点问题。 如果我们多次重用 observable 实例,它将从最后一次取消订阅时的值开始。例如:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

虽然 list1 应该是 [0,1,2,3,4],但 list2 实际上是 [5,6,7,8,9]。 如果不需要上述行为,则必须将可观察对象设为无状态。这可以通过 scan() 运算符来实现。 修改后的版本可以是这样的:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

或者,如果您不希望依赖于 Java 8 和 lambda,您可以使用 Java 6+ 兼容代码来做这样的事情:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361