Angular/RxJS 我应该什么时候退订“订阅”

Angular/RxJS When should I unsubscribe from `Subscription`

我应该在 ngOnDestroy 生命周期中什么时候存储 Subscription 实例并调用 unsubscribe() 以及什么时候可以简单地忽略它们?

保存所有订阅会给组件代码带来很多混乱。

HTTP Client Guide 忽略这样的订阅:

getHeroes() {
  this.heroService.getHeroes()
                  .subscribe(
                     heroes => this.heroes = heroes,
                     error =>  this.errorMessage = <any>error);
}

同时 Route & Navigation Guide 说:

Eventually, we'll navigate somewhere else. The router will remove this component from the DOM and destroy it. We need to clean up after ourselves before that happens. Specifically, we must unsubscribe before Angular destroys the component. Failure to do so could create a memory leak.

We unsubscribe from our Observable in the ngOnDestroy method.

private sub: any;

ngOnInit() {
  this.sub = this.route.params.subscribe(params => {
     let id = +params['id']; // (+) converts string 'id' to a number
     this.service.getHero(id).then(hero => this.hero = hero);
   });
}

ngOnDestroy() {
  this.sub.unsubscribe();
}

Angular2 官方文档提供了何时退订以及何时可以安全忽略的解释。看看这个 link:

https://angular.io/docs/ts/latest/cookbook/component-communication.html#!#bidirectional-service

查找标题为 Parent 和 children 通过服务进行通信的段落,然后是蓝色框:

Notice that we capture the subscription and unsubscribe when the AstronautComponent is destroyed. This is a memory-leak guard step. There is no actual risk in this app because the lifetime of a AstronautComponent is the same as the lifetime of the app itself. That would not always be true in a more complex application.

We do not add this guard to the MissionControlComponent because, as the parent, it controls the lifetime of the MissionService.

希望对您有所帮助。

视情况而定。如果通过调用 someObservable.subscribe(),您开始占用一些必须在组件生命周期结束时手动释放的资源,那么您应该调用 theSubscription.unsubscribe() 以防止内存泄漏。

让我们仔细看看你的例子:

getHero() returns http.get() 的结果。如果您查看 angular 2 source codehttp.get() 会创建两个事件侦听器:

_xhr.addEventListener('load', onLoad);
_xhr.addEventListener('error', onError);

并通过调用 unsubscribe(),您可以取消请求以及监听器:

_xhr.removeEventListener('load', onLoad);
_xhr.removeEventListener('error', onError);
_xhr.abort();

请注意,_xhr 是特定于平台的,但我认为在您的情况下可以安全地假设它是 XMLHttpRequest()

通常,这足以证明需要手动 unsubscribe() 调用。但是根据此 WHATWG specXMLHttpRequest() 一旦成为 "done" 就会受到垃圾回收,即使有事件侦听器附加到它也是如此。所以我想这就是为什么 angular 2 官方指南省略 unsubscribe() 并让 GC 清理监听器的原因。

至于你的第二个例子,这取决于params的实现。从今天开始,angular 官方指南不再显示取消订阅 params。我调查了 src again and found that params is a just a BehaviorSubject。由于没有使用事件侦听器或计时器,也没有创建全局变量,因此省略 unsubscribe().

应该是安全的

你的问题的底线是总是调用 unsubscribe() 来防止内存泄漏,除非你确定可观察对象的执行不会创建全局变量、添加事件侦听器、设置计时器,或执行任何其他导致内存泄漏的操作。

如有疑问,请查看该可观察对象的实现。如果 observable 已经在其 unsubscribe() 中写入了一些清理逻辑,这通常是构造函数返回的函数,那么您有充分的理由认真考虑调用 unsubscribe().

TL;DR

对于这个问题,有两种 Observables - finite 值和 infinite 值。

http Observables 产生 finite (1) 值和类似 DOM 事件监听器 Observable 产生 infinite 值。

如果您手动调用 subscribe(不使用异步管道),那么 unsubscribe 来自 infinite Observables。

不用担心 有限 个,RxJs 会处理它们。


来源:

  1. 我在 Angular 的 Gitter here.

    中找到了 Rob Wormald 的回答

    他说(为了清楚起见,我重新组织了重点,重点是我的):

    if its a single-value-sequence (like an http request) the manual cleanup is unnecessary (assuming you subscribe in the controller manually)

    i should say "if its a sequence that completes" (of which single value sequences, a la http, are one)

    if its an infinite sequence, you should unsubscribe which the async pipe does for you

    他还在 this YouTube video 关于 Observables 中提到 “他们自己清理......” 在 Observables 的上下文中 complete(就像 Promises,它总是完成,因为它们总是产生一个值并结束——我们从不担心取消订阅 Promises 以确保它们清理 XHR 事件监听器,对吧?)

  2. 也在Rangle guide to Angular 2中显示

    In most cases we will not need to explicitly call the unsubscribe method unless we want to cancel early or our Observable has a longer lifespan than our subscription. The default behavior of Observable operators is to dispose of the subscription as soon as .complete() or .error() messages are published. Keep in mind that RxJS was designed to be used in a "fire and forget" fashion most of the time.

    短语 “我们的 Observable 的生命周期比我们的订阅更长” 何时适用?

    当在 Observable 完成之前(或未 'long' 之前)销毁的组件内创建订阅时适用。

    我认为这意味着如果我们订阅了一个 http 请求或一个发出 10 个值的 Observable 并且我们的组件在那个 http 请求 return 或 10值已发出,我们仍然可以!

    当请求执行 return 或最终发出第 10 个值时,Observable 将完成并清理所有资源。

  3. 如果我们从同一个 Rangle 指南中查看 this example,我们可以看到订阅 route.params 确实需要 unsubscribe(),因为我们不知道当那些 params 将停止更改(发出新值)时。

    组件可能会因导航离开而被销毁,在这种情况下,路由参数可能仍会发生变化(它们在技术上可能会发生变化,直到应用程序结束)并且订阅中分配的资源仍将被分配,因为还没有完成.

  4. 自 2016 年 11 月起 this video from NgEurope Rob Wormald also says you do not need to unsubscribe from Router Observables. He also mentions the http service and ActivatedRoute.params in this video

  5. Angular 教程,the Routing chapter 现在声明如下:

    The Router manages the observables it provides and localizes the subscriptions. The subscriptions are cleaned up when the component is destroyed, protecting against memory leaks, so we don't need to unsubscribe from the route params Observable.

    这里有一个 discussion 关于 GitHub 问题的 Angular 文档关于 Router Observables 的文档,其中 Ward Bell 提到正在对所有这些进行澄清。


