如何链接 RxJS 可观察列表
How to chain a list of RxJS observable
对单页应用程序使用 Angular CLI 6
我需要做的是以下几点
发出 HTTP post 请求。我会得到一个好的回报。结果或副作用需要一段时间才能准备好进行处理。
我将不得不轮询结果准备情况,比如说每秒一次。
要轮询结果,我需要进行 HTTP get 调用并检查结果。如果结果完整,我就完成了。
不然我还要继续投票..
我所做的是,我有两个可观察对象,一个用于 HTTP post,一个用于 HTTP get 调用。
我使用 setTimeout 进行轮询。这个代码的组织,当我点击 setTimeout 时,我永远不会登陆我必须杀死应用程序...
关于这个问题有什么提示吗?
我目前所掌握的是
private initiateAnalysis(){
this.initiateRequest$()
.subscribe(response =>{
const error = getErrorMessage(response);
if (error !== null) {
console.error(error);
} else {
this.processResults();
}
},
(err: HttpErrorResponse) =>{
console.error('feature error:', err);
});
}
private initiateRequest$(): Observable<any>{
let params: any = {
};
return this.problemsService.postRequest('postURL', {}, params)
}
private checkForResponse$(): Observable<any>{
let params: any = {
};
return this.problemsService.getResults('someURL', params);
}
private processResults(){
this.doneWithNecRiskAnalysis = false;
while (!this.doneWithNecRiskAnalysis) {
setTimeout(() =>{
this.checkForResults(); // I never to this line in the code...
}, 1000);
}
}
private checkForResults() {
this.checkForResponse$()
.subscribe(response =>{
const error = getErrorMessage(response);
if (error !== null) {
console.error(error);
} else {
if (1 === 1) { // just for now
this.showResults(response.payload);
}
}
},
(err: HttpErrorResponse) =>{
console.error('feature error:', err);
});
}
private showResults(results) {
console.log('results', results);
}
while (!this.doneWithNecRiskAnalysis) {
setTimeout(() =>{
this.checkForResults(); // I never to this line in the code...
}, 1000);
}
这会不断要求服务器在 1 秒内检查结果。你永远不会等到下一秒再问。
为了使您的代码更简单、更清晰,我要做的第一件事就是重构您的服务(and/or 您的后端),以便它们的可观察对象发出 error 如果出现错误,即使有错误消息也不会发出正常消息。
剩下的会假设你已经完成了。
您也应该停止使用 any
类型。
那么代码可以简化为:
private initiateAnalysis() {
this.initiateRequest$().pipe( // sends the first request to start the analysis
switchMap(() => interval(1000)), // when the response comes back, start emitting an event every second from this observable
mergeMap(() => this.checkForResponse$()), // each second, send the GET request and emit the results. Merge all the results into the resulting observable
filter(results => this.isAnalysisComplete(results)), // filter out the results if they are not the final, correct results
first() // only take the first complete results, to avoid continuing sending GET requests
).subscribe(results => this.showResults(results));
}
private initiateRequest$(): Observable<void> {
const params = {
};
return this.problemsService.postRequest('postURL', {}, params)
}
private checkForResponse$(): Observable<Results>{
const params = {
};
return this.problemsService.getResults('someURL', params);
}
private showResults(results: Results) {
console.log('results', results);
}
如果您想在发送下一个请求之前等待上一个响应,您可能更喜欢使用 concatMap()
而不是 mergeMap()
。
这是一个演示,其中实际的 HTTP 请求被随机延迟和随机响应所取代:https://stackblitz.com/edit/angular-bt17fb?file=src%2Fapp%2Fapp.component.ts
对单页应用程序使用 Angular CLI 6
我需要做的是以下几点 发出 HTTP post 请求。我会得到一个好的回报。结果或副作用需要一段时间才能准备好进行处理。 我将不得不轮询结果准备情况,比如说每秒一次。
要轮询结果,我需要进行 HTTP get 调用并检查结果。如果结果完整,我就完成了。 不然我还要继续投票..
我所做的是,我有两个可观察对象,一个用于 HTTP post,一个用于 HTTP get 调用。 我使用 setTimeout 进行轮询。这个代码的组织,当我点击 setTimeout 时,我永远不会登陆我必须杀死应用程序...
关于这个问题有什么提示吗?
我目前所掌握的是
private initiateAnalysis(){
this.initiateRequest$()
.subscribe(response =>{
const error = getErrorMessage(response);
if (error !== null) {
console.error(error);
} else {
this.processResults();
}
},
(err: HttpErrorResponse) =>{
console.error('feature error:', err);
});
}
private initiateRequest$(): Observable<any>{
let params: any = {
};
return this.problemsService.postRequest('postURL', {}, params)
}
private checkForResponse$(): Observable<any>{
let params: any = {
};
return this.problemsService.getResults('someURL', params);
}
private processResults(){
this.doneWithNecRiskAnalysis = false;
while (!this.doneWithNecRiskAnalysis) {
setTimeout(() =>{
this.checkForResults(); // I never to this line in the code...
}, 1000);
}
}
private checkForResults() {
this.checkForResponse$()
.subscribe(response =>{
const error = getErrorMessage(response);
if (error !== null) {
console.error(error);
} else {
if (1 === 1) { // just for now
this.showResults(response.payload);
}
}
},
(err: HttpErrorResponse) =>{
console.error('feature error:', err);
});
}
private showResults(results) {
console.log('results', results);
}
while (!this.doneWithNecRiskAnalysis) {
setTimeout(() =>{
this.checkForResults(); // I never to this line in the code...
}, 1000);
}
这会不断要求服务器在 1 秒内检查结果。你永远不会等到下一秒再问。
为了使您的代码更简单、更清晰,我要做的第一件事就是重构您的服务(and/or 您的后端),以便它们的可观察对象发出 error 如果出现错误,即使有错误消息也不会发出正常消息。
剩下的会假设你已经完成了。
您也应该停止使用 any
类型。
那么代码可以简化为:
private initiateAnalysis() {
this.initiateRequest$().pipe( // sends the first request to start the analysis
switchMap(() => interval(1000)), // when the response comes back, start emitting an event every second from this observable
mergeMap(() => this.checkForResponse$()), // each second, send the GET request and emit the results. Merge all the results into the resulting observable
filter(results => this.isAnalysisComplete(results)), // filter out the results if they are not the final, correct results
first() // only take the first complete results, to avoid continuing sending GET requests
).subscribe(results => this.showResults(results));
}
private initiateRequest$(): Observable<void> {
const params = {
};
return this.problemsService.postRequest('postURL', {}, params)
}
private checkForResponse$(): Observable<Results>{
const params = {
};
return this.problemsService.getResults('someURL', params);
}
private showResults(results: Results) {
console.log('results', results);
}
如果您想在发送下一个请求之前等待上一个响应,您可能更喜欢使用 concatMap()
而不是 mergeMap()
。
这是一个演示,其中实际的 HTTP 请求被随机延迟和随机响应所取代:https://stackblitz.com/edit/angular-bt17fb?file=src%2Fapp%2Fapp.component.ts