如何使用 rx.js 发送多个 http 请求?

How do I send multiple http requests with rx.js?

我正在尝试 rx.js 并且我正在尝试发出多个 http 请求。这是我为发出 http 请求而设置的 observable

function httpGet(url) {
    return Observable.create(function forEach(observer) {
        var cancelled = false;
        axios.get(url).then(function(res) {
            if(!cancelled) {
                observer.onNext(res);
                observer.onCompleted();             
            }
        });

        return function dispose() {
            cancelled = true;
        }
    })
}

我正在尝试发出多个 http 请求,但我的结果更多 Observable。这是 subscribe:

var array = ['http://localhost:4444/text/88', 'http://localhost:4444/other/77'];

var source = Rx.Observable.fromArray(array).map(httpGet);

var subscription = source.subscribe(
    function (x) {
        console.log(x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
});

这是我的结果。

AnonymousObservable { source: undefined, __subscribe: [Function: forEach] }
AnonymousObservable { source: undefined, __subscribe: [Function: forEach] }
Completed

我知道我正在获取 Observable,如果我 forEach 编辑它们,那么我会得到结果,但我缺少将其转换为 [=] 的正确方法23=] 而不是 Observable。我做错了什么?

这应该可以解决问题:

var array = ['http://localhost:4444/text/88', 'http://localhost:4444/other/77'];

var source = Rx.Observable.fromArray(array).concatMap(httpGet);

function httpGet(url) {
  return axios.get(url);
  }
var subscription = source.subscribe(
    function (x) {
        console.log(x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
});

一些解释:

  • 可观察值是一系列值。观察者是处理这些值的三管齐下的对象。
  • 你想要的是将数组(源)转换为 html 的序列(通过获取 url 获得)。您可以使用的是:
  • 然后您想要订阅生成的序列并对它们做一些事情。那就是定义观察者的 subscribe 函数。
  • 所以在这里你获取你的数组值序列,使用 map 运算符生成一系列承诺,但实际上你将使用 concatMap 运算符以相同的方式输出承诺解析值比数组值的顺序。 concatMap 可以将一个 observable、一个 array/iterable 或一个 promise 作为其参数,并将输出包装到 observable it returns 中的那些对象中的值序列。因此,您没有一系列承诺,而是有一系列由承诺解决的值,即您获取的 html 内容。如果您对保持源的初始顺序不感兴趣,您也可以使用 flatMap 运算符。比照。 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/concatmap.md and https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md
  • 然后您订阅它,并处理这些值

所以这基本上是一般过程,您获取一个源序列,通过明智地选择运算符将其转换为您选择的序列,然后用您的观察者函数一个一个地处理这些值。 另外,请注意,promise 类似于 observable,一些(大多数?)Rxjs 运算符会这样对待它们,因此您通常不必使用 then 来获取已解析的值。

最后一件事,Rx.Observable.fromArray 似乎已被弃用,取而代之的是 Rx.Observable.from:cf. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromarray.md