RxJS 5 任务队列,如果任务失败则继续

RxJS 5 task queue, continue if a task fails

假设我们有一个 HTML 页面触发 AJAX 请求。我们要确保 AJAX 请求按顺序执行。在前一个请求完成或出错之前,下一个 AJAX 请求不会被触发。

我尝试使用 RxJS concatMap 通过任务队列对此进行建模。每个 AJAX 请求都被建模为 Observable。如果 AJAX 请求成功完成,则一切正常,但如果出错,则不会执行队列中的下一个任务。

这是一个示例,它使用 setTimeout() 来模拟长 运行ning 异步任务:

  function identity(observable) {
    return observable;
  }

  function createTaskQueue() {
    var subject= new Rx.Subject();

    subject
    .concatMap(identity)
    .onErrorResumeNext(Rx.Observable.of('error'))
    .subscribe(function(data) {
      console.log('onNext', data);
    }, 
    function(error) {
      console.log('onError', error);
    });

    return {
      addTask: function(task) {
        subject.next(task);
      }
    }
  }

  function createTask(data, delay) {
    return Rx.Observable.create(function(obs) {
      setTimeout(function() {
        obs.next(data);
        obs.complete();
      }, delay);
    });
  }

  function createErrorTask(data, delay) {
    return Rx.Observable.create(function(obs) {
      setTimeout(function() {
        obs.error('Error: ' + data);
        obs.complete();
      }, delay);
    });
  }

  var taskQueue = createTaskQueue();

  taskQueue.addTask(createTask(11, 500))
  taskQueue.addTask(createTask(22, 200));
  taskQueue.addTask(createErrorTask(33, 1000));
  taskQueue.addTask(createTask(44, 300));
  taskQueue.addTask(createErrorTask(55, 300));
  taskQueue.addTask(createTask(66, 300));

这是一个可执行示例:https://jsfiddle.net/artur_ciocanu/s6ftxwnf/

当我 运行 这段代码时,控制台会打印以下内容: onNext 11 onNext 22 onNext error

这是预期的,但我想知道为什么其他任务如 44、55 等没有执行。

我很确定我在用 onErrorResumeNext() 做一些愚蠢的事情,或者整个方法可能是完全错误的。

非常感谢任何帮助。

observable 中错误的概念与常规函数中的相同。这意味着如果您在常规函数中抛出错误 - 函数将不会 return 任何东西。 observables 也是如此——如果 observable 发出一个错误,这意味着流已经完成并且没有更多的值出现。所以是的,这是根本错误的。

更好(正确)的方法是拥有一个响应流,其中下一个值可以是成功响应或错误响应。如果您需要将它们分开,您可以稍后将响应流拆分为两个 successful/error 响应。

希望对您有所帮助。

如果您阅读 onErrorResumeNext

的文档

Continues an observable sequence that is terminated normally or by an exception with the next observable sequence or Promise.

这意味着当您的源可观察对象遇到错误时,它将切换到您传递给 onErrorResumeNext 的任何内容。这里发生的是 Rx.of(...) 在发出它的值后立即终止。因此,您观察到的行为。

所以简而言之,你不想onErrorResumeNext在这里。

您可以改为 .catch(...) 可能会发出错误的流。所以,像 :

subject
    .concatMap(obs => obs.catch(Rx.Observable.of('error')))
    .subscribe(...)