具有内部订阅的 Rxjs observable 需要很长时间,并且外部 observable 在后续请求中首先执行
Rxjs observable with Inner subscription taking long time and outer observable executed firstly in subsequent requests
我有一个启动 Observable 的按钮,然后我订阅结果并嵌套我有一个订阅取决于之前的 Observable 结果。
this.getData(params).subscribe(result => {
// Make some first level process
console.log('[Outter Execution]: ',result);
this.getInnerData(result).subscribe(res => {
// Make some inner level process
console.log('[Inner Execution]: ',res);
});
});
取决于用户的点击速度,顺序不一样:
//User click slowly
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
//User start clicking quickly
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Outer Execution]
[Outer Execution]
[Inner Execution]
[Inner Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
如您所见,如果嵌套订阅耗时较长,用户在解决内部订阅之前再次点击,则在解决内部执行之前,第一个[外部执行]消息会被注销。一段时间后,解决了之前长时间的内部订阅,并返回了记录的消息。
我尝试使用 switchMap 和 mergeMap 运算符但没有成功。
[已编辑]:我需要的功能是作为一个块执行,下一次外部执行需要在第一次执行完成后执行(外部和内部订阅)。
使用 RxJS 的方法有很多种,这里没有具体的示例,很难为您提供具体的答案。我能做的就是从我的代码中为您提供一个示例,并希望它接近您要完成的目标。
我有他们供应商的产品。当我得到一个产品时,它有一系列供应商 ID。所以我需要获得产品,然后在该操作中找到供应商。 (听起来与您的场景相似?)
这是我的代码:
selectedProductSuppliers$ = this.selectedProduct$
.pipe(
filter(selectedProduct => Boolean(selectedProduct)),
switchMap(selectedProduct =>
forkJoin(selectedProduct.supplierIds.map(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)))
),
tap(suppliers => console.log('product suppliers', JSON.stringify(suppliers)))
);
此代码使用 switchMap
来确保如果用户多次单击它会切换到最新的选择。
然后它使用 forkJoin
来处理每个供应商 ID,发出一个 http get 请求来检索每个供应商数据并将它们加入一个发出的数组中。
return 值是 Observable<Supplier[]>
您可以在此处找到完整示例:https://github.com/DeborahK/Angular-RxJS
更新:
如果您将上面的代码更改为使用 concatMap
而不是 switchMap
,它将处理每个请求,等待一个完成后再执行下一个。
为了确保处理等待,你需要在主Observable上使用concatMap
,而不是在依赖的Observable上。否则不等待集合。
我在您的最新更改之前进行了分叉,但这是我想出的似乎有效的方法:
private clickSubject = new Subject<number>();
clickAction$ = this.clickSubject.asObservable();
ngOnInit() {
this.clickAction$
.pipe(
concatMap(value => this.mainProcess(value)
.pipe(
mergeMap(x => this.dependantProcess(x))
)
)
)
.subscribe();
}
onClick(value) {
// Emits a value into the action stream
this.clickSubject.next(value);
}
mainProcess(value) {
console.log("[Emitted] Main", value);
return of(value).pipe(delay(10));
}
dependantProcess(value) {
console.log("[Emitted] Dependent", value);
return of(value).pipe(delay(2000));
}
注意 concatMap
用于等待 mainProcess。
此代码通过使用 RxJS Subject
.
定义操作流来响应用户点击
每次用户单击按钮时,都会发出操作流。
动作流管道使用concatMap
缓存请求并等待处理它,直到处理完先前的请求。
当主进程发出时,从属进程执行。
当管道完成(主进程及其依赖进程)时,下一个缓存请求将由 concatMap
处理。
有道理吗?
您可以在此处找到更新后的代码:
https://stackblitz.com/edit/angular-dependent-order-deborahk
我有一个启动 Observable 的按钮,然后我订阅结果并嵌套我有一个订阅取决于之前的 Observable 结果。
this.getData(params).subscribe(result => {
// Make some first level process
console.log('[Outter Execution]: ',result);
this.getInnerData(result).subscribe(res => {
// Make some inner level process
console.log('[Inner Execution]: ',res);
});
});
取决于用户的点击速度,顺序不一样:
//User click slowly
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
//User start clicking quickly
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Outer Execution]
[Outer Execution]
[Inner Execution]
[Inner Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
如您所见,如果嵌套订阅耗时较长,用户在解决内部订阅之前再次点击,则在解决内部执行之前,第一个[外部执行]消息会被注销。一段时间后,解决了之前长时间的内部订阅,并返回了记录的消息。
我尝试使用 switchMap 和 mergeMap 运算符但没有成功。
[已编辑]:我需要的功能是作为一个块执行,下一次外部执行需要在第一次执行完成后执行(外部和内部订阅)。
使用 RxJS 的方法有很多种,这里没有具体的示例,很难为您提供具体的答案。我能做的就是从我的代码中为您提供一个示例,并希望它接近您要完成的目标。
我有他们供应商的产品。当我得到一个产品时,它有一系列供应商 ID。所以我需要获得产品,然后在该操作中找到供应商。 (听起来与您的场景相似?)
这是我的代码:
selectedProductSuppliers$ = this.selectedProduct$
.pipe(
filter(selectedProduct => Boolean(selectedProduct)),
switchMap(selectedProduct =>
forkJoin(selectedProduct.supplierIds.map(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)))
),
tap(suppliers => console.log('product suppliers', JSON.stringify(suppliers)))
);
此代码使用 switchMap
来确保如果用户多次单击它会切换到最新的选择。
然后它使用 forkJoin
来处理每个供应商 ID,发出一个 http get 请求来检索每个供应商数据并将它们加入一个发出的数组中。
return 值是 Observable<Supplier[]>
您可以在此处找到完整示例:https://github.com/DeborahK/Angular-RxJS
更新:
如果您将上面的代码更改为使用 concatMap
而不是 switchMap
,它将处理每个请求,等待一个完成后再执行下一个。
为了确保处理等待,你需要在主Observable上使用concatMap
,而不是在依赖的Observable上。否则不等待集合。
我在您的最新更改之前进行了分叉,但这是我想出的似乎有效的方法:
private clickSubject = new Subject<number>();
clickAction$ = this.clickSubject.asObservable();
ngOnInit() {
this.clickAction$
.pipe(
concatMap(value => this.mainProcess(value)
.pipe(
mergeMap(x => this.dependantProcess(x))
)
)
)
.subscribe();
}
onClick(value) {
// Emits a value into the action stream
this.clickSubject.next(value);
}
mainProcess(value) {
console.log("[Emitted] Main", value);
return of(value).pipe(delay(10));
}
dependantProcess(value) {
console.log("[Emitted] Dependent", value);
return of(value).pipe(delay(2000));
}
注意 concatMap
用于等待 mainProcess。
此代码通过使用 RxJS
Subject
. 定义操作流来响应用户点击
每次用户单击按钮时,都会发出操作流。
动作流管道使用
concatMap
缓存请求并等待处理它,直到处理完先前的请求。当主进程发出时,从属进程执行。
当管道完成(主进程及其依赖进程)时,下一个缓存请求将由
concatMap
处理。
有道理吗?
您可以在此处找到更新后的代码:
https://stackblitz.com/edit/angular-dependent-order-deborahk