当 observable 满足条件时,RxJS 停止 foreach

RxJS stop foreach when observable meets condition

我有一组对象。我必须通过 API 调用来检查每个对象,以查看该对象是否符合特定促销条件。 继续使用对象调用 API 的最佳方法是什么,直到最后一个对象被调用,其中 observable 应该 return false,或者其中一个对象从 API 获得 true 状态?

目前我有这个作为代码,但感觉应该有更好的方法来使用 RxJS 运算符。

checkProductForPromo(items: []) {
   const promoChecker$ = new EventEmitter<boolean>();

   items.forEach((item, key, arr) => {
        // This does a HTTP callback to the API
        this.ApiService.getProductInformation(item).subscribe(
            (product) => {
                // Based on some properties this will return true or false of the product is eligible for the promo.
                if (product.isEligibleForPromo()) {
                    promoChecker$.emit(true);
                } else if (Object.is(arr.length - 1, key)) {
                    promoChecker$.emit(false);
                }
            }
        );
    });

    return promoChecker$;
}

您可以使用 from 运算符创建一个 Observable:

checkItemsForPromo(items: any[]) {
  return from(items).pipe(
    concatMap(item => this.ApiService.getProductInformation(item)),
    map(product => !!product.isEligibleForPromo()),
    takeWhile(isEligible => !isEligible, true)    
  );
}

这个 Observable 按顺序为每个项目调用 API。它会等待上一个请求完成,然后再发送下一个请求。 map 运算符根据需要将发射设置为 truefalse

takeWhile 意味着 Observable 将继续发射 false 直到出现 true 值,此时它将完成。传递给 takeWhiletrue 参数实际上是包含标志,这意味着 Observable 在完成之前发出单个 true 值。

如果您只希望它在完成时发出 false 一次,而不是针对每个不合格的产品,请尝试将 distinctUntilChanged() 添加到 pipe.

不要忘记订阅此方法返回的 Observable,并在必要时取消订阅。