如何使用 rxjs 依次轮询多个端点?

How to poll several endpoints sequentially with rxjs?

我正在尝试使用 Rxjs 实现以下目标:给定一个作业 ID 数组,对于数组中的每个 ID,轮询一个 returns 作业状态的端点。状态可以是“运行”或“已完成”。代码应该一个接一个地轮询作业,并继续轮询直到作业处于“运行”状态。一旦作业达到“已完成”状态,就应该将其传递给下游,并排除在进一步的轮询之外。

下面是一个演示问题的最小玩具箱。

const {
from,
of,
interval,
mergeMap,
filter,
take,
tap,
delay
} = rxjs;
const { range } = _;

const doRequest = (input) => {
  const status = Math.random() < 0.15 ? 'FINISHED' : 'RUNNING';

  return of({ status, value: input })
    .pipe(delay(500));
};

const number$ = from(range(1, 10));


const poll = (number) => interval(5000).pipe(
  mergeMap(() => {
    return doRequest(number)
  }),
  tap(console.log),
  filter(( {status} ) => status === 'FINISHED'),
  take(1)
);

const printout$ = number$.pipe(
  mergeMap((number) => {
    return poll(number)
  })
);



printout$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/lodash.js/4.17.21/lodash.min.js"></script>

它完成了我描述的大部分工作;但它会同时轮询所有端点,而不是一个接一个地轮询。在这里,大致是我想要实现的模式:

starting with ids: [1, 2, 3]
polling: await request 1 then await request 2 then await request 3
then wait for n seconds; then repeat
after job 2 is finished, send request 1, then send request 3, then wait, then repeat
after job 3 is finished, send request 1, then wait, repeat
after job 1 is finished, complete the stream

我觉得为了实现请求的顺序发送,应该concatMaped;但在上面的代码片段中这是不可能的,因为间隔会阻止每个轮询流终止。

能否请您建议如何修改我的代码以实现我所描述的内容?

更新:原来的回答不对。

我们想要实现的是,在间隔的每一次循环中,我们都会按顺序轮询所有未完成的作业。我们将所有已完成的作业交给可观察输出,并且我们还会从后续轮询中忽略那些已完成的作业。

我们可以使用 Subject instead of a static observable of the job IDs. We start our poll interval and we use withLatestFrom 来包含最新的作业 ID 列表。然后,我们可以在完成作业时将 tap 添加到输出可观察对象中,并更新 Subject 以忽略该作业。

为了结束轮询间隔,我们可以创建一个可观察对象,当未完成的作业数组为空时触发,并使用 takeUntil

const number$ = new Subject();
const noMoreNumber$ = number$.pipe(skipWhile((numbers) => numbers.length > 0));

const printout$ = interval(5000).pipe(
  withLatestFrom(number$),
  switchMap(([_, numbers]) => {
    return numbers.map((number) => defer(() => doRequest(number)));
  }),
  concatAll(),
  //tap(console.log),
  filter(({ status }) => status === 'FINISHED'),
  withLatestFrom(number$),
  tap(([{ value }, numbers]) =>
    number$.next(numbers.filter((num) => num != value))
  ),
  map(([item]) => item),
  takeUntil(noMoreNumber$)
);

printout$.subscribe({
  next: console.log,
  error: console.error,
  complete: () => console.log('COMPLETE'),
});

number$.next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

我要做的另一个调整是在轮询器本身内部使用 switchMap 而不是 mergeMap。如果您将它与 fromFetch 结合使用来执行您的 HTTP 调用,那么,如果有一些 long-running HTTP 调用卡住了,在下一次轮询中,先前的调用将在它进行下一个 HTTP 之前被取消之所以调用,是因为 switchMap 在订阅新的之前处理了之前的可观察对象。

这是一个工作示例: https://stackblitz.com/edit/js-gxrrb3?devToolsHeight=33&file=index.js

生成如下所示的控制台输出...

如果我理解正确,我会这样处理。

首先,我将创建一个 poll 函数,该函数 returns 一个 Observable,它在一轮轮询后发出通知,并发出一个数组,其中包含调用 [=15] 的所有数字=] returns 'RUNNING'。这样的功能看起来像这样

const poll = (numbers: number[]) => {
  return from(numbers).pipe(
    concatMap((n) =>
      doRequest(n).pipe(
        filter((resp) => resp.status === 'RUNNING'),
        map((resp) => resp.value)
      )
    ),
    toArray()
  );
};

然后你需要做的是递归迭代调用poll函数,直到poll返回的Observable发出的数组为空。

rxjs 中的递归通常使用 expand 运算符获得,这是我们在这种情况下也将使用的运算符,例如

poll(numbers)
  .pipe(
    expand((numbers) =>
      numbers.length === 0
        ? EMPTY
        : timer(2000).pipe(concatMap(() => poll(numbers)))
    )
  )
  .subscribe(console.log);

一个完整的例子可以在this stackblitz中看到。

更新

如果 objective 是通知已完成轮询逻辑的作业 ID,解决方案的结构保持不变(poll 函数和通过 expand 递归) 但细节有所不同。

poll 函数确保我们发出轮询的所有响应,它看起来像这样:

const poll = (
  numbers: number[]
) => {
  console.log(`Polling ${numbers}`);
  return from(numbers).pipe(
    concatMap((n) => doRequest(n)),
    toArray()
  );
};

递归逻辑确保再次轮询所有仍处于“运行”状态的作业,然后我们仅过滤已完成的作业并将其传递到下游。换句话说,逻辑看起来像这样

poll(start)
  .pipe(
    expand((responses) => {
      const numbers = responses.filter(r => r.status === 'RUNNING').map(r => r.value)
      return numbers.length === 0
        ? EMPTY
        : timer(2000).pipe(concatMap(() => poll(numbers)));
    }),
    map(responses => responses.filter(r => r.status === 'FINISHED')),
    filter(finished => finished.length > 0)
  )
  .subscribe({
    next: responses => console.log(`Job finished ${responses.map(r => r.value)}`),
    complete: () => {console.log('All processed')}
  });

可以在 this stackblitz 中看到一个工作示例。

试试这个

import { delay, EMPTY, from, of, range } from 'rxjs';
import { concatMap, filter, mergeMap, tap, toArray } from 'rxjs/operators';

const number$ = from(range(1, 3));

const doRequest = (input) => {
  const status = Math.random() < 0.15 ? 'FINISHED' : 'RUNNING';

  return of({ status, value: input }).pipe(delay(1000));
};

const poll = (jobs: object[]) => {
  return from(jobs).pipe(
    filter((job) => job['status'] !== 'FINISHED'),
    concatMap((job) => doRequest(job['value'])),
    tap((job) => {
      console.log('polling with................', job);
    }),
    toArray(),
    tap((result) => {
      console.log('curent jobs................', JSON.stringify(result));
    }),
    mergeMap((result) =>
      result.length > 0 ? poll(result) : of('All job completed!')
    )
  );
};

const initiateJob = number$.pipe(
  mergeMap((id) => doRequest(id)),
  toArray(),
  tap((jobs) => {
    console.log('initialJobs: ', JSON.stringify(jobs));
  }),
  concatMap(poll)
);

initiateJob.subscribe({
  next: console.log,
  error: console.log,
  complete: () => console.log('COMPLETED'),
});