连锁可观察排放?

Chain Observable emissions?

我有两个独立的 Observable,我想以这样的方式耦合,即每次第一个发出一个值时,它都会导致第二个发出一个值。

在这种情况下,第一个 Observable 是一个无限序列,基于异步 Node.js 风格的方法,回调时间可变(不允许使用 delaytimer)。第二个 Observable 是独立值的有限序列。

也许一张图表可以表达我要找的东西:

I: ---A--D-----Q-G---B-> (I is infinite)
F: -3512-|->             (F is finite)
O: ---3--5-----1-2-|->   (O is finite)

输出序列 (O) 的值是有限的,基于 (I) 的时间。

这是 (I):

// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 1000);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const infiniteCallbackSequence = Rx.Observable
  .defer(callbackObservable)
  .expand(x => callbackObservable());

为了简单起见,这里是 (F):

const finiteSequence = Rx.Observable.from([3, 5, 1, 2]);

(这建立在我之前的一个问题 的基础上)

我对事情的理解还不够深入,无法让 combine*merge**map 工作来生成我想要的东西——假设它们可以。好像每次 (O) 发出时我都希望 take(1) on (I)。

如何获得所描述的行为?

Observable.zip 是否适合您的用例?

const Rx = require('rxjs');
const timers = require('timers');

const finite = Rx.Observable.from([3, 5, 1, 2]);

// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 1000);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const infiniteCallbackSequence = Rx
  .Observable
  .defer(callbackObservable)
  .expand(x => callbackObservable());

const combined = Rx.Observable.zip(
  finite,
  infiniteCallbackSequence
).forEach(v => console.log(v));

运行 在节点 6 上:

➜ $ node test.js
[ 3, 816 ]
[ 5, 297 ]
[ 1, 95 ]
[ 2, 677 ]
➜ $