RxJS:如何让一个 Observer 处理多个 Observables?
RxJS: How to have one Observer process multiple Observables?
我正在使用调用我实现的函数的框架。我希望将此函数的参数转换为 Observable,并通过一系列 Observers 发送。我以为我可以为此使用 Subject,但它的行为并不像我预期的那样。
为了澄清,我有类似下面的代码。我认为下面的 Option 1
会起作用,但到目前为止,我正在接受 Option 2
,这看起来一点也不惯用。
var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// Option 1: I thought this would work, but it doesn't
// eventSource.subscribe(eventSubject);
// Option 2: This does work, but its obviously clunky
eventSource.subscribe(
function(event) {
log.debug("sending to subject");
eventSubject.onNext(event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSource onCompleted');
}
);
}
只是当你将整个 Subject
订阅到一个 observable 时,你最终会将该 observable 的 onComplete
事件订阅到 subject。这意味着当可观察对象完成时,它将完成您的主题。因此,当您获得下一组事件时,它们什么都不做,因为主题已经完成。
一个解决方案正是您所做的。只需订阅主题的 onNext
事件。
第二种解决方案,可以说更多 "rx like" 是将您的传入数据视为可观察的可观察数据,并使用 mergeAll:
展平此可观察数据流
var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
.mergeAll() // subscribe to each inner observable as it arrives
.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// send a new inner observable sequence
// into the subject
eventSubject.onNext(eventSource);
}
正如 Brandon 已经解释过的,将 eventSubject 订阅到另一个可观察对象意味着将 eventSubjects onNext、onError 和 onComplete 订阅到那个可观察对象 onNext、onError 和 onComplete。从您的示例来看,您似乎只想订阅 onNext。
你的主题 completes/errros 一旦第一个 eventSource completes/errors - 你的 eventSubject 正确地忽略了后续 eventSoures 对其触发的任何进一步 onNext/onError。
有多种方法可以只订阅任何事件源的onNext:
仅手动订阅 onNext。
resultSource = eventSubject
.map(processEvent);
eventSource.subscribe(
function(event) {
eventSubject.onNext(event);
},
function(error) {
// don't subscribe to onError
},
function() {
// don't subscribe to onComplete
}
);
使用只为您处理订阅 eventSources onNext/onError 的运算符。这是布兰登的建议。 请记住,这也会订阅 eventSources onError,在您的示例中您似乎不需要它。
resultSource = eventSubject
.mergeAll()
.map(processEvent);
eventSubject.onNext(eventSource);
使用不为 eventSources onError/onComplete 调用 eventSubjects onError/onComplete 的观察者。您可以简单地将 eventSubjects onComplete 覆盖为肮脏的 hack,但创建一个新的观察者可能更好。
resultSource = eventSubject
.map(processEvent);
var eventObserver = Rx.Observer.create(
function (event) {
eventSubject.onNext(event);
}
);
eventSubject.subscribe(eventObserver);
我正在使用调用我实现的函数的框架。我希望将此函数的参数转换为 Observable,并通过一系列 Observers 发送。我以为我可以为此使用 Subject,但它的行为并不像我预期的那样。
为了澄清,我有类似下面的代码。我认为下面的 Option 1
会起作用,但到目前为止,我正在接受 Option 2
,这看起来一点也不惯用。
var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// Option 1: I thought this would work, but it doesn't
// eventSource.subscribe(eventSubject);
// Option 2: This does work, but its obviously clunky
eventSource.subscribe(
function(event) {
log.debug("sending to subject");
eventSubject.onNext(event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSource onCompleted');
}
);
}
只是当你将整个 Subject
订阅到一个 observable 时,你最终会将该 observable 的 onComplete
事件订阅到 subject。这意味着当可观察对象完成时,它将完成您的主题。因此,当您获得下一组事件时,它们什么都不做,因为主题已经完成。
一个解决方案正是您所做的。只需订阅主题的 onNext
事件。
第二种解决方案,可以说更多 "rx like" 是将您的传入数据视为可观察的可观察数据,并使用 mergeAll:
展平此可观察数据流var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
.mergeAll() // subscribe to each inner observable as it arrives
.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// send a new inner observable sequence
// into the subject
eventSubject.onNext(eventSource);
}
正如 Brandon 已经解释过的,将 eventSubject 订阅到另一个可观察对象意味着将 eventSubjects onNext、onError 和 onComplete 订阅到那个可观察对象 onNext、onError 和 onComplete。从您的示例来看,您似乎只想订阅 onNext。
你的主题 completes/errros 一旦第一个 eventSource completes/errors - 你的 eventSubject 正确地忽略了后续 eventSoures 对其触发的任何进一步 onNext/onError。
有多种方法可以只订阅任何事件源的onNext:
仅手动订阅 onNext。
resultSource = eventSubject .map(processEvent); eventSource.subscribe( function(event) { eventSubject.onNext(event); }, function(error) { // don't subscribe to onError }, function() { // don't subscribe to onComplete } );
使用只为您处理订阅 eventSources onNext/onError 的运算符。这是布兰登的建议。 请记住,这也会订阅 eventSources onError,在您的示例中您似乎不需要它。
resultSource = eventSubject .mergeAll() .map(processEvent); eventSubject.onNext(eventSource);
使用不为 eventSources onError/onComplete 调用 eventSubjects onError/onComplete 的观察者。您可以简单地将 eventSubjects onComplete 覆盖为肮脏的 hack,但创建一个新的观察者可能更好。
resultSource = eventSubject .map(processEvent); var eventObserver = Rx.Observer.create( function (event) { eventSubject.onNext(event); } ); eventSubject.subscribe(eventObserver);