rxjs - 过滤后结合内部可观察量

rxjs - combining inner observables after filtering

我调用后端响应:

[
  "https://some-url.com/someData1.json",
  "https://some-url.com/someData2.json"
]

每个 JSON 可以有以下架构:

{
  "isValid": boolean,
  "data": string
}

我想获取包含所有数据的数组,其中 isValid 设置为 true

backend.get(url)
    .pipe(
        mergeMap((urls: []) =>
            urls.map((url: string) =>
                backend.get(url)
                    .pipe(
                        filter(response => response.isValid),
                        map(response => response.data)
                    )
            )
        ),
        combineAll()
    )

当两个 .json 都将 "isValid" 设置为 true 时,我得到包含两个数据的数组。 但是当其中之一 "isValid" 设置为 false observable 永远不会完成。

我可以使用 mergeAll 而不是 combineAll,但是我收到的是单个数据流,而不是所有数据的集合。

有没有更好的方法过滤掉observable?

如您所说,内部可观察对象永远不会发出,因为 filter 不会转发 backend.get 可观察对象曾经发出的唯一值。在这种情况下,订阅该可观察对象的运算符 - 在您的情况下 combineAll - 也永远不会收到任何值并且永远不会发出自己。

我要做的就是通过提供项目功能将过滤和映射移动到 combineAll,例如:

backend.get(url)
    .pipe(
        mergeMap((urls: string[]) =>
            urls.map((url: string) => backend.get(url))
        ),
        combineAll(responses =>
            responses
                .filter(response => response.isValid)
                .map(response => response.data)
        )
    )

看看是否适合你 ;)

import { forkJoin, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

interface IRes {
  isValid: boolean;
  data: string;
}

interface IResValid {
  isValid: true;
  data: string;
}

function isValid(data: IRes): data is IResValid {
  return data.isValid;
}

const res1$: Observable<IRes> = backend.get(url1);
const res2$: Observable<IRes> = backend.get(url2);

// When all observables complete, emit the last emitted value from each.
forkJoin([res1$, res2$])
  .pipe(map((results: IRes[]) => results.filter(isValid)))
  .subscribe((results: IResValid[]) => console.log(results));