RxJs:轮询直到完成间隔或收到正确的数据

RxJs: poll until interval done or correct data received

如何使用 RxJs 在浏览器中执行以下场景:

我想出的中间解决方案:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );
  1. 有没有没有中间变量的方法,一旦数据到达或发生错误就停止定时器?我现在可以引入新的可观察对象,然后使用 takeUntil
  2. 这里的flatMap用法在语义上是否正确?也许这整件事应该重写而不是与 flatMap ?
  3. 链接

从头开始,您已经得到了变成可观察对象的承诺。一旦这产生一个值,您希望每秒调用一次,直到您收到特定的响应(成功)或直到经过特定的时间。我们可以将这个解释的每个部分映射到一个 Rx 方法:

"Once this yields a value" = map/flatMapflatMap 因为接下来的也是可观察的,我们需要将它们拉平)

"once per second" = interval

"receive a certain response" = filter

"or" = amb

"certain amount of time has passed" = timer

从那里,我们可以像这样拼凑起来:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

一旦我们得到初始结果,我们就会将其投射到两个可观察对象之间的竞争中,一个在收到成功响应时产生一个值,另一个在一定时间后产生一个值已经过去了。第二个 flatMap 是因为 .throw 不存在于 observable 实例中,而 Rx.Observable returns 上的方法也是一个 observable 也需要展平。

事实证明 amb / timer 组合实际上可以用 timeout 代替,像这样:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

我省略了您示例中的 .delay,因为它没有在您想要的逻辑中描述,但它可以简单地适用于此解决方案。

所以,直接回答你的问题:

  1. 在上面的代码中,不需要手动停止任何东西,因为 interval 将在订阅者计数降为零时被处理掉,这将在 take(1)amb / timeout 完成。
  2. 是的,您原来的两种用法都是有效的,因为在这两种情况下,您都将一个可观察对象的每个元素投影到一个新的可观察对象中,并希望将可观察对象的结果可观察对象扁平化为常规可观察对象。

Here's the jsbin 我一起测试解决方案(您可以调整 pollQueueForResult 中返回的值以获得所需的 success/timeout;为了快速测试)。

对@matt-burnell 的出色回答进行了小幅优化。您可以将 filtertake 运算符替换为 first 运算符,如下所示

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .first(x => x.completed)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))

  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );

此外,对于可能不知道的人,flatMap 运算符是 RxJS 5.0 中 mergeMap 的别名。

不是你的问题,但我需要相同的功能

import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMapTo } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

function isAsyncThingSatisfied(result) {
  return true
}

export function doAsyncThingSeveralTimesWithTimeout(
  doAsyncThingReturnsPromise,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  const subject$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => doAsyncThingReturnsPromise()),
      takeWhileInclusive(result => isAsyncThingSatisfied(result)),
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
    )
  )

  return subject$.toPromise(Promise) // will return first result satistieble result of doAsyncThingReturnsPromise or throw error on timeout
}

示例

// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMap } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

export function mailhogWaitForNEmails(
  mailhogClient,
  numberOfExpectedEmails,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  let tries = 0

  const mails$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => mailhogClient.getAll()),
      takeWhileInclusive(mails => {
        tries += 1
        return mails.total < numberOfExpectedEmails
      }),
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMap(() => throwError(`mailhogWaitForNEmails timeout after ${tries} tries`))
    )
  )

  // toPromise returns promise which contains the last value from the Observable sequence.
  // If the Observable sequence is in error, then the Promise will be in the rejected stage.
  // If the sequence is empty, the Promise will not resolve.
  return mails$.toPromise(Promise)
}

// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'

export async function mailhogWaitForEmailAndClean(mailhogClient) {
  const mails = await mailhogWaitForNEmails(mailhogClient, 1)

  if (mails.count !== 1) {
    throw new Error(
      `Expected to receive 1 email, but received ${mails.count} emails`,
    )
  }

  await mailhogClient.deleteAll()

  return mails.items[0]
}

Angular / 上面的 typescript 重写解决方案:

export interface PollOptions {
  interval: number;
  timeout: number;
}

const OPTIONS_DEFAULT: PollOptions = {
  interval: 5000,
  timeout: 60000
};
@Injectable()
class PollHelper {
  startPoll<T>(
    pollFn: () => Observable<T>, // intermediate polled responses
    stopPollPredicate: (value: T) => boolean, // condition to stop polling
    options: PollOptions = OPTIONS_DEFAULT): Observable<T> {
    return interval(options.interval)
      .pipe(
        exhaustMap(() => pollFn()),
        first(value => stopPollPredicate(value)),
        timeout(options.timeout)
      );
  }
}

示例:

pollHelper.startPoll<Response>(
  () => httpClient.get<Response>(...),
  response => response.isDone()
).subscribe(result => {
  console.log(result);
});

我们也有相同的用例,下面的 code 效果很好。

import { timer, Observable } from "rxjs";
import { scan, tap, switchMapTo, first } from "rxjs/operators";

function checkAttempts(maxAttempts: number) {
  return (attempts: number) => {
    if (attempts > maxAttempts) {
      throw new Error("Error: max attempts");
    }
  };
}

export function pollUntil<T>(
  pollInterval: number,
  maxAttempts: number,
  responsePredicate: (res: any) => boolean
) {
  return (source$: Observable<T>) =>
    timer(0, pollInterval).pipe(
      scan(attempts => ++attempts, 0),
      tap(checkAttempts(maxAttempts)),
      switchMapTo(source$),
      first(responsePredicate)
    );
}

如果尝试次数已达到限制,则会抛出一个错误,导致输出流被取消订阅。此外,您只能在不满足定义为 responsePredicate 的给定条件之前发出 http 请求。

示例用法:

import { of } from "rxjs";

import { pollUntil } from "./poll-until-rxjs";

const responseObj = { body: { inProgress: true } };
const response$ = of(responseObj);
// this is to simulate a http call
response$
  .pipe(pollUntil(1000, 3, ({ body }) => !body.inProgress))
  .subscribe(({ body }) => console.log("Response body: ", body));

setTimeout(() => (responseObj.body.inProgress = false), 1500);