rxJs Subject,订阅错误会杀死整个流

rxJs Subject, error in subscriptions kills the whole stream

我知道这是此处描述的已知行为

基本问题在上面的link中描述,但这里是相关代码(摘自link)

// This is going to behave strangely
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"

订阅中的错误将终止整个源流。

Observable 的解决方案是使用 .observeOn(Rx.Scheduler.asap);

我对整个反应式编程还很陌生,我很难将这个解决方案应用到我的 Subject 因为主题不支持 observeOn.

但我需要一个 Subject 因为我需要将新的价值推向 steam。

如何解决此问题或将 observeOnSubject 一起使用?

observeOn returns 一个 Observable。但我很难将 observeOn 与我的 Subject 结合起来。 如何使用 observeOn 并仍然能够向我的主题推送值?

这是当前代码(简体)

export class MyClass{

    private messages: Subject<Message> = new Subject<Message>();

    dispatchMessage(message: Message) {

     this.messages.next(message);
}

想法?

P.S.

对于任何使用 angular 的人(比如我),observeOn 可能会产生一些不良副作用。 https://github.com/angular/angular/issues/14316

作为回答此问题的任何人的附加信息。

在这种情况下,您只需要在附加 observeOn 运算符后单独引用 Subject 以及对链的引用:

const subject$ = new Subject();
const obs$ = subject$.observeOn(...);

obs$.subscribe(...);
subject$.next(...);