不一致的 mergeMap 行为
Inconsitant mergeMap behaviour
我目前正在研究一种文件上传方法,该方法要求我限制通过的并发请求数。
我已经开始编写一个关于如何处理它的原型
const items = Array.from({ length: 50 }).map((_, n) => n);
from(items)
.pipe(
mergeMap(n => {
return of(n).pipe(delay(2000));
}, 5)
)
.subscribe(n => {
console.log(n);
});
它确实有效,但是当我用实际调用换掉 of
时。它只处理一个块,所以假设 20 个文件中的 5 个
from(files)
.pipe(mergeMap(handleFile, 5))
.subscribe(console.log);
handleFile
函数 returns 调用我的自定义 ajax 实现
import { Observable, Subscriber } from 'rxjs';
import axios from 'axios';
const { CancelToken } = axios;
class AjaxSubscriber extends Subscriber {
constructor(destination, settings) {
super(destination);
this.send(settings);
}
send(settings) {
const cancelToken = new CancelToken(cancel => {
// An executor function receives a cancel function as a parameter
this.cancel = cancel;
});
axios(Object.assign({ cancelToken }, settings))
.then(resp => this.next([null, resp.data]))
.catch(e => this.next([e, null]));
}
next(config) {
this.done = true;
const { destination } = this;
destination.next(config);
}
unsubscribe() {
if (this.cancel) {
this.cancel();
}
super.unsubscribe();
}
}
export class AjaxObservable extends Observable {
static create(settings) {
return new AjaxObservable(settings);
}
constructor(settings) {
super();
this.settings = settings;
}
_subscribe(subscriber) {
return new AjaxSubscriber(subscriber, this.settings);
}
}
所以看起来像这样
function handleFile() {
return AjaxObservable.create({
url: "https://jsonplaceholder.typicode.com/todos/1"
});
}
如果我从合并映射函数中删除并发参数,一切正常,但它会一次性上传所有文件。有什么办法可以解决这个问题吗?
原来问题是我没有在AjaxSubscriber
中调用complete()
方法,所以我将代码修改为:
pass(response) {
this.next(response);
this.complete();
}
从 axios 调用:
axios(Object.assign({ cancelToken }, settings))
.then(resp => this.pass([null, resp.data]))
.catch(e => this.pass([e, null]));
我目前正在研究一种文件上传方法,该方法要求我限制通过的并发请求数。
我已经开始编写一个关于如何处理它的原型
const items = Array.from({ length: 50 }).map((_, n) => n);
from(items)
.pipe(
mergeMap(n => {
return of(n).pipe(delay(2000));
}, 5)
)
.subscribe(n => {
console.log(n);
});
它确实有效,但是当我用实际调用换掉 of
时。它只处理一个块,所以假设 20 个文件中的 5 个
from(files)
.pipe(mergeMap(handleFile, 5))
.subscribe(console.log);
handleFile
函数 returns 调用我的自定义 ajax 实现
import { Observable, Subscriber } from 'rxjs';
import axios from 'axios';
const { CancelToken } = axios;
class AjaxSubscriber extends Subscriber {
constructor(destination, settings) {
super(destination);
this.send(settings);
}
send(settings) {
const cancelToken = new CancelToken(cancel => {
// An executor function receives a cancel function as a parameter
this.cancel = cancel;
});
axios(Object.assign({ cancelToken }, settings))
.then(resp => this.next([null, resp.data]))
.catch(e => this.next([e, null]));
}
next(config) {
this.done = true;
const { destination } = this;
destination.next(config);
}
unsubscribe() {
if (this.cancel) {
this.cancel();
}
super.unsubscribe();
}
}
export class AjaxObservable extends Observable {
static create(settings) {
return new AjaxObservable(settings);
}
constructor(settings) {
super();
this.settings = settings;
}
_subscribe(subscriber) {
return new AjaxSubscriber(subscriber, this.settings);
}
}
所以看起来像这样
function handleFile() {
return AjaxObservable.create({
url: "https://jsonplaceholder.typicode.com/todos/1"
});
}
如果我从合并映射函数中删除并发参数,一切正常,但它会一次性上传所有文件。有什么办法可以解决这个问题吗?
原来问题是我没有在AjaxSubscriber
中调用complete()
方法,所以我将代码修改为:
pass(response) {
this.next(response);
this.complete();
}
从 axios 调用:
axios(Object.assign({ cancelToken }, settings))
.then(resp => this.pass([null, resp.data]))
.catch(e => this.pass([e, null]));