rxJs Subject,订阅错误会杀死整个流
rxJs Subject, error in subscriptions kills the whole stream
我知道这是此处描述的已知行为
基本问题在上面的link中描述,但这里是相关代码(摘自link)
// This is going to behave strangely
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
订阅中的错误将终止整个源流。
Observable 的解决方案是使用 .observeOn(Rx.Scheduler.asap);
我对整个反应式编程还很陌生,我很难将这个解决方案应用到我的 Subject
因为主题不支持 observeOn
.
但我需要一个 Subject
因为我需要将新的价值推向 steam。
如何解决此问题或将 observeOn
与 Subject
一起使用?
observeOn
returns 一个 Observable
。但我很难将 observeOn
与我的 Subject
结合起来。
如何使用 observeOn 并仍然能够向我的主题推送值?
这是当前代码(简体)
export class MyClass{
private messages: Subject<Message> = new Subject<Message>();
dispatchMessage(message: Message) {
this.messages.next(message);
}
想法?
P.S.
对于任何使用 angular 的人(比如我),observeOn 可能会产生一些不良副作用。
https://github.com/angular/angular/issues/14316
作为回答此问题的任何人的附加信息。
在这种情况下,您只需要在附加 observeOn
运算符后单独引用 Subject
以及对链的引用:
const subject$ = new Subject();
const obs$ = subject$.observeOn(...);
obs$.subscribe(...);
subject$.next(...);
我知道这是此处描述的已知行为
基本问题在上面的link中描述,但这里是相关代码(摘自link)
// This is going to behave strangely
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
订阅中的错误将终止整个源流。
Observable 的解决方案是使用 .observeOn(Rx.Scheduler.asap);
我对整个反应式编程还很陌生,我很难将这个解决方案应用到我的 Subject
因为主题不支持 observeOn
.
但我需要一个 Subject
因为我需要将新的价值推向 steam。
如何解决此问题或将 observeOn
与 Subject
一起使用?
observeOn
returns 一个 Observable
。但我很难将 observeOn
与我的 Subject
结合起来。
如何使用 observeOn 并仍然能够向我的主题推送值?
这是当前代码(简体)
export class MyClass{
private messages: Subject<Message> = new Subject<Message>();
dispatchMessage(message: Message) {
this.messages.next(message);
}
想法?
P.S.
对于任何使用 angular 的人(比如我),observeOn 可能会产生一些不良副作用。 https://github.com/angular/angular/issues/14316
作为回答此问题的任何人的附加信息。
在这种情况下,您只需要在附加 observeOn
运算符后单独引用 Subject
以及对链的引用:
const subject$ = new Subject();
const obs$ = subject$.observeOn(...);
obs$.subscribe(...);
subject$.next(...);