连锁可观察排放?
Chain Observable emissions?
我有两个独立的 Observable,我想以这样的方式耦合,即每次第一个发出一个值时,它都会导致第二个发出一个值。
在这种情况下,第一个 Observable 是一个无限序列,基于异步 Node.js 风格的方法,回调时间可变(不允许使用 delay
或 timer
)。第二个 Observable 是独立值的有限序列。
也许一张图表可以表达我要找的东西:
I: ---A--D-----Q-G---B-> (I is infinite)
F: -3512-|-> (F is finite)
O: ---3--5-----1-2-|-> (O is finite)
输出序列 (O) 的值是有限的,基于 (I) 的时间。
这是 (I):
// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const infiniteCallbackSequence = Rx.Observable
.defer(callbackObservable)
.expand(x => callbackObservable());
为了简单起见,这里是 (F):
const finiteSequence = Rx.Observable.from([3, 5, 1, 2]);
(这建立在我之前的一个问题 的基础上)
我对事情的理解还不够深入,无法让 combine*
、merge*
或 *map
工作来生成我想要的东西——假设它们可以。好像每次 (O) 发出时我都希望 take(1)
on (I)。
如何获得所描述的行为?
Observable.zip
是否适合您的用例?
const Rx = require('rxjs');
const timers = require('timers');
const finite = Rx.Observable.from([3, 5, 1, 2]);
// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const infiniteCallbackSequence = Rx
.Observable
.defer(callbackObservable)
.expand(x => callbackObservable());
const combined = Rx.Observable.zip(
finite,
infiniteCallbackSequence
).forEach(v => console.log(v));
运行 在节点 6 上:
➜ $ node test.js
[ 3, 816 ]
[ 5, 297 ]
[ 1, 95 ]
[ 2, 677 ]
➜ $
我有两个独立的 Observable,我想以这样的方式耦合,即每次第一个发出一个值时,它都会导致第二个发出一个值。
在这种情况下,第一个 Observable 是一个无限序列,基于异步 Node.js 风格的方法,回调时间可变(不允许使用 delay
或 timer
)。第二个 Observable 是独立值的有限序列。
也许一张图表可以表达我要找的东西:
I: ---A--D-----Q-G---B-> (I is infinite)
F: -3512-|-> (F is finite)
O: ---3--5-----1-2-|-> (O is finite)
输出序列 (O) 的值是有限的,基于 (I) 的时间。
这是 (I):
// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const infiniteCallbackSequence = Rx.Observable
.defer(callbackObservable)
.expand(x => callbackObservable());
为了简单起见,这里是 (F):
const finiteSequence = Rx.Observable.from([3, 5, 1, 2]);
(这建立在我之前的一个问题
我对事情的理解还不够深入,无法让 combine*
、merge*
或 *map
工作来生成我想要的东西——假设它们可以。好像每次 (O) 发出时我都希望 take(1)
on (I)。
如何获得所描述的行为?
Observable.zip
是否适合您的用例?
const Rx = require('rxjs');
const timers = require('timers');
const finite = Rx.Observable.from([3, 5, 1, 2]);
// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const infiniteCallbackSequence = Rx
.Observable
.defer(callbackObservable)
.expand(x => callbackObservable());
const combined = Rx.Observable.zip(
finite,
infiniteCallbackSequence
).forEach(v => console.log(v));
运行 在节点 6 上:
➜ $ node test.js
[ 3, 816 ]
[ 5, 297 ]
[ 1, 95 ]
[ 2, 677 ]
➜ $