RXJS Chain 依序依赖 observables,但获取进度条的每个发射
RXJS Chain dependent observables sequentially, but getting each emission for a progress bar
我遇到了一个问题,我一直在尝试使用 RxJs
找到解决方案,但似乎找不到合适的解决方案...
- 我有 3 个不同的 REST 请求,它们将被顺序调用,每个请求都需要前一个请求的响应作为参数
- 我想实现一个进度条,它会随着请求的成功而递增
这是我的想法:
- 我将使用管道和
concatMap()
来避免嵌套订阅,并在前一个请求完成后订阅每个请求。
考虑这个非常简化的版本。假设每个 of
代表一个完整的 REST 成功请求(稍后将处理错误),并且我将使用 n
参数进行未显示的工作...
const request1 = of('success 1').pipe(
delay(500),
tap(n => console.log('received ' + n)),
);
const request2 = (n) => of('success 2').pipe(
delay(1000),
tap(n => console.log('received ' + n))
);
const request3 = (n) => of('success 3').pipe(
delay(400),
tap(n => console.log('received ' + n))
);
request1.pipe(
concatMap(n => request2(n).pipe(
concatMap(n => request3(n))
))
)
但是,当我订阅最后一段代码时,我只会得到最后一个请求的响应,这是预期的,因为管道解析为那个。
所以使用 concatMap()
,我可以正确地链接我的依赖 REST 调用,但无法跟踪进度。
虽然我可以通过嵌套订阅很容易地跟踪进度,但我正在努力避免这种情况并使用最佳实践方式。
如何链接我的依赖 REST 调用,但每次调用成功时仍然能够做一些事情?
最简单的解决方案是设置一个进度计数器变量,当每次响应返回时,它会通过点击更新。
let progressCounter = 0;
request1.pipe(
tap(_ => progressCounter = 0.33),
concatMap(n => request2(n).pipe(
tap(_ => progressCounter = 0.66),
concatMap(n => request3(n)
.pipe(tap(_ => progressCounter = 1)))
))
);
如果您希望进度本身是可观察的,那么您希望共享请求可观察对象而不是重复请求),然后将它们组合起来以获得进度。
有关您可能希望如何处理的示例,请访问:https://www.learnrxjs.io/recipes/progressbar.html
这是一个通用的解决方案,但并不那么简单。但它确实在避免使用 share
运算符的同时取得了可观察的进展,如果使用不当,可能会引入意想不到的状态。
const chainRequests = (firstRequestFn, ...otherRequestFns) => (
initialParams
) => {
return otherRequestFns.reduce(
(chain, nextRequestFn) =>
chain.pipe(op.concatMap((response) => nextRequestFn(response))),
firstRequestFn(initialParams)
);
};
chainRequests
采用可变数量的函数,return 是一个接受初始参数的函数,return 是一个可观察到的 concatMaps
函数,如手动所示这个问题。它通过将每个函数简化为恰好是可观察的累加值来实现这一点。
记住,如果我们知道路径,RxJS 会带领我们走出回调地狱。
const chainRequestsWithProgress = (...requestFns) => (initialParams) => {
const progress$ = new Rx.BehaviorSubject(0);
const wrappedFns = requestFns.map((fn, i) => (...args) =>
fn(...args).pipe(op.tap(() => progress$.next((i + 1) / requestFns.length)))
);
const chain$ = Rx.defer(() => {
progress$.next(0);
return chainRequests(...wrappedFns)(initialParams);
});
return [chain$, progress$];
};
chainRequestsWithProgress
return 有两个可观察对象——一个最终发出最后一个响应,一个在订阅第一个可观察对象时发出进度值。为此,我们创建了一个 BehaviorSubject
作为我们的进度值流,并将我们的每个请求函数包装到 return 与通常情况下相同的可观察对象,但我们也将其通过管道传输到 tap
这样它就可以将新的进度值推送到 BehaviorSubject
.
每次订阅第一个 observable 时进度都会清零。
如果您想 return 一个产生进度状态和最终结果值的可观察对象,您可以 chainRequestsWithProgress
而不是 return:
chain$.pipe(
op.startWith(null),
op.combineLatest(progress$, (result, progress) => ({ result, progress }))
)
并且您将拥有一个可观察对象,它会发出一个表示最终结果进展情况的对象,然后是该结果本身。深思 - progress$
是否必须只发出数字?
警告
这假设请求可观察对象只发出一个值。
我遇到了一个问题,我一直在尝试使用 RxJs
找到解决方案,但似乎找不到合适的解决方案...
- 我有 3 个不同的 REST 请求,它们将被顺序调用,每个请求都需要前一个请求的响应作为参数
- 我想实现一个进度条,它会随着请求的成功而递增
这是我的想法:
- 我将使用管道和
concatMap()
来避免嵌套订阅,并在前一个请求完成后订阅每个请求。
考虑这个非常简化的版本。假设每个 of
代表一个完整的 REST 成功请求(稍后将处理错误),并且我将使用 n
参数进行未显示的工作...
const request1 = of('success 1').pipe(
delay(500),
tap(n => console.log('received ' + n)),
);
const request2 = (n) => of('success 2').pipe(
delay(1000),
tap(n => console.log('received ' + n))
);
const request3 = (n) => of('success 3').pipe(
delay(400),
tap(n => console.log('received ' + n))
);
request1.pipe(
concatMap(n => request2(n).pipe(
concatMap(n => request3(n))
))
)
但是,当我订阅最后一段代码时,我只会得到最后一个请求的响应,这是预期的,因为管道解析为那个。
所以使用 concatMap()
,我可以正确地链接我的依赖 REST 调用,但无法跟踪进度。
虽然我可以通过嵌套订阅很容易地跟踪进度,但我正在努力避免这种情况并使用最佳实践方式。
如何链接我的依赖 REST 调用,但每次调用成功时仍然能够做一些事情?
最简单的解决方案是设置一个进度计数器变量,当每次响应返回时,它会通过点击更新。
let progressCounter = 0;
request1.pipe(
tap(_ => progressCounter = 0.33),
concatMap(n => request2(n).pipe(
tap(_ => progressCounter = 0.66),
concatMap(n => request3(n)
.pipe(tap(_ => progressCounter = 1)))
))
);
如果您希望进度本身是可观察的,那么您希望共享请求可观察对象而不是重复请求),然后将它们组合起来以获得进度。
有关您可能希望如何处理的示例,请访问:https://www.learnrxjs.io/recipes/progressbar.html
这是一个通用的解决方案,但并不那么简单。但它确实在避免使用 share
运算符的同时取得了可观察的进展,如果使用不当,可能会引入意想不到的状态。
const chainRequests = (firstRequestFn, ...otherRequestFns) => (
initialParams
) => {
return otherRequestFns.reduce(
(chain, nextRequestFn) =>
chain.pipe(op.concatMap((response) => nextRequestFn(response))),
firstRequestFn(initialParams)
);
};
chainRequests
采用可变数量的函数,return 是一个接受初始参数的函数,return 是一个可观察到的 concatMaps
函数,如手动所示这个问题。它通过将每个函数简化为恰好是可观察的累加值来实现这一点。
记住,如果我们知道路径,RxJS 会带领我们走出回调地狱。
const chainRequestsWithProgress = (...requestFns) => (initialParams) => {
const progress$ = new Rx.BehaviorSubject(0);
const wrappedFns = requestFns.map((fn, i) => (...args) =>
fn(...args).pipe(op.tap(() => progress$.next((i + 1) / requestFns.length)))
);
const chain$ = Rx.defer(() => {
progress$.next(0);
return chainRequests(...wrappedFns)(initialParams);
});
return [chain$, progress$];
};
chainRequestsWithProgress
return 有两个可观察对象——一个最终发出最后一个响应,一个在订阅第一个可观察对象时发出进度值。为此,我们创建了一个 BehaviorSubject
作为我们的进度值流,并将我们的每个请求函数包装到 return 与通常情况下相同的可观察对象,但我们也将其通过管道传输到 tap
这样它就可以将新的进度值推送到 BehaviorSubject
.
每次订阅第一个 observable 时进度都会清零。
如果您想 return 一个产生进度状态和最终结果值的可观察对象,您可以 chainRequestsWithProgress
而不是 return:
chain$.pipe(
op.startWith(null),
op.combineLatest(progress$, (result, progress) => ({ result, progress }))
)
并且您将拥有一个可观察对象,它会发出一个表示最终结果进展情况的对象,然后是该结果本身。深思 - progress$
是否必须只发出数字?
警告
这假设请求可观察对象只发出一个值。