rxjs5 合并和错误处理
rxjs5 merge and error handling
我想 combine/merge 多个 Observables,当每个都完成时执行 finally 函数。 merge
运算符似乎并行执行每个订阅,这正是我所需要的,但如果其中任何一个抛出错误,则执行将停止。
RxJS 版本 4 有一个运算符 mergeDelayError 应该保持所有订阅执行直到所有订阅都完成,但是这个运算符没有在 版本 5.
我应该改用其他运营商吗?
var source1 = Rx.Observable.of(1,2,3).delay(3000);
var source2 = Rx.Observable.throw(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6).delay(1000);
// Combine the 3 sources into 1
var source = Rx.Observable
.merge(source1, source2, source3)
.finally(() => {
// finally is executed before all
// subscriptions are completed.
console.log('finally');
});
var subscription = source.subscribe(
x => console.log('next:', x),
e => console.log('error:', e),
() => console.log('completed'));
JSBin
我认为您可以使用 catch()
模拟相同的行为。你只需要将它附加到每个源 Observable:
const sources = [source1, source2, source3].map(obs =>
obs.catch(() => Observable.empty())
);
Rx.Observable
.merge(sources)
.finally(...)
...
如果你不想吞下你的错误,而是想把它们拖到最后,你可以:
const mergeDelayErrors = [];
const sources = [source1, source2, source3].map(obs => obs.catch((error) => {
mergeDelayErrors.push(error);
return Rx.Observable.empty();
}));
return Rx.Observable
.merge(...sources)
.toArray()
.flatMap(allEmissions => {
let spreadObs = Rx.Observable.of(...allEmissions);
if (mergeDelayErrors.length) {
spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors));
}
return spreadObs;
})
您可能只想抛出第一个错误,或者创建一个 CompositeError。我不确定抛出多个错误时 mergeDelayErrors 最初的行为方式。
不幸的是,因为此实现必须等到所有可观察对象完成后才能发出错误,所以它也会等到所有可观察对象完成后再发出下一个错误。这可能不是 mergeDelayError 的原始行为,它应该作为流发出,而不是在最后发出它们。
我们可以通过收集错误并在最后发出它们来避免阻塞流。
function mergeDelayError(...sources) {
const errors = [];
const catching = sources.map(obs => obs.catch(e => {
errors.push(e);
return Rx.Observable.empty();
}));
return Rx.Observable
.merge(...catching)
.concat(Rx.Observable.defer(
() => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors)));
}
const source1 = Rx.Observable.of(1,2,3);
const source2 = Rx.Observable.throw(new Error('woops'));
const source3 = Rx.Observable.of(4,5,6);
mergeDelayError(source1, source2, source3).subscribe(
x => console.log('next:', x),
e => console.log('error:', e),
() => console.log('completed'));
我想 combine/merge 多个 Observables,当每个都完成时执行 finally 函数。 merge
运算符似乎并行执行每个订阅,这正是我所需要的,但如果其中任何一个抛出错误,则执行将停止。
RxJS 版本 4 有一个运算符 mergeDelayError 应该保持所有订阅执行直到所有订阅都完成,但是这个运算符没有在 版本 5.
我应该改用其他运营商吗?
var source1 = Rx.Observable.of(1,2,3).delay(3000);
var source2 = Rx.Observable.throw(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6).delay(1000);
// Combine the 3 sources into 1
var source = Rx.Observable
.merge(source1, source2, source3)
.finally(() => {
// finally is executed before all
// subscriptions are completed.
console.log('finally');
});
var subscription = source.subscribe(
x => console.log('next:', x),
e => console.log('error:', e),
() => console.log('completed'));
JSBin
我认为您可以使用 catch()
模拟相同的行为。你只需要将它附加到每个源 Observable:
const sources = [source1, source2, source3].map(obs =>
obs.catch(() => Observable.empty())
);
Rx.Observable
.merge(sources)
.finally(...)
...
如果你不想吞下你的错误,而是想把它们拖到最后,你可以:
const mergeDelayErrors = [];
const sources = [source1, source2, source3].map(obs => obs.catch((error) => {
mergeDelayErrors.push(error);
return Rx.Observable.empty();
}));
return Rx.Observable
.merge(...sources)
.toArray()
.flatMap(allEmissions => {
let spreadObs = Rx.Observable.of(...allEmissions);
if (mergeDelayErrors.length) {
spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors));
}
return spreadObs;
})
您可能只想抛出第一个错误,或者创建一个 CompositeError。我不确定抛出多个错误时 mergeDelayErrors 最初的行为方式。
不幸的是,因为此实现必须等到所有可观察对象完成后才能发出错误,所以它也会等到所有可观察对象完成后再发出下一个错误。这可能不是 mergeDelayError 的原始行为,它应该作为流发出,而不是在最后发出它们。
我们可以通过收集错误并在最后发出它们来避免阻塞流。
function mergeDelayError(...sources) {
const errors = [];
const catching = sources.map(obs => obs.catch(e => {
errors.push(e);
return Rx.Observable.empty();
}));
return Rx.Observable
.merge(...catching)
.concat(Rx.Observable.defer(
() => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors)));
}
const source1 = Rx.Observable.of(1,2,3);
const source2 = Rx.Observable.throw(new Error('woops'));
const source3 = Rx.Observable.of(4,5,6);
mergeDelayError(source1, source2, source3).subscribe(
x => console.log('next:', x),
e => console.log('error:', e),
() => console.log('completed'));