RxJs:轮询直到完成间隔或收到正确的数据
RxJs: poll until interval done or correct data received
如何使用 RxJs 在浏览器中执行以下场景:
- 将数据提交到队列进行处理
- 取回作业 ID
- 每 1 秒轮询一次另一个端点,直到结果可用或 60 秒过去(然后失败)
我想出的中间解决方案:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable
.interval(1000)
.delay(5000)
.map(_ => jobQueueData.jobId)
.take(55)
)
.flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
.filter(result => result.completed)
.subscribe(
result => console.log('Result', result),
error => console.log('Error', error)
);
- 有没有没有中间变量的方法,一旦数据到达或发生错误就停止定时器?我现在可以引入新的可观察对象,然后使用
takeUntil
- 这里的
flatMap
用法在语义上是否正确?也许这整件事应该重写而不是与 flatMap
? 链接
从头开始,您已经得到了变成可观察对象的承诺。一旦这产生一个值,您希望每秒调用一次,直到您收到特定的响应(成功)或直到经过特定的时间。我们可以将这个解释的每个部分映射到一个 Rx 方法:
"Once this yields a value" = map
/flatMap
(flatMap
因为接下来的也是可观察的,我们需要将它们拉平)
"once per second" = interval
"receive a certain response" = filter
"or" = amb
"certain amount of time has passed" = timer
从那里,我们可以像这样拼凑起来:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.amb(
Rx.Observable.timer(60000)
.flatMap(() => Rx.Observable.throw(new Error('Timeout')))
)
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
一旦我们得到初始结果,我们就会将其投射到两个可观察对象之间的竞争中,一个在收到成功响应时产生一个值,另一个在一定时间后产生一个值已经过去了。第二个 flatMap
是因为 .throw
不存在于 observable 实例中,而 Rx.Observable
returns 上的方法也是一个 observable 也需要展平。
事实证明 amb
/ timer
组合实际上可以用 timeout
代替,像这样:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
我省略了您示例中的 .delay
,因为它没有在您想要的逻辑中描述,但它可以简单地适用于此解决方案。
所以,直接回答你的问题:
- 在上面的代码中,不需要手动停止任何东西,因为
interval
将在订阅者计数降为零时被处理掉,这将在 take(1)
或amb
/ timeout
完成。
- 是的,您原来的两种用法都是有效的,因为在这两种情况下,您都将一个可观察对象的每个元素投影到一个新的可观察对象中,并希望将可观察对象的结果可观察对象扁平化为常规可观察对象。
Here's the jsbin 我一起测试解决方案(您可以调整 pollQueueForResult
中返回的值以获得所需的 success/timeout;为了快速测试)。
对@matt-burnell 的出色回答进行了小幅优化。您可以将 filter 和 take 运算符替换为 first 运算符,如下所示
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.first(x => x.completed)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
);
此外,对于可能不知道的人,flatMap 运算符是 RxJS 5.0 中 mergeMap 的别名。
不是你的问题,但我需要相同的功能
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMapTo } from 'rxjs/operators'
const defaultMaxWaitTimeMilliseconds = 5 * 1000
function isAsyncThingSatisfied(result) {
return true
}
export function doAsyncThingSeveralTimesWithTimeout(
doAsyncThingReturnsPromise,
maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
checkEveryMilliseconds = 500,
) {
const subject$ = race(
interval(checkEveryMilliseconds).pipe(
mergeMap(() => doAsyncThingReturnsPromise()),
takeWhileInclusive(result => isAsyncThingSatisfied(result)),
),
of(null).pipe(
delay(maxWaitTimeMilliseconds),
switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
)
)
return subject$.toPromise(Promise) // will return first result satistieble result of doAsyncThingReturnsPromise or throw error on timeout
}
示例
// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMap } from 'rxjs/operators'
const defaultMaxWaitTimeMilliseconds = 5 * 1000
export function mailhogWaitForNEmails(
mailhogClient,
numberOfExpectedEmails,
maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
checkEveryMilliseconds = 500,
) {
let tries = 0
const mails$ = race(
interval(checkEveryMilliseconds).pipe(
mergeMap(() => mailhogClient.getAll()),
takeWhileInclusive(mails => {
tries += 1
return mails.total < numberOfExpectedEmails
}),
),
of(null).pipe(
delay(maxWaitTimeMilliseconds),
switchMap(() => throwError(`mailhogWaitForNEmails timeout after ${tries} tries`))
)
)
// toPromise returns promise which contains the last value from the Observable sequence.
// If the Observable sequence is in error, then the Promise will be in the rejected stage.
// If the sequence is empty, the Promise will not resolve.
return mails$.toPromise(Promise)
}
// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'
export async function mailhogWaitForEmailAndClean(mailhogClient) {
const mails = await mailhogWaitForNEmails(mailhogClient, 1)
if (mails.count !== 1) {
throw new Error(
`Expected to receive 1 email, but received ${mails.count} emails`,
)
}
await mailhogClient.deleteAll()
return mails.items[0]
}
Angular / 上面的 typescript 重写解决方案:
export interface PollOptions {
interval: number;
timeout: number;
}
const OPTIONS_DEFAULT: PollOptions = {
interval: 5000,
timeout: 60000
};
@Injectable()
class PollHelper {
startPoll<T>(
pollFn: () => Observable<T>, // intermediate polled responses
stopPollPredicate: (value: T) => boolean, // condition to stop polling
options: PollOptions = OPTIONS_DEFAULT): Observable<T> {
return interval(options.interval)
.pipe(
exhaustMap(() => pollFn()),
first(value => stopPollPredicate(value)),
timeout(options.timeout)
);
}
}
示例:
pollHelper.startPoll<Response>(
() => httpClient.get<Response>(...),
response => response.isDone()
).subscribe(result => {
console.log(result);
});
我们也有相同的用例,下面的 code 效果很好。
import { timer, Observable } from "rxjs";
import { scan, tap, switchMapTo, first } from "rxjs/operators";
function checkAttempts(maxAttempts: number) {
return (attempts: number) => {
if (attempts > maxAttempts) {
throw new Error("Error: max attempts");
}
};
}
export function pollUntil<T>(
pollInterval: number,
maxAttempts: number,
responsePredicate: (res: any) => boolean
) {
return (source$: Observable<T>) =>
timer(0, pollInterval).pipe(
scan(attempts => ++attempts, 0),
tap(checkAttempts(maxAttempts)),
switchMapTo(source$),
first(responsePredicate)
);
}
如果尝试次数已达到限制,则会抛出一个错误,导致输出流被取消订阅。此外,您只能在不满足定义为 responsePredicate 的给定条件之前发出 http 请求。
示例用法:
import { of } from "rxjs";
import { pollUntil } from "./poll-until-rxjs";
const responseObj = { body: { inProgress: true } };
const response$ = of(responseObj);
// this is to simulate a http call
response$
.pipe(pollUntil(1000, 3, ({ body }) => !body.inProgress))
.subscribe(({ body }) => console.log("Response body: ", body));
setTimeout(() => (responseObj.body.inProgress = false), 1500);
如何使用 RxJs 在浏览器中执行以下场景:
- 将数据提交到队列进行处理
- 取回作业 ID
- 每 1 秒轮询一次另一个端点,直到结果可用或 60 秒过去(然后失败)
我想出的中间解决方案:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable
.interval(1000)
.delay(5000)
.map(_ => jobQueueData.jobId)
.take(55)
)
.flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
.filter(result => result.completed)
.subscribe(
result => console.log('Result', result),
error => console.log('Error', error)
);
- 有没有没有中间变量的方法,一旦数据到达或发生错误就停止定时器?我现在可以引入新的可观察对象,然后使用
takeUntil
- 这里的
flatMap
用法在语义上是否正确?也许这整件事应该重写而不是与flatMap
? 链接
从头开始,您已经得到了变成可观察对象的承诺。一旦这产生一个值,您希望每秒调用一次,直到您收到特定的响应(成功)或直到经过特定的时间。我们可以将这个解释的每个部分映射到一个 Rx 方法:
"Once this yields a value" = map
/flatMap
(flatMap
因为接下来的也是可观察的,我们需要将它们拉平)
"once per second" = interval
"receive a certain response" = filter
"or" = amb
"certain amount of time has passed" = timer
从那里,我们可以像这样拼凑起来:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.amb(
Rx.Observable.timer(60000)
.flatMap(() => Rx.Observable.throw(new Error('Timeout')))
)
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
一旦我们得到初始结果,我们就会将其投射到两个可观察对象之间的竞争中,一个在收到成功响应时产生一个值,另一个在一定时间后产生一个值已经过去了。第二个 flatMap
是因为 .throw
不存在于 observable 实例中,而 Rx.Observable
returns 上的方法也是一个 observable 也需要展平。
事实证明 amb
/ timer
组合实际上可以用 timeout
代替,像这样:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
我省略了您示例中的 .delay
,因为它没有在您想要的逻辑中描述,但它可以简单地适用于此解决方案。
所以,直接回答你的问题:
- 在上面的代码中,不需要手动停止任何东西,因为
interval
将在订阅者计数降为零时被处理掉,这将在take(1)
或amb
/timeout
完成。 - 是的,您原来的两种用法都是有效的,因为在这两种情况下,您都将一个可观察对象的每个元素投影到一个新的可观察对象中,并希望将可观察对象的结果可观察对象扁平化为常规可观察对象。
Here's the jsbin 我一起测试解决方案(您可以调整 pollQueueForResult
中返回的值以获得所需的 success/timeout;为了快速测试)。
对@matt-burnell 的出色回答进行了小幅优化。您可以将 filter 和 take 运算符替换为 first 运算符,如下所示
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.first(x => x.completed)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
);
此外,对于可能不知道的人,flatMap 运算符是 RxJS 5.0 中 mergeMap 的别名。
不是你的问题,但我需要相同的功能
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMapTo } from 'rxjs/operators'
const defaultMaxWaitTimeMilliseconds = 5 * 1000
function isAsyncThingSatisfied(result) {
return true
}
export function doAsyncThingSeveralTimesWithTimeout(
doAsyncThingReturnsPromise,
maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
checkEveryMilliseconds = 500,
) {
const subject$ = race(
interval(checkEveryMilliseconds).pipe(
mergeMap(() => doAsyncThingReturnsPromise()),
takeWhileInclusive(result => isAsyncThingSatisfied(result)),
),
of(null).pipe(
delay(maxWaitTimeMilliseconds),
switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
)
)
return subject$.toPromise(Promise) // will return first result satistieble result of doAsyncThingReturnsPromise or throw error on timeout
}
示例
// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMap } from 'rxjs/operators'
const defaultMaxWaitTimeMilliseconds = 5 * 1000
export function mailhogWaitForNEmails(
mailhogClient,
numberOfExpectedEmails,
maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
checkEveryMilliseconds = 500,
) {
let tries = 0
const mails$ = race(
interval(checkEveryMilliseconds).pipe(
mergeMap(() => mailhogClient.getAll()),
takeWhileInclusive(mails => {
tries += 1
return mails.total < numberOfExpectedEmails
}),
),
of(null).pipe(
delay(maxWaitTimeMilliseconds),
switchMap(() => throwError(`mailhogWaitForNEmails timeout after ${tries} tries`))
)
)
// toPromise returns promise which contains the last value from the Observable sequence.
// If the Observable sequence is in error, then the Promise will be in the rejected stage.
// If the sequence is empty, the Promise will not resolve.
return mails$.toPromise(Promise)
}
// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'
export async function mailhogWaitForEmailAndClean(mailhogClient) {
const mails = await mailhogWaitForNEmails(mailhogClient, 1)
if (mails.count !== 1) {
throw new Error(
`Expected to receive 1 email, but received ${mails.count} emails`,
)
}
await mailhogClient.deleteAll()
return mails.items[0]
}
Angular / 上面的 typescript 重写解决方案:
export interface PollOptions {
interval: number;
timeout: number;
}
const OPTIONS_DEFAULT: PollOptions = {
interval: 5000,
timeout: 60000
};
@Injectable()
class PollHelper {
startPoll<T>(
pollFn: () => Observable<T>, // intermediate polled responses
stopPollPredicate: (value: T) => boolean, // condition to stop polling
options: PollOptions = OPTIONS_DEFAULT): Observable<T> {
return interval(options.interval)
.pipe(
exhaustMap(() => pollFn()),
first(value => stopPollPredicate(value)),
timeout(options.timeout)
);
}
}
示例:
pollHelper.startPoll<Response>(
() => httpClient.get<Response>(...),
response => response.isDone()
).subscribe(result => {
console.log(result);
});
我们也有相同的用例,下面的 code 效果很好。
import { timer, Observable } from "rxjs";
import { scan, tap, switchMapTo, first } from "rxjs/operators";
function checkAttempts(maxAttempts: number) {
return (attempts: number) => {
if (attempts > maxAttempts) {
throw new Error("Error: max attempts");
}
};
}
export function pollUntil<T>(
pollInterval: number,
maxAttempts: number,
responsePredicate: (res: any) => boolean
) {
return (source$: Observable<T>) =>
timer(0, pollInterval).pipe(
scan(attempts => ++attempts, 0),
tap(checkAttempts(maxAttempts)),
switchMapTo(source$),
first(responsePredicate)
);
}
如果尝试次数已达到限制,则会抛出一个错误,导致输出流被取消订阅。此外,您只能在不满足定义为 responsePredicate 的给定条件之前发出 http 请求。
示例用法:
import { of } from "rxjs";
import { pollUntil } from "./poll-until-rxjs";
const responseObj = { body: { inProgress: true } };
const response$ = of(responseObj);
// this is to simulate a http call
response$
.pipe(pollUntil(1000, 3, ({ body }) => !body.inProgress))
.subscribe(({ body }) => console.log("Response body: ", body));
setTimeout(() => (responseObj.body.inProgress = false), 1500);