使用 Rxjs 的尾递归可观察返回函数

Tail recursive observable returning function with Rxjs

假设我有一个服务,其方法可以 ping url 直到满足特定条件。之后是 return 结果。整个事情应该用 Observables 构建(可以很容易地从外部取消,所以没有 Promises 之类的递归)。

我试过这个,我希望每秒调用一次,直到满足条件,然后 return 结果。

getData(): Observable<MyData> {
  return this.apiService.get('/url').pipe(
    map((res) => {
      if (...certain condition concerning 'res') {
        return res;
      }
      return timer(1000).pipe(
        map(() => this.getData())
      );
    })
  );
}

如果我像这样调用它,虽然 apiService 只被调用一次并且订阅方法中的结果立即被解析并且是另一个 Observable。我正在做的事情肯定有问题,但无法弄清楚是什么,可能是在 return 延迟递归调用时。

getData().subscribe(res => {
  // res is an Observable, I expected an MyData after a delay of at least 1 second
  // as the condition is guaranteed to fail the first time (for testing purposes)
})

使用 RxJs 递归通常意味着使用 expand 运算符。

在你的情况下你可以尝试这样的事情

// condition is a function that returns true if a certain condition on res is met, otherwise false
const condition = (res) => {
   if (...certain condition concerning 'res') {
        return true;
      } else {
        return false
      }
};

function getData() {
  return this.apiService.get('/url').pipe(
    // If the condition is met the Observable stream is completed with no value notified
   // if instead the condition in not met, then the API call is performed again
    expand((res) => {
      return condition(res) ? EMPTY : this.apiService.get('/url').pipe(delay(1000));
    }),
    // this filter condition is optional and its use is to avoid notifying values
    // until the expected one is received
    filter(condition)
  );
}

Here a stackblitz 模拟了这个逻辑。