服务中的 RXJS 订阅链

RXJS Subscription Chaining in Services

我有两个服务和一个组件。组件监听服务 A,服务 A 监听服务 B。我目前正在服务 A 中订阅服务 B,运行 过滤函数,然后过滤函数触发组件监听的服务 A 中的可观察对象。最佳做法是不从服务 A 订阅服务 B,而是将该可观察对象传递给将成为唯一侦听器的组件,但我不知道如何正确处理它。

当前设置:

当用户更改类型时,服务 B 中的订阅会触发服务 A 监听的内容,然后获取该数据并创建一个新数组并触发组件监听的 productsChanged 主题

Service A {
  productsChanged = new BehaviorSubject<Product[]>([]);

  types$: Subscription;

  private json: Product[] = ProductsJSON;
  private currentProducts: Product[] = [];

  constructor(private typeService: TypeService) {
    this.types$ = this.typeService.typeChanged.subscribe((type: Type) => {
      this.filterProducts(type);
    });
  }


  filterProducts(type: Type): void {
    this.currentProducts = [];

    // Do Some Filtering to Set the Current Products Based on Given Type
  }
}

当此人更改产品类型时,它会触发 'TypeChanged',对象 A 会监听它以过滤掉与类型不匹配的产品

Service B {
  typeChanged = new BehaviorSubject<Type>(Defaults.TYPE);
  private chosenType: Type;

  constructor() {
    this.typeChanged.pipe(take(1)).subscribe((type) => {
      this.chosenType = type;
    });
  }

  setType(type: Type): void {
    this.chosenType = type;

    this.typeChanged.next(this.chosenType);
  }
}

组件在 HTML 中订阅并仅发布产品列表

Component {
  products$: Observable<any>;

  constructor(private productService: ProductService) {
    this.products$ = this.productService.productsChanged.pipe(
      map((product) => {
        return product;
      }),
    );
  }
}

您可以在服务 A 中进行以下更改以实现此目的。

您的 productsChanged 不应再是 BehaviorSubject,而应是未分配的 属性 类型 Observable,如下所示:

productsChanged: Observable<Product[]>;

然后在构造函数中执行以下操作:

this.productsChanged = this.typeService.typeChanged.pipe(map((type: Type) => {
  return this.filterProducts(type);
}));

最后,您的 filterProducts 方法需要 return 过滤后的产品,所以像这样:

  filterProducts(type: Type): void {
    this.currentProducts = [];

    // Do Some Filtering to Set the Current Products Based on Given Type
    
    return filteredProducts;
  }

我看到有 2 件事可以更改。

  1. 如果您不使用服务 B 中的 chosenType 变量,它可以使您免于在其构造函数中进行额外订阅。由于您已经有一个保存当前类型的变量 typeChanged,因此您可以在需要 chosenType 值的地方使用它。

  2. 在服务 A 中,您可以将 productsChanged BehaviorSubject 替换为一个函数,该函数通过管道传输到服务 B 的 typeChanged 可观察对象,并使用 RxJS 运算符 filter and switchMap along with RxJS function of 到 return 类型为 Products[] 的可观察对象。

服务 A (ProductService)

{
  private json: Product[] = ProductsJSON;
  private currentProducts: Product[] = [];

  constructor(private typeService: TypeService) { }

  productsChanged(): Observable<Products[]> {
    this.currentProducts = [];
    return this.typeService.typeChanged.pipe(
      tap((type: Type) => this.currentProducts = /* something */ ),
      filter((type: Type) => /* filter `type` notifications based on certain conditions */ ),
      switchMap((type: Type) => 
        /* do something and return an observable of type `Products[]` using RxJS `of` function */
      )
    );
  }
}

您可以像现在一样在组件中通过管道连接到它。

组件

{
  products$: Observable<any>;

  constructor(private productService: ProductService) {
    this.products$ = this.productService.productsChanged().pipe(  // <-- function instead of observable
      map((product) => {
        return product;
      }),
    );
  }
}