如何使用 rxjs 有条件地重复承诺

How to repeat a promise conditionally using rxjs

我想重复一个 API 调用,其中 returns 一个 Promise,有条件地使用 rxjs。

API 方法接收一个 ID,该 ID 将在每次调用时通过向其添加计数器前缀来更改。将重复调用,直到数据满足某些条件或计数器达到特定数字 X。如何使用 rxjs 完成?

API方法:

fetchData(id):Promise<data>

尝试 1:fetchData(id)

尝试 2:fetchData(id_1)

尝试 3:fetchData(id_2)

let count = 0
const timerId = setTimout( () =>{
   if(count){
      fetchData(`id_${count}`)
   }else{
      fetchData('id')
   }
   count = count + 1
,60000}) //runs every 60000 milliseconds

const stopTimer = () =>{ //call this to stop timer
    clearTimeout(timerId);
}

您可以尝试 retryWhen:

let counter=0;

const example = of(1).pipe(
  switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
  map(val => {
    if (val < 5) {
      counter++;
      // error will be picked up by retryWhen
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      // log error message
      tap(val => console.log(`Response was missing something`)),
    )
  )
);

它并不理想,因为它需要在外部范围内使用计数器,但在有更好的解决方案(尤其是没有基于时间的重试)之前,这应该可行。

IMO,最好通过 Promises 或 RxJS 处理轮询,而不要混合使用它们。我将使用 RxJS 进行说明。

尝试以下方法

  1. 使用 RxJS from 函数将承诺转换为可观察对象。
  2. 使用 timer or interval 等 RxJS 函数以固定间隔定期发出值。
  3. 使用像switchMap to map from the outer emission to your API call. Refer 这样的高阶映射运算符来简要描述不同类型的高阶映射运算符。
  4. 使用两个 takeWhile 运算符,分别对应您的每个条件,完成订阅。
  5. 使用filter运算符只转发满足条件的排放。
import { from } from 'rxjs';

fetchData(id: any): Observable<any> {  // <-- return an observable
  return from(apiCall);                // <-- use `from` to convert Promise to Observable
}
import { timer } from 'rxjs';
import { filter, switchMap, takeWhile } from 'rxjs/operators';

timer(0, 5000).pipe(                        // <-- poll every 5 seconds
  takeWhile((index: number) => index < 20)  // <-- stop polling after 20 attempts
  switchMap((index: number) => 
    this.someService.apiCall(index+1)       // <-- first emission from `timer` is 0
  ),
  takeWhile(                                // <-- stop polling when a condition from the response is unmet
    (response: any) => response.someValue !== someOtherValue,
    true                                    // <-- emit the response that failed the test
  ),
  filter((response: any) => 
    response.someValue === someOtherValue   // <-- forward only emissions that pass the condition
  )
).subscribe({
  next: (response: any) => {
    // handle response
  },
  error: (error: any) => {
    // handle error
  }
});

编辑:2nd takeWhile 中的条件与要求相反。我调整了条件并包含了 inclusive=true 参数。感谢评论中的@Siddhant。

我知道您已指定使用 rxjs,但是您还指定 fetchData() return 是 promise 而不是 observable。在这种情况下,我建议使用 asyncawait 而不是 rxjs.

  async retryFetch() {
    let counter = 0;
    while (counter++ < 20 && !this.data) {
      this.data = await this.fetchData(counter);
    }
  }

你可以把你想要的任何东西放在条件句中。

即使您的 api 调用 return 一个可观察对象,我仍然建议将其包装在一个 promise 中并使用这个非常易读的解决方案。

下面的 stackblitz 包装了一个带有承诺的标准 http.get 并实现了上述功能。承诺将随机 return 数据或未定义。

https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts

您可以使用 concatMap 来确保一次只尝试一个调用。 range 给出了最大调用次数,因为如果条件 is/isn 不满足,takeWhile 将提前取消订阅(在范围完成之前)。

可能看起来像这样:

// the data met some condition
function metCondition(data){
  if(data/*something*/){
    return true;
  } else {
    return false
  }
}

// the counter reach to a specific number X
const x = 30;

range(0, x).pipe(
  concatMap(v => fetchData(`id_${v === 0 ? '' : v}`)),
  takeWhile(v => !metCondition(v))
).subscribe(datum => {
  /* Do something with your data? */
});