如何在某些可观察对象中将 combineLatest 与过滤器一起使用?

How to use combineLatest with filter in certain observable?

这是对复杂情况的简化,如果值无效,可以过滤数组中的某些可观察值。问题是过滤后的可观察对象不允许另一个完成合并。什么运营商或方法可以处理这种情况,允许订阅中的有效数据记录?

// RxJS v6+
import { fromEvent, combineLatest, of } from 'rxjs';
import { mapTo, startWith, scan, tap, map, filter } from 'rxjs/operators';

const userData$ = [
   of({ name: 'Joseph', age: 23}), 
   of({ name: 'Mario', age: 33}), 
   of({ name: 'Robert', age: 24}), 
   of({ name: 'Alonso', age: 25})
];

const names = ['Joseph', 'Mario', 'Robert', 'Alonso'];

combineLatest(
  names.map((name, i) => {
     return userData$[i].pipe(
         map(({name, age})=> { return{ name, age: age * 2} }),
         filter(({age}) => age < 67),
         map(({name, age})=> { return{ name: name.toLocaleUpperCase(), age} }),
     )
 })
)
   .pipe(
     tap(console.log),
   )
   .subscribe();

Sample in stackblitz

如果我们将值更改为 67,则所有 observables 都会显示数据。

您可以将 combineLatest 替换为 from,因为如果流数组中的任何一项不发出

combineLatest 将不会发出
const userData$ = [
  { name: 'Joseph', age: 23 },
  { name: 'Mario', age: 33 },
  { name: 'Robert', age: 24 },
  { name: 'Alonso', age: 25 }
];

const names = ['Joseph', 'Mario', 'Robert', 'Alonso'];

from(
  userData$
)
  .pipe(
    map(({ name, age }) => { return { name, age: age * 2 } }),
    filter(({ age }) => age < 66),
    map(({ name, age }) => { return { name: name.toLocaleUpperCase(), age } }),
    tap(console.log),
  )
  .subscribe();

combineLatest 的一个典型问题是它要求所有源 Observable 至少发射一次,所以如果你使用 filter 丢弃它的唯一值,那么 combineLatest 将永远不会发射任何东西.

一个简单的解决方案是确保它始终发出 defaultIfEmpty:

combineLatest(
  names.map((name, i) => {
    return userData$[i].pipe(
      map(({name, age})=> { return { name, age: age * 2} }),
      filter(({age}) => age < 66),
      map(({name, age})=> { return { name: name.toLocaleUpperCase(), age} }),
      defaultIfEmpty(null),
    )
  })
)

现场演示:https://stackblitz.com/edit/typescript-rsffbs?file=index.ts

如果您的实际用例使用其他源 Observable 而不是 of() 并且不能立即完成,您可能需要使用 startWith