Angular 组件中的 RxJS 订阅

RxJS subscription in Angular component

我不完全明白我需要在哪里以及如何在 Angular 组件中声明可观察对象/主题。

目前我开发了一个与 MovieDB 交互的网站 API 并且一切正常,但同时我明白我的代码很糟糕,因为在销毁组件后没有清理订阅,但要进行这些清理,我至少需要了解如何正确使用 RxJS。

我认为我的用法不正确,因为我在与页面的每次交互中都有新的订阅。另外据我了解,它们需要在构造函数中声明。

这个页面的想法是有一个输入表单,用户可以在其中键入查询并选中单选按钮以选择要搜索的内容:'tv' 或 'movies'。当有搜索结果时,会出现一个按钮来展开结果。

所以,这是代码:

import {Component, OnDestroy} from '@angular/core';
import {ISearchParams} from '../../models/search-params.interface';
import {ShowService} from '../../services/show.service';
import {IResultsIds} from '../../models/results.interface';
import {distinctUntilChanged} from 'rxjs/operators';
import {Subscription} from 'rxjs';

@Component({
  selector: 'app-search',
  templateUrl: './search-tab.component.html',
  styleUrls: ['./search-tab.component.scss']
})
export class SearchTabComponent implements OnDestroy {

  searchParams!: ISearchParams;

  searchResults!: IResultsIds;

  searchSub!: Subscription;
  showMoreSub!: Subscription;

  constructor(private movieService: ShowService) {
  }

  search(searchParams: ISearchParams): void {
    this.searchParams = searchParams;

    this.searchSub = this.movieService.search(searchParams)
      .pipe(distinctUntilChanged())
      .subscribe(results => {
        this.searchResults = results;
      });
  }

  showMore(): void {
    if (!this.isFinished()) {
      this.searchParams.page++;

      this.showMoreSub = this.movieService.search(this.searchParams)
        .subscribe(results => {
          this.searchResults!.ids.push(...results.ids);
        });
    }
  }

  isFinished = () => this.searchParams.page >= this.searchResults!.total_pages;

  ngOnDestroy(): void {
    // this.searchSub.unsubscribe();
    // this.showMoreSub.unsubscribe();
  }
}

和 HTML:

<main class="main-content search-container">
  <app-search-form (searchParams)="search($event)"></app-search-form>
  <div class="results" *ngIf="searchResults">
    <app-show-description *ngFor="let id of searchResults.ids"
                          [showType]="searchParams.type"
                          [showId]="id"></app-show-description>
  </div>
  <button *ngIf="searchResults && !isFinished()"
          (click)="showMore()"
          class="load-btn more-btn">show more...</button>
</main>

如果你能帮助我并告诉我哪里有错误,我将不胜感激。再过一段时间,一切都是这样,但我想了解 RxJS 的用法。

UPD @H3AR7B3A7 非常感谢,你让我的知识变得清晰了一点! 但现在我又有了一个误解:如何将 2 个数组 observables 转换为一个?我用谷歌搜索但找不到问题的解决方案:我有加载更多电影的功能 - 它必须扩展 distinctMovies$ 数组,它看起来就像一个数字 (ids) 数组,但我不能加入两个数组,我得到的只是 [...],[...] 但不是 [......]

我试过这个:

showMore(): void {
    this.searchParams.page++;

    this.distinctMovies$ = concat(this.distinctMovies$,
      this.movieService.search(this.searchParams)
        .pipe(pluck('ids')))
      .pipe(tap(console.log));
  }

乍一看,您可以在此处进行一些改进。

关于退订

有几种模式被认为是对此的“良好做法”。其中之一是:

// Define destroy subject
readonly destroy$ = new Subject<any>();

// On each subscription add
.pipe(takeUntil(this.destroy$))

// And just emit value to destroy$ subject in onDestroy hook
// And all streams that had takeUntil will be ended
ngOnDestroy(): void {
  this.destroy$.next();
  this.destroy$.complete();
}

关于搜索

当您处理异步请求(例如 API 调用)时,您必须考虑如果您多次单击“搜索”按钮会发生什么。会有几个 API 电话,对吧?但我们不能 100% 确定服务器响应的顺序。

如果您点击例如:

点击 1,点击 2,点击 3,点击 4...将产生 APICall1,APICall2,APICall3,APICALL4.. .那将是那个顺序......但是来自服务器的响应可以是其他顺序(是的那是可能的),而且如果你毫不拖延地点击几次,那么你的服务器此时会收到很多请求但你可能会只需要最后一个。

因此,一种常见的解决方案是让一个流正在侦听例如搜索更改:

searchTerm = new Subject();

ngOnInit() {

  this.searchTerms
    .pipe(
      distinctUntilChanged(),
      // Debounce for 250 miliseconds (avoid flooding backend server with too many requests in a short period of time)
      debounceTime(250),
      // Call api calls
      switchMap(terms => {
        return this.movieService.serach(terms);
      })),
      // Unsubsribe when onDestroy is triggered
      takeUntil(this.destroy$),
    .subscribe(results => {
         // Push/set results to this.searchResults
         if (this.addingMore) {
         //  push in this.searchResults
         } else {
            this.searchResults = results;
         }

         this.addingMore = false;
    });

}

search(searchTerms) {
  this.searchParams = searchParams;
  this.searchTerms.next(searchTerms);
}

showMore(): void {
  if (this.isFinished() return;

  this.searchParams.page++;

  // You can for example set a flag to know if its search or loadMore
  this.addingMore = true;
 
  this.searchTerms.next(searchTerms);
}

看来你了解我的基本知识。

不过,您不需要在构造函数中声明任何内容。

Also as I understand they need to be declared in constructor.

您通常只使用构造函数来注入服务,就像您已经在做的那样。

您可能想使用 ngOnInit() 方法来声明组件的初始状态:

export class SearchTabComponent implements OnInit {
  distinctMovies$!: Observable<Movie[]>

  ngOninit(): void {
    //...
  }
}

您可以通过从不订阅您的组件代码来解决很多问题。这样你就不必在 OnDestroy 中取消订阅...

例如(在你的onInit中):

this.distinctMovies$ = this.movieService.search(searchParams).pipe(distinctUntilChanged())

并且在模板中只使用异步管道:

*ngFor = "let movie of distinctMovies$ | async"

除了不必取消订阅之外,您还可以通过使用异步管道而不是订阅来使用 OnPush ChangeDetectionStrategy。