如何在 RxJs 中构建从多个来源接收数据的流?

How to build a stream in RxJs which receives data from multiple sources?

我正在尝试构建一个包含多个组件的应用程序,这些组件使用来自服务器的事件。该应用程序内置于 Angular 并使用 rxjs-ng。

我没有找到符合我的用例的示例。

例如,以货币更新为例,您在 UI 层中有多个订阅者,并且数据访问和业务逻辑有多个提供者(全部在客户端)。

我已经实现了一个客户端服务,该服务获取货币和 returns 可观察值:

update: function (from, to) {
    var urlForCurrencies = createConverterUrl(from, to);
    var observable = Rx.Observable.fromPromise(
        $http({method: 'JSONP', url: urlForCurrencies})
    );

    return observable;
}

并且在 UI 组件中:

var that = this;
DataService.update(currencyFrom,currencyTo).subscribe(
    function (data) {
        that.currency = data.data.CHF_EUR.val;
    },
    function (err) {
        that.error = err.message;
    }    
);

这应该只在 UI 层请求时工作一次。

如何发送另一个货币数据更新或从不同的视图触发更新并仍然使用相同的 stream/observable?

查看有关 using subjects 的文档。

The Subject class inherits both Observable and Observer , in the sense that it is both an observer and an observable. You can use a subject to subscribe all the observers, and then subscribe the subject to a backend data source.

您可以与 angular 服务共享主题。这样其他组件就可以使用 subject.subscribe(onNext, onError, onCompleted) 对事件做出反应。可以使用 subject.onNext(...) 通过主题发送更新。

// Creating a subject
var subject = new Rx.Subject(); 

...

// Handing updates over to the subject
getCurrencyUpdate().then(function(data) {
  subject.onNext(data);
});

...

// Subscribe to the subject to react on updates
subscription = subject.subscribe(
  function (x) { console.log('onNext: ' + x); },
  function (e) { console.log('onError: ' + e.message); },
  function () { console.log('onCompleted'); });
)

这样您就可以使用主题向您的订阅者广播数据。