并行触发异步请求,但使用 rxjs 按顺序获取结果
Fire async request in parallel but get result in order using rxjs
例如:
使用 jquery ajax 并行获取 5 页。当第 2 页 return 时,什么都不做。当 page1 return 时,对 page1 和 page2 做一些事情。
// assume there is some operator that can do this,
// then it might look like this?
Rx.Observable.range(1, 5).
someOperator(function(page) {
return Rx.Observable.defer( () => $.get(page) );
}).scan(function(preVal, curItem) {
preVal.push(curItem);
return preVal;
}, []);
存在 forkJoin
运算符,它将 run all observable sequences in parallel and collect their last elements.
(引用自文档)。但是如果你使用那个,你将不得不等待所有 5 个承诺都解决,或者 5 个承诺中的一个出错。它与 RSVP.all
或 jQuery.when
非常接近。所以一旦你有了第二个,那将不允许你做某事。无论如何我都会提到它,以防它在其他情况下对您有用。
另一种可能性是使用 concatMap
,这样您就可以按顺序接收已解决的承诺。但是,我不清楚它们是否会并行启动,第二个 promise 应该只有在第一个 promise 解决后才开始。
我能想到的最后一个选择是使用 merge(2)
,这应该 运行 两个并行的承诺,并且在任何时候它们只会是两个承诺 'launched'。
现在,如果您不使用 defer
,而使用 concatMap
,我相信您应该启动所有 AJAX 请求,并且顺序仍然正确。所以你可以写:
.concatMap(function(page) {
return $.get(page);
})
相关文档:
concatMap 保持顺序,但按顺序处理元素。
mergeMap 不保持顺序,而是并行运行。
我在下面创建了运算符 mergeMapAsync 以并行处理元素(例如页面下载)但按顺序发出。它甚至支持节流(例如并行下载最多 6 页)。
Rx.Observable.prototype.mergeMapAsync = mergeMapAsync;
function mergeMapAsync(func, concurrent) {
return new Rx.Observable(observer => {
let outputIndex = 0;
const inputLen = this.array ? this.array.length : this.source.array.length;
const responses = new Array(inputLen);
const merged = this.map((value, index) => ({ value, index })) // Add index to input value.
.mergeMap(value => {
return Rx.Observable.fromPromise(new Promise(resolve => {
console.log(`${now()}: Call func for ${value.value}`);
// Return func retVal and index.
func(value.value).then(retVal => {
resolve({ value: retVal, index: value.index });
});
}));
}, concurrent);
const mergeObserver = {
next: (x) => {
console.log(`${now()}: Promise returned for ${x.value}`);
responses[x.index] = x.value;
// Emit in order using outputIndex.
for (let i = outputIndex, len = responses.length; i < len; i++) {
if (typeof responses[i] !== "undefined") {
observer.next(responses[i]);
outputIndex = i + 1;
} else {
break;
}
}
},
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return merged.subscribe(mergeObserver);
});
};
// ----------------------------------------
const CONCURRENT = 3;
var start = Date.now();
var now = () => Date.now() - start;
const array = ["a", "b", "c", "d", "e"];
Rx.Observable.from(array)
.mergeMapAsync(value => getData(value), CONCURRENT)
.finally(() => console.log(`${now()}: End`))
.subscribe(value => {
console.log(`${now()}: ${value}`); // getData
});
function getData(input) {
const delayMin = 500; // ms
const delayMax = 2000; // ms
return new Promise(resolve => {
setTimeout(() => resolve(`${input}+`), Math.floor(Math.random() * delayMax) + delayMin);
});
}
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>mergeMapAsync</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.min.js"></script>
</head>
<body>
</body>
</html>
例如:
使用 jquery ajax 并行获取 5 页。当第 2 页 return 时,什么都不做。当 page1 return 时,对 page1 和 page2 做一些事情。
// assume there is some operator that can do this,
// then it might look like this?
Rx.Observable.range(1, 5).
someOperator(function(page) {
return Rx.Observable.defer( () => $.get(page) );
}).scan(function(preVal, curItem) {
preVal.push(curItem);
return preVal;
}, []);
存在 forkJoin
运算符,它将 run all observable sequences in parallel and collect their last elements.
(引用自文档)。但是如果你使用那个,你将不得不等待所有 5 个承诺都解决,或者 5 个承诺中的一个出错。它与 RSVP.all
或 jQuery.when
非常接近。所以一旦你有了第二个,那将不允许你做某事。无论如何我都会提到它,以防它在其他情况下对您有用。
另一种可能性是使用 concatMap
,这样您就可以按顺序接收已解决的承诺。但是,我不清楚它们是否会并行启动,第二个 promise 应该只有在第一个 promise 解决后才开始。
我能想到的最后一个选择是使用 merge(2)
,这应该 运行 两个并行的承诺,并且在任何时候它们只会是两个承诺 'launched'。
现在,如果您不使用 defer
,而使用 concatMap
,我相信您应该启动所有 AJAX 请求,并且顺序仍然正确。所以你可以写:
.concatMap(function(page) {
return $.get(page);
})
相关文档:
concatMap 保持顺序,但按顺序处理元素。
mergeMap 不保持顺序,而是并行运行。
我在下面创建了运算符 mergeMapAsync 以并行处理元素(例如页面下载)但按顺序发出。它甚至支持节流(例如并行下载最多 6 页)。
Rx.Observable.prototype.mergeMapAsync = mergeMapAsync;
function mergeMapAsync(func, concurrent) {
return new Rx.Observable(observer => {
let outputIndex = 0;
const inputLen = this.array ? this.array.length : this.source.array.length;
const responses = new Array(inputLen);
const merged = this.map((value, index) => ({ value, index })) // Add index to input value.
.mergeMap(value => {
return Rx.Observable.fromPromise(new Promise(resolve => {
console.log(`${now()}: Call func for ${value.value}`);
// Return func retVal and index.
func(value.value).then(retVal => {
resolve({ value: retVal, index: value.index });
});
}));
}, concurrent);
const mergeObserver = {
next: (x) => {
console.log(`${now()}: Promise returned for ${x.value}`);
responses[x.index] = x.value;
// Emit in order using outputIndex.
for (let i = outputIndex, len = responses.length; i < len; i++) {
if (typeof responses[i] !== "undefined") {
observer.next(responses[i]);
outputIndex = i + 1;
} else {
break;
}
}
},
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return merged.subscribe(mergeObserver);
});
};
// ----------------------------------------
const CONCURRENT = 3;
var start = Date.now();
var now = () => Date.now() - start;
const array = ["a", "b", "c", "d", "e"];
Rx.Observable.from(array)
.mergeMapAsync(value => getData(value), CONCURRENT)
.finally(() => console.log(`${now()}: End`))
.subscribe(value => {
console.log(`${now()}: ${value}`); // getData
});
function getData(input) {
const delayMin = 500; // ms
const delayMax = 2000; // ms
return new Promise(resolve => {
setTimeout(() => resolve(`${input}+`), Math.floor(Math.random() * delayMax) + delayMin);
});
}
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>mergeMapAsync</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.min.js"></script>
</head>
<body>
</body>
</html>