为什么 Rxjs 会在出错时退订?

Why does Rxjs unsubscribe on error?

简而言之: 如何在流中发生错误后继续收听而不在每个 .subscribe?

之前放置 .catch

如果您需要更多详细信息,请访问此处:

假设我有一个当前用户的主题或 null。我有时从 API 获取数据并发送给主题。它会相应地更新视图。 但是在某些时候我的服务器上发生了错误,我希望我的应用程序继续像以前一样工作,但通知一些地方有关错误并继续收听我的主题。

最初我认为如果我只是做 userSubject.error(...) 它只会触发 .catch 回调和 error 订阅处理程序并跳过所有成功处理程序和链。 如果在我调用 userSubject.next(...) 之后,我的所有连锁店和订阅者都将像以前一样工作

但不幸的是,情况并非如此。在第一个未捕获的 .error 之后,它取消订阅流的订阅者并且他们不再操作。

所以我的问题是:为什么??? 如果我想正常处理 null 值但也只在某些地方处理错误,该怎么办?

这里是 link 到 RxJs 的源代码,订阅者在出错时取消订阅 https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L140

Rx observables follow the grammar next*(error|complete)?,这意味着它们在 errorcomplete 通知发出后不会产生任何结果。

可以从 Rx design guidelines:

中找到为什么这很重要的解释

The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations.

A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences.

简而言之,如果您希望您的观察者在发生服务器错误后继续收听主题,请不要将该错误传递给主题,而是以其他方式处理它(例如使用 catch, retry 或将错误发送给专门的主题)。

每个 Observable 发出零个或多个 next 通知和一个 errorcomplete 但不会同时发出两个。

因此,Subject 具有内部状态。

那就看你怎么建链了。例如,你可以使用 retry() 重新订阅它的源 Observable 错误。

或者当您将值传递给您的主题时,您只能发送 next 通知而忽略其他两个:

.subscribe(v => subject.next(v));

或者如果您想在用户 null 时抛出错误,您可以使用任何捕获异常并将它们作为错误通知发送的运算符。例如像这样:

.map(v => {
    if (v === null) {
        throw new Error("It's broken");
    }
    return v;
})

反正没有代码很难给出更准确的建议。