在高阶 rxjs observable 中收集当前不完整的 observables

Collecting the currently incomplete observables in a higher-order rxjs observable

例如,我有一个每 5 秒发出一个 10 秒计时器的可观察对象。我可以使用 scan 创建一个 observable,该 observable 发出到目前为止发出的所有内部 observable 的数组:

tick = 5000
tock = 1000
timers = Observable.interval(tick).scan( (acc, next) => {
    let timer = Observable.interval(tock).take(10);
    return acc.concat([timer]);
}, []);

但是,如果我想发出它以发出所有 "live" 个计时器的数组(即那些尚未调用 complete 的计时器)怎么办?

这是我所想的糟糕的 ascii 大理石图:

-A--B-----
  \  \
   \  \
    \  x
     x
 |  |    |
[A][A,B] []

有没有办法使用标准运算符来做到这一点,或者这不是尝试用 rxjs 做的惯用事情?

您应该能够使用此答案中描述的 "active" 方法来实现您想要的: