合并一些异步的可观察序列并保留顺序
Merge observable sequences where some are async and preserve order
例如,我有两个可观察序列,"data1" 和 "data2",我希望将它们合并成一个可观察序列,同时保留初始顺序。
当两个可观察序列没有异步工作时——这里用一个小的延迟建模——这很容易用 Rx.Observable.merge
实现。但是,使用这种方法,任何异步工作都会破坏顺序。
是否可以使用内置运算符合并可观察序列,传递已知值并等待未知值?如果不是,我应该构建什么类型的运算符?
'use strict';
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;
var eventEmitter = new EventEmitter();
var end = Rx.Observable.fromEvent(eventEmitter, 'end');
var data1 = Rx.Observable.fromEvent(eventEmitter, 'data1')
.takeUntil(end);
var data2 = Rx.Observable.fromEvent(eventEmitter, 'data2')
.takeUntil(end)
.delay(1000);
Rx.Observable
.merge(data1, data2)
.reduce(function(acc, str) {
return acc + str + ';';
}, '')
.subscribe(function(data) {
console.log(data);
});
eventEmitter.emit('data1', '1');
eventEmitter.emit('data2', '2');
eventEmitter.emit('data1', '3');
eventEmitter.emit('data1', '4');
eventEmitter.emit('data2', '5');
eventEmitter.emit('end');
// expected: "1;2;3;4;5;"
// actual: "1;3;4;2;5;"
谢谢。
一个选项:在执行工作之前合并流以便捕获顺序,然后执行工作(使用concat
来维护顺序)
// synchronous work...but return it as an Observable so that
// concatMap() can use it correctly
var data1Work = function (data) { return Rx.Observable.of(data); };
// asynchronous work...returns an observable with the result
var data2Work = function (data) {
// cold observable (wont start until concatMap hits it)
var work = Rx.Observable.of(data).delay(1000);
// return work;
// return hot observable (starts immediately)
hotwork = work.replay();
hotwork.connect(); // start it now
return hotwork;
// alternatively you can start your async operation and return
// a Promise that will resolve when it is complete if that
// pattern feels better to you since Promises are always hot
// and Rx knows how to consume promises
// return someFuncThatReturnsPromise(data);
};
var data1 = Rx.Observable.fromEvent(eventEmitter, "data1")
.map(function (d) { return { type: "data1", data: d }; });
var data2 = Rx.Observable.fromEvent(eventEmitter, "data2")
.map(function (d) { return { type: "data1", data: d }; });
var results = Rx.Observable
.merge(data1, data2)
.concatMap(function (d) {
var workFunc = d.type === "data1" ? data1Work : data2Work;
return workFunc(d.data);
});
例如,我有两个可观察序列,"data1" 和 "data2",我希望将它们合并成一个可观察序列,同时保留初始顺序。
当两个可观察序列没有异步工作时——这里用一个小的延迟建模——这很容易用 Rx.Observable.merge
实现。但是,使用这种方法,任何异步工作都会破坏顺序。
是否可以使用内置运算符合并可观察序列,传递已知值并等待未知值?如果不是,我应该构建什么类型的运算符?
'use strict';
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;
var eventEmitter = new EventEmitter();
var end = Rx.Observable.fromEvent(eventEmitter, 'end');
var data1 = Rx.Observable.fromEvent(eventEmitter, 'data1')
.takeUntil(end);
var data2 = Rx.Observable.fromEvent(eventEmitter, 'data2')
.takeUntil(end)
.delay(1000);
Rx.Observable
.merge(data1, data2)
.reduce(function(acc, str) {
return acc + str + ';';
}, '')
.subscribe(function(data) {
console.log(data);
});
eventEmitter.emit('data1', '1');
eventEmitter.emit('data2', '2');
eventEmitter.emit('data1', '3');
eventEmitter.emit('data1', '4');
eventEmitter.emit('data2', '5');
eventEmitter.emit('end');
// expected: "1;2;3;4;5;"
// actual: "1;3;4;2;5;"
谢谢。
一个选项:在执行工作之前合并流以便捕获顺序,然后执行工作(使用concat
来维护顺序)
// synchronous work...but return it as an Observable so that
// concatMap() can use it correctly
var data1Work = function (data) { return Rx.Observable.of(data); };
// asynchronous work...returns an observable with the result
var data2Work = function (data) {
// cold observable (wont start until concatMap hits it)
var work = Rx.Observable.of(data).delay(1000);
// return work;
// return hot observable (starts immediately)
hotwork = work.replay();
hotwork.connect(); // start it now
return hotwork;
// alternatively you can start your async operation and return
// a Promise that will resolve when it is complete if that
// pattern feels better to you since Promises are always hot
// and Rx knows how to consume promises
// return someFuncThatReturnsPromise(data);
};
var data1 = Rx.Observable.fromEvent(eventEmitter, "data1")
.map(function (d) { return { type: "data1", data: d }; });
var data2 = Rx.Observable.fromEvent(eventEmitter, "data2")
.map(function (d) { return { type: "data1", data: d }; });
var results = Rx.Observable
.merge(data1, data2)
.concatMap(function (d) {
var workFunc = d.type === "data1" ? data1Work : data2Work;
return workFunc(d.data);
});