如何使用 Rxjs 顺序解析 http 请求

How to resolve http requests in sequence using Rxjs

我有一个这样的js对象

var routes = [
    {lat: 12.44, lng: 74.50},
    {lat: 12.54, lng: 74.60},
    {lat: 12.64, lng: 74.70},
    ...
];

我想计算路线数组中两点之间的距离,以创建一个包含纬度、经度和与前一点的距离的新数组。使用递归函数我可以按顺序执行此操作,因此在未解决 http 请求之前我不会调用下一个点。这是递归代码

var finalData = [];
function getData(points, i) {
    if(points.length >= i+1) {
        var p1 = points[i];
        var p2 = points[i+1];
        var url = 'http://to/web/app/origin=' + p1.lat + ',' + p1.lng + '&destination=' + p2.lat + ',' + p2.lng;
        http.get(url, function(res) {
            var data = '';
            res.on('data', function(chunk) {
                data += chunk;
            });
            res.on('end', function() {
                var d = JSON.parse(data);
                finalData.push({
                    lat: p2.lat,
                    lng: p2.lng,
                    distance: d.dis
                });
                getData(points, i+1);
            });
        });
    }
}
getData(routes, 0);

作为响应式编程的新手,无法思考并找出如何使用 Rxjs 库实现相同的功能?我的意思是一些小的声明性代码。

您似乎不需要递归地执行此操作,因为您有一个将要使用的项目列表:

//Make your callback function `Observable`
const observableHttp = Rx.Observable.bindCallback(http.get);
//Optionally if it is a node-style callback
//const observableHttp = Rx.Observable.bindNodeCallback(http.get);

Rx.Observable.from(routes)
  //Combine each of the elements into pairwise arrays
  .pairwise()
  //Maintain the order of the responses
  .concatMap(points => {
    const p1 = points[0];
    const p2 = points[1];
    return observableHttp(`http://to/web/app/origin=${p1.lat},${p1.lng}` + 
                          `&destination=${p2.lat},${p2.lng}`);
  },
  //Process the response along with the original array of responses 
  (points, res) => ({lat: points[1].lat, lng: points[1].lng, distance: res.dis}))
  //Gather all the values into a single array
  .toArray()
  .subscribe(x => /*Do something with your array of values*/

扩展@paulpdaniels 的答案我能够修复我的代码。如果有人感兴趣就把它放在这里。

Rx.Observable.from(data)
    .pairwise()
    .concatMap(points => {
        const p1 = points[0];
        const p2 = points[1];
        var url = `http://to/web/app?origins=${p1.lat},${p1.lng}` + 
                         `&destinations=${p2.lat},${p2.lng}`;
        return GetDataStream(url);
    },
    //Process the response along with the original array of responses 
    (points, dis) => {          
        return {lat: points[1].lat, lng: points[1].lng, distance: dis};
    })
    .toArray()
    .subscribe(x => console.log(x));

而她是自定义可观察对象

const GetDataStream = function(url) {
    return Rx.Observable.create(observer => {
        http.get(url, res => {
            var _data = ''
            res.on('data', chunk => {
                _data += chunk;
            });
            res.on('end', () => {
                var d = JSON.parse(_data);
                var distance = d.rows[0].elements[0].distance.value;
                observer.next(distance);
                observer.complete();
            });
        });
    });
}