RxJS 5 任务队列,如果任务失败则继续
RxJS 5 task queue, continue if a task fails
假设我们有一个 HTML 页面触发 AJAX 请求。我们要确保 AJAX 请求按顺序执行。在前一个请求完成或出错之前,下一个 AJAX 请求不会被触发。
我尝试使用 RxJS concatMap
通过任务队列对此进行建模。每个 AJAX 请求都被建模为 Observable
。如果 AJAX 请求成功完成,则一切正常,但如果出错,则不会执行队列中的下一个任务。
这是一个示例,它使用 setTimeout()
来模拟长 运行ning 异步任务:
function identity(observable) {
return observable;
}
function createTaskQueue() {
var subject= new Rx.Subject();
subject
.concatMap(identity)
.onErrorResumeNext(Rx.Observable.of('error'))
.subscribe(function(data) {
console.log('onNext', data);
},
function(error) {
console.log('onError', error);
});
return {
addTask: function(task) {
subject.next(task);
}
}
}
function createTask(data, delay) {
return Rx.Observable.create(function(obs) {
setTimeout(function() {
obs.next(data);
obs.complete();
}, delay);
});
}
function createErrorTask(data, delay) {
return Rx.Observable.create(function(obs) {
setTimeout(function() {
obs.error('Error: ' + data);
obs.complete();
}, delay);
});
}
var taskQueue = createTaskQueue();
taskQueue.addTask(createTask(11, 500))
taskQueue.addTask(createTask(22, 200));
taskQueue.addTask(createErrorTask(33, 1000));
taskQueue.addTask(createTask(44, 300));
taskQueue.addTask(createErrorTask(55, 300));
taskQueue.addTask(createTask(66, 300));
这是一个可执行示例:https://jsfiddle.net/artur_ciocanu/s6ftxwnf/。
当我 运行 这段代码时,控制台会打印以下内容:
onNext 11
onNext 22
onNext error
这是预期的,但我想知道为什么其他任务如 44、55 等没有执行。
我很确定我在用 onErrorResumeNext()
做一些愚蠢的事情,或者整个方法可能是完全错误的。
非常感谢任何帮助。
observable 中错误的概念与常规函数中的相同。这意味着如果您在常规函数中抛出错误 - 函数将不会 return 任何东西。 observables 也是如此——如果 observable 发出一个错误,这意味着流已经完成并且没有更多的值出现。所以是的,这是根本错误的。
更好(正确)的方法是拥有一个响应流,其中下一个值可以是成功响应或错误响应。如果您需要将它们分开,您可以稍后将响应流拆分为两个 successful/error 响应。
希望对您有所帮助。
如果您阅读 onErrorResumeNext
、
的文档
Continues an observable sequence that is terminated normally or by an
exception with the next observable sequence or Promise.
这意味着当您的源可观察对象遇到错误时,它将切换到您传递给 onErrorResumeNext
的任何内容。这里发生的是 Rx.of(...)
在发出它的值后立即终止。因此,您观察到的行为。
所以简而言之,你不想onErrorResumeNext
在这里。
您可以改为 .catch(...)
可能会发出错误的流。所以,像 :
subject
.concatMap(obs => obs.catch(Rx.Observable.of('error')))
.subscribe(...)
假设我们有一个 HTML 页面触发 AJAX 请求。我们要确保 AJAX 请求按顺序执行。在前一个请求完成或出错之前,下一个 AJAX 请求不会被触发。
我尝试使用 RxJS concatMap
通过任务队列对此进行建模。每个 AJAX 请求都被建模为 Observable
。如果 AJAX 请求成功完成,则一切正常,但如果出错,则不会执行队列中的下一个任务。
这是一个示例,它使用 setTimeout()
来模拟长 运行ning 异步任务:
function identity(observable) {
return observable;
}
function createTaskQueue() {
var subject= new Rx.Subject();
subject
.concatMap(identity)
.onErrorResumeNext(Rx.Observable.of('error'))
.subscribe(function(data) {
console.log('onNext', data);
},
function(error) {
console.log('onError', error);
});
return {
addTask: function(task) {
subject.next(task);
}
}
}
function createTask(data, delay) {
return Rx.Observable.create(function(obs) {
setTimeout(function() {
obs.next(data);
obs.complete();
}, delay);
});
}
function createErrorTask(data, delay) {
return Rx.Observable.create(function(obs) {
setTimeout(function() {
obs.error('Error: ' + data);
obs.complete();
}, delay);
});
}
var taskQueue = createTaskQueue();
taskQueue.addTask(createTask(11, 500))
taskQueue.addTask(createTask(22, 200));
taskQueue.addTask(createErrorTask(33, 1000));
taskQueue.addTask(createTask(44, 300));
taskQueue.addTask(createErrorTask(55, 300));
taskQueue.addTask(createTask(66, 300));
这是一个可执行示例:https://jsfiddle.net/artur_ciocanu/s6ftxwnf/。
当我 运行 这段代码时,控制台会打印以下内容:
onNext 11
onNext 22
onNext error
这是预期的,但我想知道为什么其他任务如 44、55 等没有执行。
我很确定我在用 onErrorResumeNext()
做一些愚蠢的事情,或者整个方法可能是完全错误的。
非常感谢任何帮助。
observable 中错误的概念与常规函数中的相同。这意味着如果您在常规函数中抛出错误 - 函数将不会 return 任何东西。 observables 也是如此——如果 observable 发出一个错误,这意味着流已经完成并且没有更多的值出现。所以是的,这是根本错误的。
更好(正确)的方法是拥有一个响应流,其中下一个值可以是成功响应或错误响应。如果您需要将它们分开,您可以稍后将响应流拆分为两个 successful/error 响应。
希望对您有所帮助。
如果您阅读 onErrorResumeNext
、
Continues an observable sequence that is terminated normally or by an exception with the next observable sequence or Promise.
这意味着当您的源可观察对象遇到错误时,它将切换到您传递给 onErrorResumeNext
的任何内容。这里发生的是 Rx.of(...)
在发出它的值后立即终止。因此,您观察到的行为。
所以简而言之,你不想onErrorResumeNext
在这里。
您可以改为 .catch(...)
可能会发出错误的流。所以,像 :
subject
.concatMap(obs => obs.catch(Rx.Observable.of('error')))
.subscribe(...)