如何使用 rxjs 依次轮询多个端点?
How to poll several endpoints sequentially with rxjs?
我正在尝试使用 Rxjs 实现以下目标:给定一个作业 ID 数组,对于数组中的每个 ID,轮询一个 returns 作业状态的端点。状态可以是“运行”或“已完成”。代码应该一个接一个地轮询作业,并继续轮询直到作业处于“运行”状态。一旦作业达到“已完成”状态,就应该将其传递给下游,并排除在进一步的轮询之外。
下面是一个演示问题的最小玩具箱。
const {
from,
of,
interval,
mergeMap,
filter,
take,
tap,
delay
} = rxjs;
const { range } = _;
const doRequest = (input) => {
const status = Math.random() < 0.15 ? 'FINISHED' : 'RUNNING';
return of({ status, value: input })
.pipe(delay(500));
};
const number$ = from(range(1, 10));
const poll = (number) => interval(5000).pipe(
mergeMap(() => {
return doRequest(number)
}),
tap(console.log),
filter(( {status} ) => status === 'FINISHED'),
take(1)
);
const printout$ = number$.pipe(
mergeMap((number) => {
return poll(number)
})
);
printout$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/lodash.js/4.17.21/lodash.min.js"></script>
它完成了我描述的大部分工作;但它会同时轮询所有端点,而不是一个接一个地轮询。在这里,大致是我想要实现的模式:
starting with ids: [1, 2, 3]
polling: await request 1 then await request 2 then await request 3
then wait for n seconds; then repeat
after job 2 is finished, send request 1, then send request 3, then wait, then repeat
after job 3 is finished, send request 1, then wait, repeat
after job 1 is finished, complete the stream
我觉得为了实现请求的顺序发送,应该concatMap
ed;但在上面的代码片段中这是不可能的,因为间隔会阻止每个轮询流终止。
能否请您建议如何修改我的代码以实现我所描述的内容?
更新:原来的回答不对。
我们想要实现的是,在间隔的每一次循环中,我们都会按顺序轮询所有未完成的作业。我们将所有已完成的作业交给可观察输出,并且我们还会从后续轮询中忽略那些已完成的作业。
我们可以使用 Subject instead of a static observable of the job IDs. We start our poll interval and we use withLatestFrom
来包含最新的作业 ID 列表。然后,我们可以在完成作业时将 tap
添加到输出可观察对象中,并更新 Subject 以忽略该作业。
为了结束轮询间隔,我们可以创建一个可观察对象,当未完成的作业数组为空时触发,并使用 takeUntil
。
const number$ = new Subject();
const noMoreNumber$ = number$.pipe(skipWhile((numbers) => numbers.length > 0));
const printout$ = interval(5000).pipe(
withLatestFrom(number$),
switchMap(([_, numbers]) => {
return numbers.map((number) => defer(() => doRequest(number)));
}),
concatAll(),
//tap(console.log),
filter(({ status }) => status === 'FINISHED'),
withLatestFrom(number$),
tap(([{ value }, numbers]) =>
number$.next(numbers.filter((num) => num != value))
),
map(([item]) => item),
takeUntil(noMoreNumber$)
);
printout$.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('COMPLETE'),
});
number$.next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
我要做的另一个调整是在轮询器本身内部使用 switchMap
而不是 mergeMap
。如果您将它与 fromFetch
结合使用来执行您的 HTTP 调用,那么,如果有一些 long-running HTTP 调用卡住了,在下一次轮询中,先前的调用将在它进行下一个 HTTP 之前被取消之所以调用,是因为 switchMap
在订阅新的之前处理了之前的可观察对象。
这是一个工作示例:
https://stackblitz.com/edit/js-gxrrb3?devToolsHeight=33&file=index.js
生成如下所示的控制台输出...
如果我理解正确,我会这样处理。
首先,我将创建一个 poll
函数,该函数 returns 一个 Observable,它在一轮轮询后发出通知,并发出一个数组,其中包含调用 [=15] 的所有数字=] returns 'RUNNING'
。这样的功能看起来像这样
const poll = (numbers: number[]) => {
return from(numbers).pipe(
concatMap((n) =>
doRequest(n).pipe(
filter((resp) => resp.status === 'RUNNING'),
map((resp) => resp.value)
)
),
toArray()
);
};
然后你需要做的是递归迭代调用poll
函数,直到poll
返回的Observable发出的数组为空。
rxjs 中的递归通常使用 expand
运算符获得,这是我们在这种情况下也将使用的运算符,例如
poll(numbers)
.pipe(
expand((numbers) =>
numbers.length === 0
? EMPTY
: timer(2000).pipe(concatMap(() => poll(numbers)))
)
)
.subscribe(console.log);
一个完整的例子可以在this stackblitz中看到。
更新
如果 objective 是通知已完成轮询逻辑的作业 ID,解决方案的结构保持不变(poll
函数和通过 expand
递归) 但细节有所不同。
poll
函数确保我们发出轮询的所有响应,它看起来像这样:
const poll = (
numbers: number[]
) => {
console.log(`Polling ${numbers}`);
return from(numbers).pipe(
concatMap((n) => doRequest(n)),
toArray()
);
};
递归逻辑确保再次轮询所有仍处于“运行”状态的作业,然后我们仅过滤已完成的作业并将其传递到下游。换句话说,逻辑看起来像这样
poll(start)
.pipe(
expand((responses) => {
const numbers = responses.filter(r => r.status === 'RUNNING').map(r => r.value)
return numbers.length === 0
? EMPTY
: timer(2000).pipe(concatMap(() => poll(numbers)));
}),
map(responses => responses.filter(r => r.status === 'FINISHED')),
filter(finished => finished.length > 0)
)
.subscribe({
next: responses => console.log(`Job finished ${responses.map(r => r.value)}`),
complete: () => {console.log('All processed')}
});
可以在 this stackblitz 中看到一个工作示例。
试试这个
import { delay, EMPTY, from, of, range } from 'rxjs';
import { concatMap, filter, mergeMap, tap, toArray } from 'rxjs/operators';
const number$ = from(range(1, 3));
const doRequest = (input) => {
const status = Math.random() < 0.15 ? 'FINISHED' : 'RUNNING';
return of({ status, value: input }).pipe(delay(1000));
};
const poll = (jobs: object[]) => {
return from(jobs).pipe(
filter((job) => job['status'] !== 'FINISHED'),
concatMap((job) => doRequest(job['value'])),
tap((job) => {
console.log('polling with................', job);
}),
toArray(),
tap((result) => {
console.log('curent jobs................', JSON.stringify(result));
}),
mergeMap((result) =>
result.length > 0 ? poll(result) : of('All job completed!')
)
);
};
const initiateJob = number$.pipe(
mergeMap((id) => doRequest(id)),
toArray(),
tap((jobs) => {
console.log('initialJobs: ', JSON.stringify(jobs));
}),
concatMap(poll)
);
initiateJob.subscribe({
next: console.log,
error: console.log,
complete: () => console.log('COMPLETED'),
});
我正在尝试使用 Rxjs 实现以下目标:给定一个作业 ID 数组,对于数组中的每个 ID,轮询一个 returns 作业状态的端点。状态可以是“运行”或“已完成”。代码应该一个接一个地轮询作业,并继续轮询直到作业处于“运行”状态。一旦作业达到“已完成”状态,就应该将其传递给下游,并排除在进一步的轮询之外。
下面是一个演示问题的最小玩具箱。
const {
from,
of,
interval,
mergeMap,
filter,
take,
tap,
delay
} = rxjs;
const { range } = _;
const doRequest = (input) => {
const status = Math.random() < 0.15 ? 'FINISHED' : 'RUNNING';
return of({ status, value: input })
.pipe(delay(500));
};
const number$ = from(range(1, 10));
const poll = (number) => interval(5000).pipe(
mergeMap(() => {
return doRequest(number)
}),
tap(console.log),
filter(( {status} ) => status === 'FINISHED'),
take(1)
);
const printout$ = number$.pipe(
mergeMap((number) => {
return poll(number)
})
);
printout$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/lodash.js/4.17.21/lodash.min.js"></script>
它完成了我描述的大部分工作;但它会同时轮询所有端点,而不是一个接一个地轮询。在这里,大致是我想要实现的模式:
starting with ids: [1, 2, 3]
polling: await request 1 then await request 2 then await request 3
then wait for n seconds; then repeat
after job 2 is finished, send request 1, then send request 3, then wait, then repeat
after job 3 is finished, send request 1, then wait, repeat
after job 1 is finished, complete the stream
我觉得为了实现请求的顺序发送,应该concatMap
ed;但在上面的代码片段中这是不可能的,因为间隔会阻止每个轮询流终止。
能否请您建议如何修改我的代码以实现我所描述的内容?
更新:原来的回答不对。
我们想要实现的是,在间隔的每一次循环中,我们都会按顺序轮询所有未完成的作业。我们将所有已完成的作业交给可观察输出,并且我们还会从后续轮询中忽略那些已完成的作业。
我们可以使用 Subject instead of a static observable of the job IDs. We start our poll interval and we use withLatestFrom
来包含最新的作业 ID 列表。然后,我们可以在完成作业时将 tap
添加到输出可观察对象中,并更新 Subject 以忽略该作业。
为了结束轮询间隔,我们可以创建一个可观察对象,当未完成的作业数组为空时触发,并使用 takeUntil
。
const number$ = new Subject();
const noMoreNumber$ = number$.pipe(skipWhile((numbers) => numbers.length > 0));
const printout$ = interval(5000).pipe(
withLatestFrom(number$),
switchMap(([_, numbers]) => {
return numbers.map((number) => defer(() => doRequest(number)));
}),
concatAll(),
//tap(console.log),
filter(({ status }) => status === 'FINISHED'),
withLatestFrom(number$),
tap(([{ value }, numbers]) =>
number$.next(numbers.filter((num) => num != value))
),
map(([item]) => item),
takeUntil(noMoreNumber$)
);
printout$.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('COMPLETE'),
});
number$.next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
我要做的另一个调整是在轮询器本身内部使用 switchMap
而不是 mergeMap
。如果您将它与 fromFetch
结合使用来执行您的 HTTP 调用,那么,如果有一些 long-running HTTP 调用卡住了,在下一次轮询中,先前的调用将在它进行下一个 HTTP 之前被取消之所以调用,是因为 switchMap
在订阅新的之前处理了之前的可观察对象。
这是一个工作示例: https://stackblitz.com/edit/js-gxrrb3?devToolsHeight=33&file=index.js
生成如下所示的控制台输出...
如果我理解正确,我会这样处理。
首先,我将创建一个 poll
函数,该函数 returns 一个 Observable,它在一轮轮询后发出通知,并发出一个数组,其中包含调用 [=15] 的所有数字=] returns 'RUNNING'
。这样的功能看起来像这样
const poll = (numbers: number[]) => {
return from(numbers).pipe(
concatMap((n) =>
doRequest(n).pipe(
filter((resp) => resp.status === 'RUNNING'),
map((resp) => resp.value)
)
),
toArray()
);
};
然后你需要做的是递归迭代调用poll
函数,直到poll
返回的Observable发出的数组为空。
rxjs 中的递归通常使用 expand
运算符获得,这是我们在这种情况下也将使用的运算符,例如
poll(numbers)
.pipe(
expand((numbers) =>
numbers.length === 0
? EMPTY
: timer(2000).pipe(concatMap(() => poll(numbers)))
)
)
.subscribe(console.log);
一个完整的例子可以在this stackblitz中看到。
更新
如果 objective 是通知已完成轮询逻辑的作业 ID,解决方案的结构保持不变(poll
函数和通过 expand
递归) 但细节有所不同。
poll
函数确保我们发出轮询的所有响应,它看起来像这样:
const poll = (
numbers: number[]
) => {
console.log(`Polling ${numbers}`);
return from(numbers).pipe(
concatMap((n) => doRequest(n)),
toArray()
);
};
递归逻辑确保再次轮询所有仍处于“运行”状态的作业,然后我们仅过滤已完成的作业并将其传递到下游。换句话说,逻辑看起来像这样
poll(start)
.pipe(
expand((responses) => {
const numbers = responses.filter(r => r.status === 'RUNNING').map(r => r.value)
return numbers.length === 0
? EMPTY
: timer(2000).pipe(concatMap(() => poll(numbers)));
}),
map(responses => responses.filter(r => r.status === 'FINISHED')),
filter(finished => finished.length > 0)
)
.subscribe({
next: responses => console.log(`Job finished ${responses.map(r => r.value)}`),
complete: () => {console.log('All processed')}
});
可以在 this stackblitz 中看到一个工作示例。
试试这个
import { delay, EMPTY, from, of, range } from 'rxjs';
import { concatMap, filter, mergeMap, tap, toArray } from 'rxjs/operators';
const number$ = from(range(1, 3));
const doRequest = (input) => {
const status = Math.random() < 0.15 ? 'FINISHED' : 'RUNNING';
return of({ status, value: input }).pipe(delay(1000));
};
const poll = (jobs: object[]) => {
return from(jobs).pipe(
filter((job) => job['status'] !== 'FINISHED'),
concatMap((job) => doRequest(job['value'])),
tap((job) => {
console.log('polling with................', job);
}),
toArray(),
tap((result) => {
console.log('curent jobs................', JSON.stringify(result));
}),
mergeMap((result) =>
result.length > 0 ? poll(result) : of('All job completed!')
)
);
};
const initiateJob = number$.pipe(
mergeMap((id) => doRequest(id)),
toArray(),
tap((jobs) => {
console.log('initialJobs: ', JSON.stringify(jobs));
}),
concatMap(poll)
);
initiateJob.subscribe({
next: console.log,
error: console.log,
complete: () => console.log('COMPLETED'),
});