如何将 Subjects 和 Observables 与 Angular rxjs 结合起来

How to combine Subjects and Observables with Angular rxjs

我有以下两个订阅:

this.service1.source1.subscribe(countries => {
  this.filteredData =  [];
  countries.forEach(country => {
    this.filteredData = this.data.filter(user => {
      return user.country == country;
    });
  });
});

this.service2.source2.subscribe(companies => {
  companies.forEach(company => {
    this.filteredData =  [];
    this.filteredData = this.data.filter(user => {
      return user.company == company;
    });
  });
});

其中 source1 和 source2 是 BehaviorSubject,并且两个事件都被 subscribe 捕获。

我想合并两个订阅,因为我想根据 subscribe

返回的两个结果过滤数据

我尝试了 forkJoinzip

zip([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('zip results:', results);
});

forkJoin([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('forkJoin results:', results);
});

但是对于那些(zip 和 forkJoin)我注意到我什至没有在控制台上得到任何东西,就好像 subscribe 在发出事件时没有被执行。

如何将两个来源的结果合并为一个订阅?

语法错误。 forkJoin and zip 都接受 Observables 作为参数。不是一个 Observable 数组。从通话中删除 [],您应该没问题。

试试这个:

zip(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('zip results:', results);
});

forkJoin(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('forkJoin results:', results);
});

forkJoin 可能不适合您的用例,因为它 在所有可观察对象完成时从每个对象发出最后一个发出的值。

zip 也可能不会给你想要的行为,因为 它等待所有输入流都发出它们的 n-th 值,一旦满足这个条件,它结合了所有这些 n-th 值并发出 n-th 组合值。

因此,在任何一种情况下,只有在两个可观察量都发生发射之前,您才会获得发射。因为 this.service1.source1this.service1.source2BehaviorSubject 使用 zip 保证初始发射。但只有当两个 observable 都发出时才会发生后来的发射。

我建议使用 combineLatest 因为 每当任何输入流发出一个值时,它都会结合每个输入流发出的最新值。

combineLatest(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('combineLatest results:', results);
});

并且由于 this.service1.source1this.service1.source2BehaviorSubject 这样:

A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.

保证您在订阅它时以及任何可观察对象发出值时都会获得发射。

正如@ysf 上面指出的那样,如果你想在任何一个发射时用两个结果过滤本地数据,你应该选择 combineLatest.

此外,我建议您在每个服务中创建一个变量作为 BehaviorSubject 的 Observable 实例。喜欢:

// at Service1.service.ts
export class Service1Service {
  // Do not expose Subject directly.
  private source1 = new BehaviorSubject<YourDataType[]>(YourInitialValue);
  private source2 = new BehaviorSubject<YourDataType[]>(YourInitialValue);

  source1$: Observable<YourDataType[]>;
  source2$: Observable<YourDataType[]>;

  constructor() {
    // Create an Observable instance for your sake and open it to the public.
    this.source1$ = this.source1.asObservable();
    this.source2$ = this.source2.asObservable();
  }
}


// at wherever you subscribe
combineLatest(this.service1.source1$, this.service1.source2$).pipe(
  filter(([res1, res2]: YourDataType[]) => {
    // Filter your data with res1, res2
  })
).subscribe();