RxJS:如何让一个 Observer 处理多个 Observables?

RxJS: How to have one Observer process multiple Observables?

我正在使用调用我实现的函数的框架。我希望将此函数的参数转换为 Observable,并通过一系列 Observers 发送。我以为我可以为此使用 Subject,但它的行为并不像我预期的那样。

为了澄清,我有类似下面的代码。我认为下面的 Option 1 会起作用,但到目前为止,我正在接受 Option 2,这看起来一点也不惯用。

var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);

   // Option 1: I thought this would work, but it doesn't
   // eventSource.subscribe(eventSubject);

   // Option 2: This does work, but its obviously clunky
  eventSource.subscribe(
      function(event) {
          log.debug("sending to subject");
          eventSubject.onNext(event);
      },
      function(e) {
          log.error(e);
      },
      function() {
          console.log('eventSource onCompleted');
      }
  );
}

只是当你将整个 Subject 订阅到一个 observable 时,你最终会将该 observable 的 onComplete 事件订阅到 subject。这意味着当可观察对象完成时,它将完成您的主题。因此,当您获得下一组事件时,它们什么都不做,因为主题已经完成。

一个解决方案正是您所做的。只需订阅主题的 onNext 事件。

第二种解决方案,可以说更多 "rx like" 是将您的传入数据视为可观察的可观察数据,并使用 mergeAll:

展平此可观察数据流
var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
  .mergeAll() // subscribe to each inner observable as it arrives
  .map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);
   // send a new inner observable sequence
   // into the subject
   eventSubject.onNext(eventSource);
}

更新:Here is a running example

正如 Brandon 已经解释过的,将 eventSubject 订阅到另一个可观察对象意味着将 eventSubjects onNext、onError 和 onComplete 订阅到那个可观察对象 onNext、onError 和 onComplete。从您的示例来看,您似乎只想订阅 onNext。

你的主题 completes/errros 一旦第一个 eventSource completes/errors - 你的 eventSubject 正确地忽略了后续 eventSoures 对其触发的任何进一步 onNext/onError。

有多种方法可以只订阅任何事件源的onNext:

  1. 仅手动订阅 onNext。

    resultSource = eventSubject
      .map(processEvent);
    
    eventSource.subscribe(
        function(event) {
            eventSubject.onNext(event);
        },
        function(error) {
            // don't subscribe to onError
        },
        function() {
            // don't subscribe to onComplete
        }
    );
    
  2. 使用只为您处理订阅 eventSources onNext/onError 的运算符。这是布兰登的建议。 请记住,这也会订阅 eventSources onError,在您的示例中您似乎不需要它。

    resultSource = eventSubject
      .mergeAll()
      .map(processEvent);
    
    eventSubject.onNext(eventSource);
    
  3. 使用不为 eventSources onError/onComplete 调用 eventSubjects onError/onComplete 的观察者。您可以简单地将 eventSubjects onComplete 覆盖为肮脏的 hack,但创建一个新的观察者可能更好。

    resultSource = eventSubject
      .map(processEvent);
    
    var eventObserver = Rx.Observer.create(
      function (event) {
        eventSubject.onNext(event);
      }
    );
    
    eventSubject.subscribe(eventObserver);