并行触发异步请求,但使用 rxjs 按顺序获取结果

Fire async request in parallel but get result in order using rxjs

例如:

使用 jquery ajax 并行获取 5 页。当第 2 页 return 时,什么都不做。当 page1 return 时,对 page1 和 page2 做一些事情。

// assume there is some operator that can do this, 
// then it might look like this?
Rx.Observable.range(1, 5).
someOperator(function(page) {
  return Rx.Observable.defer( () => $.get(page) );
}).scan(function(preVal, curItem) {
  preVal.push(curItem);
  return preVal;
}, []);

存在 forkJoin 运算符,它将 run all observable sequences in parallel and collect their last elements. (引用自文档)。但是如果你使用那个,你将不得不等待所有 5 个承诺都解决,或者 5 个承诺中的一个出错。它与 RSVP.alljQuery.when 非常接近。所以一旦你有了第二个,那将不允许你做某事。无论如何我都会提到它,以防它在其他情况下对您有用。

另一种可能性是使用 concatMap,这样您就可以按顺序接收已解决的承诺。但是,我不清楚它们是否会并行启动,第二个 promise 应该只有在第一个 promise 解决后才开始。

我能想到的最后一个选择是使用 merge(2),这应该 运行 两个并行的承诺,并且在任何时候它们只会是两个承诺 'launched'。

现在,如果您不使用 defer,而使用 concatMap,我相信您应该启动所有 AJAX 请求,并且顺序仍然正确。所以你可以写:

.concatMap(function(page) {
  return $.get(page);
})

相关文档:

concatMap 保持顺序,但按顺序处理元素。

mergeMap 不保持顺序,而是并行运行。

我在下面创建了运算符 mergeMapAsync 以并行处理元素(例如页面下载)但按顺序发出。它甚至支持节流(例如并行下载最多 6 页)。

Rx.Observable.prototype.mergeMapAsync = mergeMapAsync;
function mergeMapAsync(func, concurrent) {
    return new Rx.Observable(observer => {
        let outputIndex = 0;
        const inputLen = this.array ? this.array.length : this.source.array.length;
        const responses = new Array(inputLen);

        const merged = this.map((value, index) => ({ value, index })) // Add index to input value.
            .mergeMap(value => {
                return Rx.Observable.fromPromise(new Promise(resolve => {
                    console.log(`${now()}: Call func for ${value.value}`);  
                    // Return func retVal and index.
                    func(value.value).then(retVal => {
                        resolve({ value: retVal, index: value.index });
                    });
                }));
            }, concurrent);

        const mergeObserver = {
            next: (x) => {
                console.log(`${now()}: Promise returned for ${x.value}`);
                responses[x.index] = x.value;

                // Emit in order using outputIndex.
                for (let i = outputIndex, len = responses.length; i < len; i++) {
                    if (typeof responses[i] !== "undefined") {
                        observer.next(responses[i]);
                        outputIndex = i + 1;
                    } else {
                        break;
                    }
                }
            },
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return merged.subscribe(mergeObserver);
    });
};

// ----------------------------------------
const CONCURRENT = 3;
var start = Date.now();
var now = () => Date.now() - start;

const array = ["a", "b", "c", "d", "e"];
Rx.Observable.from(array)
    .mergeMapAsync(value => getData(value), CONCURRENT)
    .finally(() => console.log(`${now()}: End`))
    .subscribe(value => {
        console.log(`${now()}: ${value}`); // getData
    });

function getData(input) {
    const delayMin = 500; // ms
    const delayMax = 2000; // ms

    return new Promise(resolve => {
        setTimeout(() => resolve(`${input}+`), Math.floor(Math.random() * delayMax) + delayMin);
    });
}
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>mergeMapAsync</title>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.min.js"></script>
</head>
<body>

</body>
</html>