不一致的 mergeMap 行为

Inconsitant mergeMap behaviour

我目前正在研究一种文件上传方法,该方法要求我限制通过的并发请求数。

我已经开始编写一个关于如何处理它的原型

const items = Array.from({ length: 50 }).map((_, n) => n);
from(items)
  .pipe(
    mergeMap(n => {
      return of(n).pipe(delay(2000));
    }, 5)
  )
  .subscribe(n => {
    console.log(n);
  });

它确实有效,但是当我用实际调用换掉 of 时。它只处理一个块,所以假设 20 个文件中的 5 个

from(files)
  .pipe(mergeMap(handleFile, 5))
  .subscribe(console.log);

handleFile 函数 returns 调用我的自定义 ajax 实现

import { Observable, Subscriber } from 'rxjs';
import axios from 'axios';

const { CancelToken } = axios;

class AjaxSubscriber extends Subscriber {
  constructor(destination, settings) {
    super(destination);
    this.send(settings);
  }

  send(settings) {
    const cancelToken = new CancelToken(cancel => {
      // An executor function receives a cancel function as a parameter
      this.cancel = cancel;
    });
    axios(Object.assign({ cancelToken }, settings))
      .then(resp => this.next([null, resp.data]))
      .catch(e => this.next([e, null]));
  }

  next(config) {
    this.done = true;
    const { destination } = this;
    destination.next(config);
  }

  unsubscribe() {
    if (this.cancel) {
      this.cancel();
    }
    super.unsubscribe();
  }
}

export class AjaxObservable extends Observable {
  static create(settings) {
    return new AjaxObservable(settings);
  }

  constructor(settings) {
    super();
    this.settings = settings;
  }

  _subscribe(subscriber) {
    return new AjaxSubscriber(subscriber, this.settings);
  }
}

所以看起来像这样

function handleFile() {
  return AjaxObservable.create({
    url: "https://jsonplaceholder.typicode.com/todos/1"
  });
}

CodeSandbox

如果我从合并映射函数中删除并发参数,一切正常,但它会一次性上传所有文件。有什么办法可以解决这个问题吗?

原来问题是我没有在AjaxSubscriber中调用complete()方法,所以我将代码修改为:

pass(response) {
  this.next(response);
  this.complete();
}

从 axios 调用:

axios(Object.assign({ cancelToken }, settings))
  .then(resp => this.pass([null, resp.data]))
  .catch(e => this.pass([e, null]));