RXJS 错误后继续 concat 订阅
RXJS Continue with concat subscribe after error
我有一组需要按顺序触发的可观察对象。一旦发生错误,我需要捕获错误,记录错误并继续观察。
目前,一旦出现错误,观察者就会停止。观察者必须继续并且不会在出错时重新启动或完成。
import * as Rx from "rxjs";
const source = [
Rx.Observable.from("1").delay(200),
Rx.Observable.from("2").delay(150),
Rx.Observable.throw("error"),
Rx.Observable.from("3").delay(124),
Rx.Observable.from("4").delay(201),
];
let sSource = Rx.Observable.concat(...source);
sSource.subscribe((v) => {console.log(v)}, (e) => {console.log(e)});
当前输出:
1
2
error
预期输出:
1
2
error
3
4
我们能想出的唯一解决方案是预循环 source
observables 并分别向它们添加 catch 处理程序,然后一旦发生错误,它就会得到正确处理并且观察者可以继续而无需完成整个串联的可观察对象。
我们觉得应该有一个更优雅的解决方案。如果需要,我会 post 我们目前拥有的解决方案。
您可以将 catch
运算符应用于每个源可观察对象,并可以在其中执行错误记录。像这样:
const sources = [
Rx.Observable.from("1").delay(200),
Rx.Observable.from("2").delay(150),
Rx.Observable.throw("error"),
Rx.Observable.from("3").delay(124),
Rx.Observable.from("4").delay(201),
];
const sourcesWithCatch = sources.map(s => s.catch(e => {
console.log(e);
return Rx.Observable.empty();
}));
const concatted = Rx.Observable.concat(...sourcesWithCatch);
concatted.subscribe(v => console.log(v));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
为了将来参考,Rxjs 5 具有 onErrorResumeNext
函数,其行为类似于 Visual Basic On Error Resume Next
语句。
这是来自 documentation
的示例
var source = Rx.Observable.onErrorResumeNext(
Rx.Observable.just(42),
Rx.Observable.throw(new Error()),
Rx.Observable.just(56),
Rx.Observable.throw(new Error()),
Rx.Observable.just(78)
);
var subscription = source.subscribe(
data => console.log(data)
);
// => 42
// => 56
// => 78
我有一组需要按顺序触发的可观察对象。一旦发生错误,我需要捕获错误,记录错误并继续观察。
目前,一旦出现错误,观察者就会停止。观察者必须继续并且不会在出错时重新启动或完成。
import * as Rx from "rxjs";
const source = [
Rx.Observable.from("1").delay(200),
Rx.Observable.from("2").delay(150),
Rx.Observable.throw("error"),
Rx.Observable.from("3").delay(124),
Rx.Observable.from("4").delay(201),
];
let sSource = Rx.Observable.concat(...source);
sSource.subscribe((v) => {console.log(v)}, (e) => {console.log(e)});
当前输出:
1
2
error
预期输出:
1
2
error
3
4
我们能想出的唯一解决方案是预循环 source
observables 并分别向它们添加 catch 处理程序,然后一旦发生错误,它就会得到正确处理并且观察者可以继续而无需完成整个串联的可观察对象。
我们觉得应该有一个更优雅的解决方案。如果需要,我会 post 我们目前拥有的解决方案。
您可以将 catch
运算符应用于每个源可观察对象,并可以在其中执行错误记录。像这样:
const sources = [
Rx.Observable.from("1").delay(200),
Rx.Observable.from("2").delay(150),
Rx.Observable.throw("error"),
Rx.Observable.from("3").delay(124),
Rx.Observable.from("4").delay(201),
];
const sourcesWithCatch = sources.map(s => s.catch(e => {
console.log(e);
return Rx.Observable.empty();
}));
const concatted = Rx.Observable.concat(...sourcesWithCatch);
concatted.subscribe(v => console.log(v));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
为了将来参考,Rxjs 5 具有 onErrorResumeNext
函数,其行为类似于 Visual Basic On Error Resume Next
语句。
这是来自 documentation
的示例var source = Rx.Observable.onErrorResumeNext(
Rx.Observable.just(42),
Rx.Observable.throw(new Error()),
Rx.Observable.just(56),
Rx.Observable.throw(new Error()),
Rx.Observable.just(78)
);
var subscription = source.subscribe(
data => console.log(data)
);
// => 42
// => 56
// => 78