我在 NGConf 上与 Ward Bell 讨论了这个问题(我什至向他展示了这个答案,他说是正确的)但他告诉我 Angular 的文档团队对这个问题有一个未发布的解决方案(尽管他们正在努力获得批准)。他还告诉我,我可以用即将发布的官方推荐来更新我的 SO 答案。

我们今后都应该使用的解决方案是向所有组件添加一个 private ngUnsubscribe = new Subject<void>(); 字段,这些组件在其 class 代码中对 Observables 进行了 .subscribe() 调用。

然后我们在 ngOnDestroy() 方法中调用 this.ngUnsubscribe.next(); this.ngUnsubscribe.complete();

秘诀(正如 已经指出的那样)是在我们的每个 .subscribe() 调用之前调用 takeUntil(this.ngUnsubscribe),这将保证在组件运行时清除所有订阅销毁。

示例:

import { Component, OnDestroy, OnInit } from '@angular/core';
// RxJs 6.x+ import paths
import { filter, startWith, takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';
import { BookService } from '../books.service';

@Component({
    selector: 'app-books',
    templateUrl: './books.component.html'
})
export class BooksComponent implements OnDestroy, OnInit {
    private ngUnsubscribe = new Subject<void>();

    constructor(private booksService: BookService) { }

    ngOnInit() {
        this.booksService.getBooks()
            .pipe(
               startWith([]),
               filter(books => books.length > 0),
               takeUntil(this.ngUnsubscribe)
            )
            .subscribe(books => console.log(books));

        this.booksService.getArchivedBooks()
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe(archivedBooks => console.log(archivedBooks));
    }

    ngOnDestroy() {
        this.ngUnsubscribe.next();
        this.ngUnsubscribe.complete();
    }
}

注意:重要的是添加 takeUntil 运算符作为最后一个运算符,以防止运算符链中的中间 Observables 泄漏。


最近,在 Adventures in Angular 的一集中,Ben Lesh 和 Ward Bell 讨论了围绕 how/when 取消订阅组件的问题。讨论大约从 1:05:30.

开始

Ward 提到 “现在有一个糟糕的 takeUntil 舞蹈需要很多机器” 并且 Shai Reznik 提到 “Angular 处理了一些http 和路由等订阅.

作为回应,Ben 提到现在正在讨论允许 Observables 挂钩到 Angular 组件生命周期事件,Ward 建议组件可以订阅生命周期事件的 Observable 作为了解何时的一种方式完成作为组件内部状态维护的 Observables。

也就是说,我们现在最需要的是解决方案,所以这里有一些其他资源。

  1. RxJs 核心团队成员 Nicholas Jamieson 对 takeUntil() 模式的建议和帮助实施它的 TSLint 规则:https://ncjamieson.com/avoiding-takeuntil-leaks/

  2. 轻量级 npm 包,公开一个 Observable 操作符,该操作符将组件实例 (this) 作为参数并在 ngOnDestroy 期间自动取消订阅:https://github.com/NetanelBasal/ngx-take-until-destroy

  3. 如果您不进行 AOT 构建(但我们现在都应该进行 AOT),则上述的另一种变体具有更好的人体工程学:https://github.com/smnbbrv/ngx-rx-collector

  4. 自定义指令*ngSubscribe,它的工作方式类似于异步管道,但在您的模板中创建了一个嵌入式视图,因此您可以在整个模板中引用 'unwrapped' 值:https://netbasal.com/diy-subscription-handling-directive-in-angular-c8f6e762697f

我在 Nicholas 博客的评论中提到,过度使用 takeUntil() 可能表明您的组件试图做太多事情,并且将现有组件分离到 Feature应考虑 Presentational 组件。然后,您可以将 Feature 组件中的 Observable | async 转换为 Presentational 组件的 Input,这意味着任何地方都不需要订阅。阅读有关此方法的更多信息 here

您不需要一堆订阅和手动退订。使用 Subject and takeUntil 组合来像老板一样处理订阅:

import { Subject } from "rxjs"
import { takeUntil } from "rxjs/operators"

@Component({
  moduleId: __moduleName,
  selector: "my-view",
  templateUrl: "../views/view-route.view.html"
})
export class ViewRouteComponent implements OnInit, OnDestroy {
  componentDestroyed$: Subject<boolean> = new Subject()

  constructor(private titleService: TitleService) {}

  ngOnInit() {
    this.titleService.emitter1$
      .pipe(takeUntil(this.componentDestroyed$))
      .subscribe((data: any) => { /* ... do something 1 */ })

    this.titleService.emitter2$
      .pipe(takeUntil(this.componentDestroyed$))
      .subscribe((data: any) => { /* ... do something 2 */ })

    //...

    this.titleService.emitterN$
      .pipe(takeUntil(this.componentDestroyed$))
      .subscribe((data: any) => { /* ... do something N */ })
  }

  ngOnDestroy() {
    this.componentDestroyed$.next(true)
    this.componentDestroyed$.complete()
  }
}

替代方法,已提出 , uses takeWhile instead of takeUntil. You may prefer it, but mind that this way your Observable execution will not be cancelled on ngDestroy of your component (e.g. when you make time consuming calculations or wait for data from server). Method, which is based on takeUntil, doesn't have this drawback and leads to immediate cancellation of request.

代码如下:

@Component({
  moduleId: __moduleName,
  selector: "my-view",
  templateUrl: "../views/view-route.view.html"
})
export class ViewRouteComponent implements OnInit, OnDestroy {
  alive: boolean = true

  constructor(private titleService: TitleService) {}

  ngOnInit() {
    this.titleService.emitter1$
      .pipe(takeWhile(() => this.alive))
      .subscribe((data: any) => { /* ... do something 1 */ })

    this.titleService.emitter2$
      .pipe(takeWhile(() => this.alive))
      .subscribe((data: any) => { /* ... do something 2 */ })

    // ...

    this.titleService.emitterN$
      .pipe(takeWhile(() => this.alive))
      .subscribe((data: any) => { /* ... do something N */ })
  }

  ngOnDestroy() {
    this.alive = false
  }
}

我尝试了 seangwright 的解决方案(编辑 3)

这不适用于由计时器或间隔创建的 Observable。

但是,我使用另一种方法让它工作:

import { Component, OnDestroy, OnInit } from '@angular/core';
import 'rxjs/add/operator/takeUntil';
import { Subject } from 'rxjs/Subject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/Rx';

import { MyThingService } from '../my-thing.service';

@Component({
   selector: 'my-thing',
   templateUrl: './my-thing.component.html'
})
export class MyThingComponent implements OnDestroy, OnInit {
   private subscriptions: Array<Subscription> = [];

  constructor(
     private myThingService: MyThingService,
   ) { }

  ngOnInit() {
    const newSubs = this.myThingService.getThings()
        .subscribe(things => console.log(things));
    this.subscriptions.push(newSubs);
  }

  ngOnDestroy() {
    for (const subs of this.subscriptions) {
      subs.unsubscribe();
   }
 }
}

基于:Using Class inheritance to hook to Angular 2 component lifecycle

另一种通用方法:

export abstract class UnsubscribeOnDestroy implements OnDestroy {
  protected d$: Subject<any>;

  constructor() {
    this.d$ = new Subject<void>();

    const f = this.ngOnDestroy;
    this.ngOnDestroy = () => {
      f();
      this.d$.next();
      this.d$.complete();
    };
  }

  public ngOnDestroy() {
    // no-op
  }

}

并使用:

@Component({
    selector: 'my-comp',
    template: ``
})
export class RsvpFormSaveComponent extends UnsubscribeOnDestroy implements OnInit {

    constructor() {
        super();
    }

    ngOnInit(): void {
      Observable.of('bla')
      .takeUntil(this.d$)
      .subscribe(val => console.log(val));
    }
}

由于 seangwright 的解决方案(编辑 3)似乎非常有用,我也发现很难将此功能打包到基础组件中,并提示其他项目队友记得在 ngOnDestroy 上调用 super() 以激活此功能.

这个答案提供了一种摆脱超级调用的方法,并使"componentDestroyed$"成为基本组件的核心。

class BaseClass {
    protected componentDestroyed$: Subject<void> = new Subject<void>();
    constructor() {

        /// wrap the ngOnDestroy to be an Observable. and set free from calling super() on ngOnDestroy.
        let _$ = this.ngOnDestroy;
        this.ngOnDestroy = () => {
            this.componentDestroyed$.next();
            this.componentDestroyed$.complete();
            _$();
        }
    }

    /// placeholder of ngOnDestroy. no need to do super() call of extended class.
    ngOnDestroy() {}
}

然后您可以自由使用此功能,例如:

@Component({
    selector: 'my-thing',
    templateUrl: './my-thing.component.html'
})
export class MyThingComponent extends BaseClass implements OnInit, OnDestroy {
    constructor(
        private myThingService: MyThingService,
    ) { super(); }

    ngOnInit() {
        this.myThingService.getThings()
            .takeUntil(this.componentDestroyed$)
            .subscribe(things => console.log(things));
    }

    /// optional. not a requirement to implement OnDestroy
    ngOnDestroy() {
        console.log('everything works as intended with or without super call');
    }

}

我喜欢最后两个答案,但如果子类在 ngOnDestroy 中引用 "this",我会遇到问题。

我把它修改成这样,看起来它解决了那个问题。

export abstract class BaseComponent implements OnDestroy {
    protected componentDestroyed$: Subject<boolean>;
    constructor() {
        this.componentDestroyed$ = new Subject<boolean>();
        let f = this.ngOnDestroy;
        this.ngOnDestroy = function()  {
            // without this I was getting an error if the subclass had
            // this.blah() in ngOnDestroy
            f.bind(this)();
            this.componentDestroyed$.next(true);
            this.componentDestroyed$.complete();
        };
    }
    /// placeholder of ngOnDestroy. no need to do super() call of extended class.
    ngOnDestroy() {}
}

订阅 class 有一个有趣的特点:

Represents a disposable resource, such as the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription.
Additionally, subscriptions may be grouped together through the add() method, which will attach a child Subscription to the current Subscription. When a Subscription is unsubscribed, all its children (and its grandchildren) will be unsubscribed as well.

您可以创建一个聚合订阅对象来对您的所有订阅进行分组。 为此,您可以创建一个空订阅并使用其 add() 方法向其添加订阅。当你的组件被销毁时,你只需要取消订阅聚合订阅即可。

@Component({ ... })
export class SmartComponent implements OnInit, OnDestroy {
  private subscriptions = new Subscription();

  constructor(private heroService: HeroService) {
  }

  ngOnInit() {
    this.subscriptions.add(this.heroService.getHeroes().subscribe(heroes => this.heroes = heroes));
    this.subscriptions.add(/* another subscription */);
    this.subscriptions.add(/* and another subscription */);
    this.subscriptions.add(/* and so on */);
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}

官方编辑 #3 答案(和变体)效果很好,但让我感兴趣的是围绕可观察订阅的业务逻辑 'muddying'。

这是使用包装器的另一种方法。

Warining: experimental code

文件 subscribeAndGuard.ts 用于创建一个新的 Observable 扩展来包装 .subscribe() 并在其中包装 ngOnDestroy().
用法与 .subscribe() 相同,除了引用组件的附加第一个参数。

import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';

const subscribeAndGuard = function(component, fnData, fnError = null, fnComplete = null) {

  // Define the subscription
  const sub: Subscription = this.subscribe(fnData, fnError, fnComplete);

  // Wrap component's onDestroy
  if (!component.ngOnDestroy) {
    throw new Error('To use subscribeAndGuard, the component must implement ngOnDestroy');
  }
  const saved_OnDestroy = component.ngOnDestroy;
  component.ngOnDestroy = () => {
    console.log('subscribeAndGuard.onDestroy');
    sub.unsubscribe();
    // Note: need to put original back in place
    // otherwise 'this' is undefined in component.ngOnDestroy
    component.ngOnDestroy = saved_OnDestroy;
    component.ngOnDestroy();

  };

  return sub;
};

// Create an Observable extension
Observable.prototype.subscribeAndGuard = subscribeAndGuard;

// Ref: https://www.typescriptlang.org/docs/handbook/declaration-merging.html
declare module 'rxjs/Observable' {
  interface Observable<T> {
    subscribeAndGuard: typeof subscribeAndGuard;
  }
}

这是一个有两个订阅的组件,一个有包装器,一个没有。唯一需要注意的是 必须实现 OnDestroy(如果需要,可以使用空主体),否则 Angular 不知道调用包装版本。

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import 'rxjs/Rx';
import './subscribeAndGuard';

@Component({
  selector: 'app-subscribing',
  template: '<h3>Subscribing component is active</h3>',
})
export class SubscribingComponent implements OnInit, OnDestroy {

  ngOnInit() {

    // This subscription will be terminated after onDestroy
    Observable.interval(1000)
      .subscribeAndGuard(this,
        (data) => { console.log('Guarded:', data); },
        (error) => { },
        (/*completed*/) => { }
      );

    // This subscription will continue after onDestroy
    Observable.interval(1000)
      .subscribe(
        (data) => { console.log('Unguarded:', data); },
        (error) => { },
        (/*completed*/) => { }
      );
  }

  ngOnDestroy() {
    console.log('SubscribingComponent.OnDestroy');
  }
}

一个演示插件是 here

补充说明: 重新编辑 3 - 'Official' 解决方案,这可以通过在订阅前使用 takeWhile() 而不是 takeUntil() 以及在 ngOnDestroy 中使用简单的布尔值而不是另一个 Observable 来简化。

@Component({...})
export class SubscribingComponent implements OnInit, OnDestroy {

  iAmAlive = true;
  ngOnInit() {

    Observable.interval(1000)
      .takeWhile(() => { return this.iAmAlive; })
      .subscribe((data) => { console.log(data); });
  }

  ngOnDestroy() {
    this.iAmAlive = false;
  }
}

你通常需要在组件被销毁时取消订阅,但是 Angular 会随着我们的进行越来越多地处理它,例如在 Angular4 的新次要版本中,他们有此部分用于路由退订:

Do you need to unsubscribe?

As described in the ActivatedRoute: the one-stop-shop for route information section of the Routing & Navigation page, the Router manages the observables it provides and localizes the subscriptions. The subscriptions are cleaned up when the component is destroyed, protecting against memory leaks, so you don't need to unsubscribe from the route paramMap Observable.

另外下面的例子是Angular的一个很好的例子,创建一个组件然后销毁它,看看组件如何实现OnDestroy,如果你需要onInit,你也可以在你的组件中实现它,比如实现 OnInit, OnDestroy

import { Component, Input, OnDestroy } from '@angular/core';  
import { MissionService } from './mission.service';
import { Subscription }   from 'rxjs/Subscription';

@Component({
  selector: 'my-astronaut',
  template: `
    <p>
      {{astronaut}}: <strong>{{mission}}</strong>
      <button
        (click)="confirm()"
        [disabled]="!announced || confirmed">
        Confirm
      </button>
    </p>
  `
})

export class AstronautComponent implements OnDestroy {
  @Input() astronaut: string;
  mission = '<no mission announced>';
  confirmed = false;
  announced = false;
  subscription: Subscription;

  constructor(private missionService: MissionService) {
    this.subscription = missionService.missionAnnounced$.subscribe(
      mission => {
        this.mission = mission;
        this.announced = true;
        this.confirmed = false;
    });
  }

  confirm() {
    this.confirmed = true;
    this.missionService.confirmMission(this.astronaut);
  }

  ngOnDestroy() {
    // prevent memory leak when component destroyed
    this.subscription.unsubscribe();
  }
}

根据 的回答,我写了一个摘要 class 来处理组件中 "infinite" observables 的订阅:

import { OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { PartialObserver } from 'rxjs/Observer';

export abstract class InfiniteSubscriberComponent implements OnDestroy {
  private onDestroySource: Subject<any> = new Subject();

  constructor() {}

  subscribe(observable: Observable<any>): Subscription;

  subscribe(
    observable: Observable<any>,
    observer: PartialObserver<any>
  ): Subscription;

  subscribe(
    observable: Observable<any>,
    next?: (value: any) => void,
    error?: (error: any) => void,
    complete?: () => void
  ): Subscription;

  subscribe(observable: Observable<any>, ...subscribeArgs): Subscription {
    return observable
      .takeUntil(this.onDestroySource)
      .subscribe(...subscribeArgs);
  }

  ngOnDestroy() {
    this.onDestroySource.next();
    this.onDestroySource.complete();
  }
}

要使用它,只需在您的 angular 组件中扩展它并调用 subscribe() 方法,如下所示:

this.subscribe(someObservable, data => doSomething());

它也像往常一样接受错误和完成回调,一个观察者对象,或者根本不接受回调。如果您也在子组件中实现该方法,请记住调用 super.ngOnDestroy()

在这里找到 Ben Lesh 的附加参考:RxJS: Don’t Unsubscribe

上述情况的另外一个简短补充是:

  • 始终取消订阅,当订阅流中的新值不再需要或无关紧要时,这将导致触发器数量减少并在少数情况下提高性能。订阅的组件 data/event 不再存在或需要对全新流的新订阅(刷新等)等情况是取消订阅的一个很好的例子。

有关 Angular 组件内可观察对象取消订阅的一些最佳实践:

引自Routing & Navigation

When subscribing to an observable in a component, you almost always arrange to unsubscribe when the component is destroyed.

There are a few exceptional observables where this is not necessary. The ActivatedRoute observables are among the exceptions.

The ActivatedRoute and its observables are insulated from the Router itself. The Router destroys a routed component when it is no longer needed and the injected ActivatedRoute dies with it.

Feel free to unsubscribe anyway. It is harmless and never a bad practice.

并回复以下链接:

我在 Angular 组件中收集了一些关于 observables 取消订阅的最佳实践与您分享:

  • http 可观察取消订阅是有条件的,我们应该根据具体情况考虑组件被销毁后 'subscribe callback' 成为 运行 的影响。我们知道 angular 取消订阅并清理 http 可观察对象本身 (1), (2). While this is true from the perspective of resources it only tells half the story. Let's say we're talking about directly calling http from within a component, and the http response took longer than needed so the user closed the component. The subscribe() handler will still be called even if the component is closed and destroyed. This can have unwanted side effects and in the worse scenarios leave the application state broken. It can also cause exceptions if the code in the callback tries to call something that has just been disposed of. However at the same time occasionally they are desired. Like, let's say you're creating an email client and you trigger a sound when the email is done sending - well you'd still want that to occur even if the component is closed (8).
  • 无需取消订阅已完成或出错的可观察对象。不过,这样做也无妨(7).
  • 尽可能使用AsyncPipe,因为它会在组件销毁时自动取消订阅可观察对象。
  • 取消订阅像 route.params 这样的 ActivatedRoute 可观察对象,如果它们是在嵌套(使用组件选择器添加到 tpl 中)或动态组件中订阅的,因为只要parent/host 组件存在。无需在其他情况下取消订阅,如上文引用 Routing & Navigation 文档中所述。
  • 例如,取消订阅通过 Angular 服务公开的组件之间共享的全局 observable,因为只要组件初始化,它们就可能被多次订阅。
  • 无需取消订阅应用程序范围服务的内部可观察对象,因为此服务永远不会被销毁,除非您的整个应用程序被销毁,否则没有真正的理由取消订阅它,也不存在内存泄漏的可能性。

    注意:关于作用域服务,即组件提供者,当组件被销毁时,它们也被销毁。在这种情况下,如果我们订阅此提供程序内的任何可观察对象,我们应该考虑使用 OnDestroy 生命周期挂钩取消订阅,根据文档,该挂钩将在服务被销毁时调用。
  • 使用抽象技术来避免可能因取消订阅而导致的任何代码混乱。您可以使用 takeUntil (3) or you can use this npm package mentioned at (4) The easiest way to unsubscribe from Observables in Angular.
  • 管理您的订阅
  • 始终取消订阅 FormGroup 可观察对象,例如 form.valueChangesform.statusChanges
  • 始终取消订阅 Renderer2 服务的可观察对象,例如 renderer2.listen
  • 取消订阅所有其他可观察对象作为内存泄漏保护步骤,直到 Angular 文档明确告诉我们哪些可观察对象不需要取消订阅(检查问题:(5) Documentation for RxJS Unsubscribing (Open))。
  • 奖励:始终使用 Angular 方法来绑定事件,例如 HostListener,因为 angular 非常关心在需要时删除事件侦听器,并防止由于事件绑定而导致的任何潜在内存泄漏.

最后一个不错的提示:如果您不知道是否自动 unsubscribed/completed 观察对象,请添加一个 complete 回调到 subscribe(...) 并检查它是否在组件被销毁时被调用。

如果需要取消订阅,可以使用以下用于可观察管道方法的运算符

import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { OnDestroy } from '@angular/core';

export const takeUntilDestroyed = (componentInstance: OnDestroy) => <T>(observable: Observable<T>) => {
  const subjectPropertyName = '__takeUntilDestroySubject__';
  const originalOnDestroy = componentInstance.ngOnDestroy;
  const componentSubject = componentInstance[subjectPropertyName] as Subject<any> || new Subject();

  componentInstance.ngOnDestroy = (...args) => {
    originalOnDestroy.apply(componentInstance, args);
    componentSubject.next(true);
    componentSubject.complete();
  };

  return observable.pipe(takeUntil<T>(componentSubject));
};

可以这样使用:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({ template: '<div></div>' })
export class SomeComponent implements OnInit, OnDestroy {

  ngOnInit(): void {
    const observable = Observable.create(observer => {
      observer.next('Hello');
    });

    observable
      .pipe(takeUntilDestroyed(this))
      .subscribe(val => console.log(val));
  }

  ngOnDestroy(): void {
  }
}

算子包装了组件的ngOnDestroy方法

重要:操作符应该是可观察管道中的最后一个。

在 SPA 应用程序中 ngOnDestroy 函数(angular 生命周期)对于每个 subscribe 你需要 取消订阅 它。 advantage => 防止状态变得太重。

例如: 在组件 1 中:

import {UserService} from './user.service';

private user = {name: 'test', id: 1}

constructor(public userService: UserService) {
    this.userService.onUserChange.next(this.user);
}

在役:

import {BehaviorSubject} from 'rxjs/BehaviorSubject';

public onUserChange: BehaviorSubject<any> = new BehaviorSubject({});

在组件 2 中:

import {Subscription} from 'rxjs/Subscription';
import {UserService} from './user.service';

private onUserChange: Subscription;

constructor(public userService: UserService) {
    this.onUserChange = this.userService.onUserChange.subscribe(user => {
        console.log(user);
    });
}

public ngOnDestroy(): void {
    // note: Here you have to be sure to unsubscribe to the subscribe item!
    this.onUserChange.unsubscribe();
}

为了处理订阅,我使用 "Unsubscriber" class。

这是退订者Class。

export class Unsubscriber implements OnDestroy {
  private subscriptions: Subscription[] = [];

  addSubscription(subscription: Subscription | Subscription[]) {
    if (Array.isArray(subscription)) {
      this.subscriptions.push(...subscription);
    } else {
      this.subscriptions.push(subscription);
    }
  }

  unsubscribe() {
    this.subscriptions
      .filter(subscription => subscription)
      .forEach(subscription => {
        subscription.unsubscribe();
      });
  }

  ngOnDestroy() {
    this.unsubscribe();
  }
}

并且您可以在任何组件/服务/效果等中使用此 class

示例:

class SampleComponent extends Unsubscriber {
    constructor () {
        super();
    }

    this.addSubscription(subscription);
}

您可以使用最新的 Subscription class 取消订阅 Observable,代码不那么混乱。

我们可以用 normal variable 来做到这一点,但它会在每个新订阅上 override the last subscription 所以避免这种情况,当你处理更多数量的 Obseravable 时,这种方法非常有用,并且Obeservable 的类型,例如 BehavoiurSubject and Subject

订阅

Represents a disposable resource, such as the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription.

您可以通过两种方式使用它,

  • 可以直接推送订阅到订阅数组

     subscriptions:Subscription[] = [];
    
     ngOnInit(): void {
    
       this.subscription.push(this.dataService.getMessageTracker().subscribe((param: any) => {
                //...  
       }));
    
       this.subscription.push(this.dataService.getFileTracker().subscribe((param: any) => {
            //...
        }));
     }
    
     ngOnDestroy(){
        // prevent memory leak when component destroyed
        this.subscriptions.forEach(s => s.unsubscribe());
      }
    
  • 使用 add()Subscription

    subscriptions = new Subscription();
    
    this.subscriptions.add(subscribeOne);
    this.subscriptions.add(subscribeTwo);
    
    ngOnDestroy() {
      this.subscriptions.unsubscribe();
    }
    

A Subscription 可以保留子订阅并安全地取消订阅。此方法处理可能的错误(例如,如果任何子订阅为空)。

希望这对您有所帮助..:)

SubSink 包,一个简单且一致的退订解决方案

因为没有人提到过,我想推荐 Ward Bell 创建的 Subsink 包:https://github.com/wardbell/subsink#readme

我一直在一个项目中使用它,我们是几个开发人员都在使用它。在任何情况下都采用一致的方法很有帮助。

对于像 AsyncSubject 这样在发出结果后直接完成的可观察对象,或者例如来自 http 请求的可观察对象,您不需要取消订阅。 为那些调用 unsubscribe() 并没有什么坏处,但是如果可观察到的是 closed 取消订阅方法 will simply not do anything:

if (this.closed) {
  return;
}

当您有长期存在的可观察对象随着时间的推移发出多个值时(例如 BehaviorSubjectReplaySubject),您需要取消订阅以防止内存泄漏。

您可以轻松地创建一个可观察对象,该可观察对象在使用管道运算符从如此长寿的可观察对象发出结果后直接完成。 在这里的一些答案中提到了 take(1) 管道。但我更喜欢the first() pipe。与 take(1) 的区别在于它将:

deliver an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

第一个管道的另一个优点是您可以传递一个谓词来帮助您 return 满足特定条件的第一个值:

const predicate = (result: any) => { 
  // check value and return true if it is the result that satisfies your needs
  return true;
}
observable.pipe(first(predicate)).subscribe(observer);

First 将在发出第一个值(或传递给函数参数时满足谓词的第一个值)后直接完成,因此无需取消订阅。

有时您不确定自己是否拥有长寿命的可观察对象。我并不是说这是很好的做法,但您可以随时添加 first 管道以确保您不需要手动取消订阅。在一个只会发出一个值的可观察对象上添加一个额外的 first 管道并没有什么坏处。

在开发过程中,您可以使用 the single pipe 如果源可观察对象发出多个事件,它将失败。这可以帮助您探索可观察对象的类型以及是否有必要取消订阅。

observable.pipe(single()).subscribe(observer);

firstsingle 看起来非常相似,两个管道都可以采用可选的谓词,但差异很重要,并且在 :

中得到了很好的总结

First

Will emit as soon as first item appears. Will complete right after that.

Single

Will fail if source observable emits several events.


注意 我参考了官方文档,力求在回答中尽可能准确和完整,但如果缺少重要内容,请发表评论。 ..

--- 更新Angular 9 和 Rxjs 6 解决方案

  1. 在 Angular 组件的 ngDestroy 生命周期中使用 unsubscribe
class SampleComponent implements OnInit, OnDestroy {
  private subscriptions: Subscription;
  private sampleObservable$: Observable<any>;

  constructor () {}

  ngOnInit(){
    this.subscriptions = this.sampleObservable$.subscribe( ... );
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}
  1. 在 Rxjs 中使用 takeUntil
class SampleComponent implements OnInit, OnDestroy {
  private unsubscribe$: new Subject<void>;
  private sampleObservable$: Observable<any>;

  constructor () {}

  ngOnInit(){
    this.subscriptions = this.sampleObservable$
    .pipe(takeUntil(this.unsubscribe$))
    .subscribe( ... );
  }

  ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}
  1. 对于您在 ngOnInit 调用的某些操作,它只在组件初始化时发生一次。
class SampleComponent implements OnInit {

  private sampleObservable$: Observable<any>;

  constructor () {}

  ngOnInit(){
    this.subscriptions = this.sampleObservable$
    .pipe(take(1))
    .subscribe( ... );
  }
}

我们还有 async 管道。但是,这个在模板上使用(不在 Angular 组件中)。

订阅本质上只有一个 unsubscribe() 函数来释放资源或取消 Observable 的执行。 在 Angular 中,我们必须在组件被销毁时取消订阅 Observable。幸运的是,Angular 有一个在销毁组件之前调用的 ngOnDestroy 挂钩,这使开发人员能够在此处提供清理人员,以避免挂起订阅、打开门户以及将来可能出现的问题来咬我们背面

@Component({...})
export class AppComponent implements OnInit, OnDestroy {
    subscription: Subscription 
    ngOnInit () {
        var observable = Rx.Observable.interval(1000);
        this.subscription = observable.subscribe(x => console.log(x));
    }
    ngOnDestroy() {
        this.subscription.unsubscribe()
    }
}

我们将 ngOnDestroy 添加到我们的 AppCompoennt 并在 this.subscription Observable

上调用了取消订阅方法

如果有多个订阅:

@Component({...})
export class AppComponent implements OnInit, OnDestroy {
    subscription1$: Subscription
    subscription2$: Subscription 
    ngOnInit () {
        var observable1$ = Rx.Observable.interval(1000);
        var observable2$ = Rx.Observable.interval(400);
        this.subscription1$ = observable.subscribe(x => console.log("From interval 1000" x));
        this.subscription2$ = observable.subscribe(x => console.log("From interval 400" x));
    }
    ngOnDestroy() {
        this.subscription1$.unsubscribe()
        this.subscription2$.unsubscribe()
    }
}

就我而言,我使用的是@seanwright 提出的解决方案的变体:
https://github.com/NetanelBasal/ngx-take-until-destroy

这是ngx-rocket / starter-kit project. You can access it here until-destroyed.ts

中使用的文件

组件看起来像这样

/**
 * RxJS operator that unsubscribe from observables on destory.
 * Code forked from https://github.com/NetanelBasal/ngx-take-until-destroy
 *
 * IMPORTANT: Add the `untilDestroyed` operator as the last one to
 * prevent leaks with intermediate observables in the
 * operator chain.
 *
 * @param instance The parent Angular component or object instance.
 * @param destroyMethodName The method to hook on (default: 'ngOnDestroy').
 */
import { untilDestroyed } from '../../core/until-destroyed';

@Component({
  selector: 'app-example',
  templateUrl: './example.component.html'
})
export class ExampleComponent implements OnInit, OnDestroy {

  ngOnInit() {
    interval(1000)
        .pipe(untilDestroyed(this))
        .subscribe(val => console.log(val));

    // ...
  }


  // This method must be present, even if empty.
  ngOnDestroy() {
    // To protect you, an error will be thrown if it doesn't exist.
  }
}

这里有很多很棒的答案...

让我添加另一个选择:

import { interval    } from "rxjs";
import { takeUntil   } from "rxjs/operators";
import { Component   } from "@angular/core";
import { Destroyable } from "@bespunky/angular-zen/core";

@Component({
    selector: 'app-no-leak-demo',
    template: ' Destroyable component rendered. Unload me and watch me cleanup...'
})
export class NoLeakComponent extends Destroyable
{
    constructor()
    {
        super();

        this.subscribeToInterval();
    }

    private subscribeToInterval(): void
    {
        const value    = interval(1000);
        const observer = {
            next    : value => console.log(` Destroyable: ${value}`),
            complete: ()    => console.log(' Observable completed.')
        };

        // ==== Comment one and uncomment the other to see the difference ====
        
        // Subscribe using the inherited subscribe method
         this.subscribe(value, observer);

        // ... or pipe-in the inherited destroyed subject
        //value.pipe(takeUntil(this.destroyed)).subscribe(observer);
    }
}

Live Example

这里发生了什么

component/service 扩展了 Destroyable(来自名为 @bespunky/angular-zen 的库)。

class 现在可以简单地使用 this.subscribe()takeUntil(this.destroyed),无需任何额外的样板代码。

To install the library use:
> npm install @bespunky/angular-zen

这是我对这个问题的看法,让我的生活变得简单我选择了在组件被销毁时取消订阅的手动方式。

为此我创建了一个 class 命名的 Subscriptor,它主要包含静态成员,即:

  • 一个私有变量订阅 - 包含所有提供的订阅
  • 订阅 setter - 将每个新订阅推送到订阅数组
  • 取消订阅方法 - 取消订阅订阅数组包含的每个订阅(如果已定义),并清空订阅数组

subscriptor.ts

import { Subscription } from "rxjs";

export class Subscriptor {
    private static subscriptions: Subscription[] = [];

    static set subscription(subscription: Subscription) {
        Subscriptor.subscriptions.push(subscription);
    }

    static unsubscribe() {
        Subscriptor.subscriptions.forEach(subscription => subscription ? subscription.unsubscribe() : 0);
        Subscriptor.subscriptions = [];
    }
}

一个组件内部的用法如下:

当您想订阅任何服务时,只需将订阅放在订阅者的setter。

ngOnInit(): void {
    Subscriptor.subscription = this.userService.getAll().subscribe(users => this.users = users);
    Subscriptor.subscription = this.categoryService.getAll().subscribe(categories => this.categories = categories);
    Subscriptor.subscription = this.postService.getAll().subscribe(posts => this.posts = posts);
}

当您想退订任何服务时,只需调用Subscriptor的退订方法即可。

ngOnDestroy(): void {
    Subscriptor.unsubscribe();
}

出于性能原因,始终建议取消订阅您的可观察订阅以避免内存泄漏,并且有不同的方法可以做到这一点,

顺便看了大部分的回答,没找到说async管子的人,推荐Rxjs 模式与 Angular 应用程序,因为它提供自动订阅和离开将被销毁的组件时的订阅:

请找到一个如何实施的例子

app.compoennt.ts:

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

import { BookService } from './book.service';
import { Book } from './book';

@Component({
   selector: 'app-observable',
   templateUrl: './observable.component.html'
})
export class AppComponent implements OnInit { 
   books$: Observable<Book[]>
   constructor(private bookService: BookService) { }
   ngOnInit(): void {
        this.books$ = this.bookService.getBooksWithObservable();
   }
} 

app.compoennt.html:

<h3>AsyncPipe with Promise using NgFor</h3>
<ul>
  <li *ngFor="let book of books$ | async" >
    Id: {{book?.id}}, Name: {{book?.name}}
  </li>
</ul>

DisposeBag

这个想法受到了 RxSwift 的 DisposeBag 的启发,所以我决定开发一个类似但简单的结构。

DisposeBag 是一种数据结构,它包含对所有打开的订阅的引用。它有助于处理我们组件中的订阅,同时为我们提供 API 来跟踪打开订阅的状态。

优势

非常简单API,让你的代码看起来简单又小巧。 提供 API 用于跟踪打开订阅的状态(允许您显示不确定的进度条) 无依赖性 injections/packages.

用法

在组件中:

@Component({
  selector: 'some-component',
  templateUrl: './some-component.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class SomeComponent implements OnInit, OnDestroy {

  public bag = new DisposeBag()
  
  constructor(private _change: ChangeDetectorRef) {
  }

  ngOnInit(): void {

    // an observable that takes some time to finish such as an api call.
    const aSimpleObservable = of(0).pipe(delay(5000))

    // this identifier allows us to track the progress for this specific subscription (very useful in template)
    this.bag.subscribe("submission", aSimpleObservable, () => { 
      this._change.markForCheck() // trigger UI change
     })
  }

  ngOnDestroy(): void {
    // never forget to add this line.
    this.bag.destroy()
  }
}

在模板中:


<!-- will be shown as long as the submission subscription is open -->
<span *ngIf="bag.inProgress('submission')">submission in progress</span>

<!-- will be shown as long as there's an open subscription in the bag  -->
<span *ngIf="bag.hasInProgress">some subscriptions are still in progress</span>

实施

import { Observable, Observer, Subject, Subscription, takeUntil } from "rxjs";


/**
 * This class facilitates the disposal of the subscription in our components.
 * instead of creating _unsubscribeAll and lots of boilerplates to create different variables for Subscriptions; 
 * you can just easily use subscribe(someStringIdentifier, observable, observer). then you can use bag.inProgress() with
 * the same someStringIdentifier on you html or elsewhere to determine the state of the ongoing subscription.
 *
 *  don't forget to add onDestroy() { this.bag.destroy() }
 * 
 *  Author: Hamidreza Vakilian (hvakilian1@gmail.com)
 * @export
 * @class DisposeBag
 */
export class DisposeBag {
    private _unsubscribeAll: Subject<any> = new Subject<any>();

    private subscriptions = new Map<string, Subscription>()


    /**
     * this method automatically adds takeUntil to your observable, adds it to a private map.
     * this method enables inProgress to work. don't forget to add onDestroy() { this.bag.destroy() }
     *
     * @template T
     * @param {string} id
     * @param {Observable<T>} obs
     * @param {Partial<Observer<T>>} observer
     * @return {*}  {Subscription}
     * @memberof DisposeBag
     */
    public subscribe<T>(id: string, obs: Observable<T>, observer: Partial<Observer<T>> | ((value: T) => void)): Subscription {
        if (id.isEmpty()) {
            throw new Error('disposable.subscribe is called with invalid id')
        }
        if (!obs) {
            throw new Error('disposable.subscribe is called with an invalid observable')
        }

        /* handle the observer */
        let subs: Subscription
        if (typeof observer === 'function') {
            subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)
        } else if (typeof observer === 'object') {
            subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)
        } else {
            throw new Error('disposable.subscribe is called with an invalid observer')
        }

        /* unsubscribe from the last possible subscription if in progress. */
        let possibleSubs = this.subscriptions.get(id)
        if (possibleSubs && !possibleSubs.closed) {
            console.info(`Disposebag: a subscription with id=${id} was disposed and replaced.`)
            possibleSubs.unsubscribe()
        }

        /* store the reference in the map */
        this.subscriptions.set(id, subs)

        return subs
    }


    /**
     * Returns true if any of the registered subscriptions is in progress.
     *
     * @readonly
     * @type {boolean}
     * @memberof DisposeBag
     */
    public get hasInProgress(): boolean {
        return Array.from(this.subscriptions.values()).reduce(
            (prev, current: Subscription) => { 
                return prev || !current.closed }
            , false)
    }

    /**
     * call this from your template or elsewhere to determine the state of each subscription.
     *
     * @param {string} id
     * @return {*} 
     * @memberof DisposeBag
     */
    public inProgress(id: string) {
        let possibleSubs = this.subscriptions.get(id)
        if (possibleSubs) {
            return !possibleSubs.closed
        } else {
            return false
        }
    }


    /**
     * Never forget to call this method in your onDestroy() method of your components.
     *
     * @memberof DisposeBag
     */
    public destroy() {
        this._unsubscribeAll.next(null);
        this._unsubscribeAll.complete();
    }
}