subscribe 方法中的 Observable unsubscribe

Observable unsubscribe inside subscribe method

我尝试在订阅方法中取消订阅。好像还行,网上没找到这样的例子。

我知道还有很多其他的方法可以取消订阅方法或者用管道限制它。请不要建议任何其他解决方案,但请回答为什么你不应该这样做或者这是一种可能的方法吗?

示例:

let localSubscription = someObservable.subscribe(result => {
  this.result = result;
  if (localSubscription && someStatement) {
    localSubscription.unsubscribe();
  }
});

是工作方式,但是RxJS主要推荐在Angular中使用async管道。这是完美的解决方案。在您的示例中,您将 result 分配给对象 属性,这不是一个好的做法。

如果您在模板中使用变量,则只需使用 async 管道。如果你不这样做,就让它以这种方式被观察到:

private readonly result$ = someObservable.pipe(/...get exactly what you need here.../)

然后您可以在需要时使用结果$:在其他可观察对象或模板中。

您也可以使用 pipe(take(1))pipe(first()) 取消订阅。还有一些其他的管道方法允许您取消订阅而无需额外的代码。

有多种取消订阅数据的方法:

Method 1: Unsubscribe after subscription; (Not preferred)

let localSubscription = someObservable.subscribe(result => {
  this.result = result;
}).unsubscribe();
---------------------
Method 2: If you want only first one or 2 values, use take operator or first operator

a) let localSubscription = 
  someObservable.pipe(take(1)).subscribe(result => {
     this.result = result;
  });

b) let localSubscription = 
  someObservable.pipe(first()).subscribe(result => {
     this.result = result;
  });
---------------------
Method 3: Use Subscription and unsubscribe in your ngOnDestroy();

let localSubscription = 
  someObservable.subscribe(result => {
     this.result = result;
  });

ngOnDestroy() { this.localSubscription.unsubscribe() }
----------------------

Method 4: Use Subject and takeUntil Operator and destroy in ngOnDestroy

let destroySubject: Subject<any> = new Subject();

let localSubscription = 
  someObservable.pipe(takeUntil(this.destroySubject)).subscribe(result => {
     this.result = result;
  });

ngOnDestroy() { 
  this.destroySubject.next();
  this.destroySubject.complete();
}

我个人更喜欢方法4,因为如果你在一个页面中,你可以对多个订阅使用同一个销毁主题。

问题

有时您在上面使用的模式会起作用,有时则不会。这里有两个例子,你可以自己尝试运行。一个会抛出错误,另一个不会。

const subscription = of(1,2,3,4,5).pipe(
  tap(console.log)
).subscribe(v => {
  if(v === 4) subscription.unsubscribe();
});

输出:

1
2
3
4
Error: Cannot access 'subscription' before initialization

类似的东西:

const subscription = of(1,2,3,4,5).pipe(
  tap(console.log),
  delay(0)
).subscribe(v => {
  if (v === 4) subscription.unsubscribe();
});

输出:

1
2
3
4

这次你没有收到错误,但你也在 5 从源可观察对象发出之前取消订阅 of(1,2,3,4,5)

隐藏的约束

如果您熟悉 RxJS 中的调度程序,您可能会立即发现允许一个示例工作而另一个示例不工作的额外隐藏信息。

delay(即使延迟0毫秒)returns一个使用异步调度器的Observable。这实际上意味着当前代码块将在延迟的可观察对象有机会发出之前完成执行。

这保证在单线程环境中(例如 Javascript 运行 当前在浏览器中找到的时间)您的订阅已经初始化。

解决方案

1。保持脆弱的代码库

一个可能的解决方案是忽略常识并继续使用此模式取消订阅。为此,您和您团队中可能使用您的代码作为参考或有朝一日可能需要维护您的代码的任何人都必须承担额外的认知负担,即记住哪个 Observable 使用了正确的调度程序。

在应用程序的一部分中更改 observable 转换数据的方式可能会导致依赖于异步调度程序提供的此数据的应用程序的每个部分出现意外错误。

例如:在查询服务器时 运行 正常的代码可能会在同步返回兑现结果时中断。看似优化的东西现在会对您的代码库造成严重破坏。出现这种错误时,源头就很难追查了。

最后,如果浏览器(或者您是 Node.js 中的 运行ning 代码)开始支持多线程环境,您的代码将不得不在没有增强功能的情况下凑合使用,或者重写。

2。使“取消订阅内部订阅回调”成为一种安全模式

惯用的 RxJS 代码尽可能地与调度无关。

以下是您可以如何使用上述模式而不必担心可观察对象正在使用哪个调度程序。这实际上与调度程序无关,尽管它可能使一个相当简单的任务变得比它需要的复杂得多。

const stream = publish()(of(1,2,3,4,5));

const subscription = stream.pipe(
  tap(console.log)
).subscribe(x => {
  if(x === 4) subscription.unsubscribe();
});

stream.connect();

这让您可以安全地使用“在订阅中取消订阅”模式。无论调度程序如何,这都将始终有效,并且如果(例如)您将代码置于多线程环境中(上面的 delay 示例可能会中断,但不会),它将继续工作。

3。 RxJS 运算符

最好的解决方案是使用代表您处理 subscription/unsubscription 的运算符。他们在最好的情况下不需要额外的认知负担,并且在更奇特的情况下能够相对较好地 contain/manage 错误(远距离的怪异动作较少)。

大多数高阶运算符都这样做(concatmergeconcatMapswitchMapmergeMap、等)。 taketakeUntiltakeWhile 等其他运算符可让您使用更具声明性的样式来管理订阅。

在可能的情况下,这些是更可取的,因为它们都不太可能在使用它们的团队中引起奇怪的错误或混乱。

重写上面的例子:

of(1,2,3,4,5).pipe(
  tap(console.log)
  first(v => v === 4)
).subscribe();