Angular/RxJS6:如何防止HTTP重复请求?
Angular/RxJS 6: How to prevent duplicate HTTP requests?
目前有一个场景,共享服务中的一个方法被多个组件使用。此方法对将始终具有相同响应和 returns Observable 的端点进行 HTTP 调用。是否可以与所有订阅者共享第一个响应以防止重复的 HTTP 请求?
下面是上述场景的简化版本:
class SharedService {
constructor(private http: HttpClient) {}
getSomeData(): Observable<any> {
return this.http.get<any>('some/endpoint');
}
}
class Component1 {
constructor(private sharedService: SharedService) {
this.sharedService.getSomeData().subscribe(
() => console.log('do something...')
);
}
}
class Component2 {
constructor(private sharedService: SharedService) {
this.sharedService.getSomeData().subscribe(
() => console.log('do something different...')
);
}
}
在尝试了几种不同的方法后,我遇到了解决我的问题的方法,无论有多少订阅者都只发出一个 HTTP 请求:
class SharedService {
someDataObservable: Observable<any>;
constructor(private http: HttpClient) {}
getSomeData(): Observable<any> {
if (this.someDataObservable) {
return this.someDataObservable;
} else {
this.someDataObservable = this.http.get<any>('some/endpoint').pipe(share());
return this.someDataObservable;
}
}
}
我仍然愿意接受更有效的建议!
好奇者:share()
根据您简化的场景,我构建了一个工作示例,但有趣的部分是理解正在发生的事情。
首先,我构建了一个服务来模拟 HTTP 并避免进行真正的 HTTP 调用:
export interface SomeData {
some: {
data: boolean;
};
}
@Injectable()
export class HttpClientMockService {
private cpt = 1;
constructor() {}
get<T>(url: string): Observable<T> {
return of({
some: {
data: true,
},
}).pipe(
tap(() => console.log(`Request n°${this.cpt++} - URL "${url}"`)),
// simulate a network delay
delay(500)
) as any;
}
}
进入 AppModule
我已经将真正的 HttpClient 替换为使用模拟的:
{ provide: HttpClient, useClass: HttpClientMockService }
现在,共享服务:
@Injectable()
export class SharedService {
private cpt = 1;
public myDataRes$: Observable<SomeData> = this.http
.get<SomeData>("some-url")
.pipe(share());
constructor(private http: HttpClient) {}
getSomeData(): Observable<SomeData> {
console.log(`Calling the service for the ${this.cpt++} time`);
return this.myDataRes$;
}
}
如果从 getSomeData
方法你 return 一个新实例,你将有 2 个不同的可观察值。无论您是否使用共享。所以这里的想法是“准备”请求。 CFmyDataRes$
。这只是请求,后跟 share
。但它只被声明一次,并且 return 从 getSomeData
方法引用该引用。
现在,如果您从 2 个不同的组件订阅可观察对象(服务调用的结果),您将在控制台中看到以下内容:
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
如您所见,我们对该服务进行了 2 次调用,但只发出了一次请求。
是啊!
如果您想确保一切都按预期工作,只需注释掉带有 .pipe(share())
:
的行
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
Request n°2 - URL "some-url"
但是...远非理想。
模拟服务中的 delay
模拟网络延迟很酷。 但也隐藏了一个潜在的错误。
从 stackblitz 重现,转到组件 second
并取消对 setTimeout 的注释。它会在 1 秒后调用该服务。
我们注意到现在,即使我们使用服务中的 share
,我们也有以下内容:
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
Request n°2 - URL "some-url"
为什么?因为当第一个组件订阅可观察对象时,由于延迟(或网络延迟),500 毫秒内没有任何反应。所以订阅在那段时间仍然有效。一旦 500 毫秒的延迟完成,observable 就完成了(它不是一个长寿命的 observable,就像一个 HTTP 请求 return 只有一个值,这个也是因为我们正在使用 of
)。
但是share
只不过是publish
和refCount
而已。 Publish 允许我们多播结果,而 refCount 允许我们在没有人监听 observable 时关闭订阅。
因此,对于您的 ,如果您的某个组件的创建时间晚于发出第一个请求所需的时间,您仍然会有另一个请求。
为了避免这种情况,我想不出任何绝妙的解决方案。使用多播我们必须使用 connect 方法,但具体在哪里呢?做一个条件和一个计数器来知道它是否是第一次调用?感觉不对。
所以这可能不是最好的主意,如果有人能提供更好的解决方案,我会很高兴,但与此同时,我们可以做些什么来保持 observable 的“活力”:
private infiniteStream$: Observable<any> = new Subject<void>().asObservable();
public myDataRes$: Observable<SomeData> = merge(
this
.http
.get<SomeData>('some-url'),
this.infiniteStream$
).pipe(shareReplay(1))
由于 infiniteStream$ 永远不会关闭,我们合并两个结果并使用 shareReplay(1)
,我们现在得到预期结果:
一次 HTTP 调用,即使对服务进行了多次调用。无论第一个请求需要多长时间。
这里有一个 Stackblitz 演示来说明所有这些:https://stackblitz.com/edit/angular-n9tvx7
尽管其他人在工作之前提出的解决方案,我发现必须在每个 class 中为每个不同的 get/post/put/delete
请求手动创建字段很烦人。
我的解决方案基本上基于两个想法:HttpService
管理所有 http 请求,PendingService
管理实际通过的请求。
这个想法不是拦截请求本身(我本可以为此使用 HttpInterceptor
,但为时已晚,因为已经创建了不同的请求实例),而是意图在提出请求之前提出请求。
所以基本上,所有请求都通过这个 PendingService
,其中包含 Set
待处理请求。如果一个请求(由它的 url 标识)不在那个集合中,这意味着这个请求是新的,我们必须调用 HttpClient
方法(通过回调)并将它保存为一个挂起的请求我们的集合,以 url 作为键,请求可观察作为值。
如果稍后有对同一个 url 的请求,我们将使用它的 url 在集合中再次检查,如果它是我们待定集合的一部分,则意味着... pending,所以我们 return 只是我们之前保存的 observable。
每当一个挂起的请求完成时,我们都会调用一个方法将其从集合中删除。
这是一个例子,假设我们正在请求...我不知道,吉娃娃?
这就是我们的小 ChihuahasService
:
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpService } from '_services/http.service';
@Injectable({
providedIn: 'root'
})
export class ChihuahuasService {
private chihuahuas: Chihuahua[];
constructor(private httpService: HttpService) {
}
public getChihuahuas(): Observable<Chihuahua[]> {
return this.httpService.get('https://api.dogs.com/chihuahuas');
}
public postChihuahua(chihuahua: Chihuahua): Observable<Chihuahua> {
return this.httpService.post('https://api.dogs.com/chihuahuas', chihuahua);
}
}
类似这样的东西是 HttpService
:
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { share } from 'rxjs/internal/operators';
import { PendingService } from 'pending.service';
@Injectable({
providedIn: 'root'
})
export class HttpService {
constructor(private pendingService: PendingService,
private http: HttpClient) {
}
public get(url: string, options): Observable<any> {
return this.pendingService.intercept(url, this.http.get(url, options).pipe(share()));
}
public post(url: string, body: any, options): Observable<any> {
return this.pendingService.intercept(url, this.http.post(url, body, options)).pipe(share());
}
public put(url: string, body: any, options): Observable<any> {
return this.pendingService.intercept(url, this.http.put(url, body, options)).pipe(share());
}
public delete(url: string, options): Observable<any> {
return this.pendingService.intercept(url, this.http.delete(url, options)).pipe(share());
}
}
最后,PendingService
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/internal/operators';
@Injectable()
export class PendingService {
private pending = new Map<string, Observable<any>>();
public intercept(url: string, request): Observable<any> {
const pendingRequestObservable = this.pending.get(url);
return pendingRequestObservable ? pendingRequestObservable : this.sendRequest(url, request);
}
public sendRequest(url, request): Observable<any> {
this.pending.set(url, request);
return request.pipe(tap(() => {
this.pending.delete(url);
}));
}
}
这样,即使有 6 个不同的组件调用 ChihuahasService.getChihuahuas()
,实际上只会发出一个请求,我们的狗 API 不会抱怨。
我相信它可以改进(我欢迎建设性的反馈)。希望有人觉得这有用。
这里已经有很多方法可以帮助你,但我会从另一个角度给你一个方法。
在 RxJS 中有一个叫做 BehaviorSubject 的东西可以很好地实现这一点。它基本上 return 是新订阅者之后的最后一个值。因此,您可以在应用程序加载时发出 HTTP 请求,并使用该值调用 BehaviorSubject 的 next() ,并且只要订阅者在那里,它就会立即 return 获取该值而不是发出新的 HTTP 请求。您还可以通过使用更新后的值调用 next 来重新检索值(更新时)。
关于 BehaviorSubject 的更多信息:
派对迟到了,但我创建了一个 reusable decorator specifically 来解决这个用例。它与此处发布的其他解决方案相比如何?
- 它抽象出所有样板逻辑,让您的应用程序代码保持整洁
- 它处理带有参数的方法并确保不共享对具有不同参数的方法的调用。
- 它提供了一种方法来配置
when
您想要共享底层可观察对象(参见文档)。
它是在我将用于各种 Angular 相关实用程序的保护伞下发布的。
安装它:
npm install @ngspot/rxjs --save-dev
使用它:
import { Share } from '@ngspot/rxjs/decorators';
class SharedService {
constructor(private http: HttpClient) {}
@Share()
getSomeData(): Observable<any> {
return this.http.get<any>('some/endpoint');
}
}
单例服务 & component.ts 与以前一样工作
- 确保您的服务是 singleton
- Return 一个新的 Observable,而不是 http.get Observable
- 第一次发出 HTTP 请求,保存响应并更新新的 observable
- 下次在没有 HTTP 请求的情况下更新 observable
.
class SharedService {
private savedResponse; //to return second time onwards
constructor(private http: HttpClient) {}
getSomeData(): Observable<any> {
return new Observable((observer) => {
if (this.savedResponse) {
observer.next(this.savedResponse);
observer.complete();
} else { /* make http request & process */
this.http.get('some/endpoint').subscribe(data => {
this.savedResponse = data;
observer.next(this.savedResponse);
observer.complete();
}); /* make sure to handle http error */
}
});
}
}
您可以通过在服务中放置一个随机数变量来验证单例。 console.log 应该从任何地方打印相同的数字!
/* singleton will have the same random number in all instances */
private random = Math.floor((Math.random() * 1000) + 1);
优点:此服务即使在本次更新后 returns 在两种情况下(http 或缓存)都可以观察到。
注意:确保此服务的提供程序没有单独添加到每个组件中。
我的解决方案是创建一个 HttpInterceptor,这样我就不需要在我的所有服务调用中添加代码
@Injectable({
providedIn: 'root'
})
export class DuplicateCallInterceptor implements HttpInterceptor {
private activeCalls: Map<string, Subject<any>> = new Map();
intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
if (this.activeCalls.has(request.url)) {
const subject = this.activeCalls.get(request.url);
return subject.asObservable();
}
this.activeCalls.set(request.url, new Subject<any>());
return next.handle(request)
.pipe(
filter(res => res.type === HttpEventType.Response),
tap(res => {
const subject = this.activeCalls.get(request.url);
subject.next(res);
subject.complete();
this.activeCalls.delete(request.url);
})
)
}
}
目前有一个场景,共享服务中的一个方法被多个组件使用。此方法对将始终具有相同响应和 returns Observable 的端点进行 HTTP 调用。是否可以与所有订阅者共享第一个响应以防止重复的 HTTP 请求?
下面是上述场景的简化版本:
class SharedService {
constructor(private http: HttpClient) {}
getSomeData(): Observable<any> {
return this.http.get<any>('some/endpoint');
}
}
class Component1 {
constructor(private sharedService: SharedService) {
this.sharedService.getSomeData().subscribe(
() => console.log('do something...')
);
}
}
class Component2 {
constructor(private sharedService: SharedService) {
this.sharedService.getSomeData().subscribe(
() => console.log('do something different...')
);
}
}
在尝试了几种不同的方法后,我遇到了解决我的问题的方法,无论有多少订阅者都只发出一个 HTTP 请求:
class SharedService {
someDataObservable: Observable<any>;
constructor(private http: HttpClient) {}
getSomeData(): Observable<any> {
if (this.someDataObservable) {
return this.someDataObservable;
} else {
this.someDataObservable = this.http.get<any>('some/endpoint').pipe(share());
return this.someDataObservable;
}
}
}
我仍然愿意接受更有效的建议!
好奇者:share()
根据您简化的场景,我构建了一个工作示例,但有趣的部分是理解正在发生的事情。
首先,我构建了一个服务来模拟 HTTP 并避免进行真正的 HTTP 调用:
export interface SomeData {
some: {
data: boolean;
};
}
@Injectable()
export class HttpClientMockService {
private cpt = 1;
constructor() {}
get<T>(url: string): Observable<T> {
return of({
some: {
data: true,
},
}).pipe(
tap(() => console.log(`Request n°${this.cpt++} - URL "${url}"`)),
// simulate a network delay
delay(500)
) as any;
}
}
进入 AppModule
我已经将真正的 HttpClient 替换为使用模拟的:
{ provide: HttpClient, useClass: HttpClientMockService }
现在,共享服务:
@Injectable()
export class SharedService {
private cpt = 1;
public myDataRes$: Observable<SomeData> = this.http
.get<SomeData>("some-url")
.pipe(share());
constructor(private http: HttpClient) {}
getSomeData(): Observable<SomeData> {
console.log(`Calling the service for the ${this.cpt++} time`);
return this.myDataRes$;
}
}
如果从 getSomeData
方法你 return 一个新实例,你将有 2 个不同的可观察值。无论您是否使用共享。所以这里的想法是“准备”请求。 CFmyDataRes$
。这只是请求,后跟 share
。但它只被声明一次,并且 return 从 getSomeData
方法引用该引用。
现在,如果您从 2 个不同的组件订阅可观察对象(服务调用的结果),您将在控制台中看到以下内容:
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
如您所见,我们对该服务进行了 2 次调用,但只发出了一次请求。
是啊!
如果您想确保一切都按预期工作,只需注释掉带有 .pipe(share())
:
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
Request n°2 - URL "some-url"
但是...远非理想。
模拟服务中的 delay
模拟网络延迟很酷。 但也隐藏了一个潜在的错误。
从 stackblitz 重现,转到组件 second
并取消对 setTimeout 的注释。它会在 1 秒后调用该服务。
我们注意到现在,即使我们使用服务中的 share
,我们也有以下内容:
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
Request n°2 - URL "some-url"
为什么?因为当第一个组件订阅可观察对象时,由于延迟(或网络延迟),500 毫秒内没有任何反应。所以订阅在那段时间仍然有效。一旦 500 毫秒的延迟完成,observable 就完成了(它不是一个长寿命的 observable,就像一个 HTTP 请求 return 只有一个值,这个也是因为我们正在使用 of
)。
但是share
只不过是publish
和refCount
而已。 Publish 允许我们多播结果,而 refCount 允许我们在没有人监听 observable 时关闭订阅。
因此,对于您的
为了避免这种情况,我想不出任何绝妙的解决方案。使用多播我们必须使用 connect 方法,但具体在哪里呢?做一个条件和一个计数器来知道它是否是第一次调用?感觉不对。
所以这可能不是最好的主意,如果有人能提供更好的解决方案,我会很高兴,但与此同时,我们可以做些什么来保持 observable 的“活力”:
private infiniteStream$: Observable<any> = new Subject<void>().asObservable();
public myDataRes$: Observable<SomeData> = merge(
this
.http
.get<SomeData>('some-url'),
this.infiniteStream$
).pipe(shareReplay(1))
由于 infiniteStream$ 永远不会关闭,我们合并两个结果并使用 shareReplay(1)
,我们现在得到预期结果:
一次 HTTP 调用,即使对服务进行了多次调用。无论第一个请求需要多长时间。
这里有一个 Stackblitz 演示来说明所有这些:https://stackblitz.com/edit/angular-n9tvx7
尽管其他人在工作之前提出的解决方案,我发现必须在每个 class 中为每个不同的 get/post/put/delete
请求手动创建字段很烦人。
我的解决方案基本上基于两个想法:HttpService
管理所有 http 请求,PendingService
管理实际通过的请求。
这个想法不是拦截请求本身(我本可以为此使用 HttpInterceptor
,但为时已晚,因为已经创建了不同的请求实例),而是意图在提出请求之前提出请求。
所以基本上,所有请求都通过这个 PendingService
,其中包含 Set
待处理请求。如果一个请求(由它的 url 标识)不在那个集合中,这意味着这个请求是新的,我们必须调用 HttpClient
方法(通过回调)并将它保存为一个挂起的请求我们的集合,以 url 作为键,请求可观察作为值。
如果稍后有对同一个 url 的请求,我们将使用它的 url 在集合中再次检查,如果它是我们待定集合的一部分,则意味着... pending,所以我们 return 只是我们之前保存的 observable。
每当一个挂起的请求完成时,我们都会调用一个方法将其从集合中删除。
这是一个例子,假设我们正在请求...我不知道,吉娃娃?
这就是我们的小 ChihuahasService
:
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpService } from '_services/http.service';
@Injectable({
providedIn: 'root'
})
export class ChihuahuasService {
private chihuahuas: Chihuahua[];
constructor(private httpService: HttpService) {
}
public getChihuahuas(): Observable<Chihuahua[]> {
return this.httpService.get('https://api.dogs.com/chihuahuas');
}
public postChihuahua(chihuahua: Chihuahua): Observable<Chihuahua> {
return this.httpService.post('https://api.dogs.com/chihuahuas', chihuahua);
}
}
类似这样的东西是 HttpService
:
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { share } from 'rxjs/internal/operators';
import { PendingService } from 'pending.service';
@Injectable({
providedIn: 'root'
})
export class HttpService {
constructor(private pendingService: PendingService,
private http: HttpClient) {
}
public get(url: string, options): Observable<any> {
return this.pendingService.intercept(url, this.http.get(url, options).pipe(share()));
}
public post(url: string, body: any, options): Observable<any> {
return this.pendingService.intercept(url, this.http.post(url, body, options)).pipe(share());
}
public put(url: string, body: any, options): Observable<any> {
return this.pendingService.intercept(url, this.http.put(url, body, options)).pipe(share());
}
public delete(url: string, options): Observable<any> {
return this.pendingService.intercept(url, this.http.delete(url, options)).pipe(share());
}
}
最后,PendingService
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/internal/operators';
@Injectable()
export class PendingService {
private pending = new Map<string, Observable<any>>();
public intercept(url: string, request): Observable<any> {
const pendingRequestObservable = this.pending.get(url);
return pendingRequestObservable ? pendingRequestObservable : this.sendRequest(url, request);
}
public sendRequest(url, request): Observable<any> {
this.pending.set(url, request);
return request.pipe(tap(() => {
this.pending.delete(url);
}));
}
}
这样,即使有 6 个不同的组件调用 ChihuahasService.getChihuahuas()
,实际上只会发出一个请求,我们的狗 API 不会抱怨。
我相信它可以改进(我欢迎建设性的反馈)。希望有人觉得这有用。
这里已经有很多方法可以帮助你,但我会从另一个角度给你一个方法。
在 RxJS 中有一个叫做 BehaviorSubject 的东西可以很好地实现这一点。它基本上 return 是新订阅者之后的最后一个值。因此,您可以在应用程序加载时发出 HTTP 请求,并使用该值调用 BehaviorSubject 的 next() ,并且只要订阅者在那里,它就会立即 return 获取该值而不是发出新的 HTTP 请求。您还可以通过使用更新后的值调用 next 来重新检索值(更新时)。
关于 BehaviorSubject 的更多信息:
派对迟到了,但我创建了一个 reusable decorator specifically 来解决这个用例。它与此处发布的其他解决方案相比如何?
- 它抽象出所有样板逻辑,让您的应用程序代码保持整洁
- 它处理带有参数的方法并确保不共享对具有不同参数的方法的调用。
- 它提供了一种方法来配置
when
您想要共享底层可观察对象(参见文档)。
它是在我将用于各种 Angular 相关实用程序的保护伞下发布的。
安装它:
npm install @ngspot/rxjs --save-dev
使用它:
import { Share } from '@ngspot/rxjs/decorators';
class SharedService {
constructor(private http: HttpClient) {}
@Share()
getSomeData(): Observable<any> {
return this.http.get<any>('some/endpoint');
}
}
单例服务 & component.ts 与以前一样工作
- 确保您的服务是 singleton
- Return 一个新的 Observable,而不是 http.get Observable
- 第一次发出 HTTP 请求,保存响应并更新新的 observable
- 下次在没有 HTTP 请求的情况下更新 observable
.
class SharedService {
private savedResponse; //to return second time onwards
constructor(private http: HttpClient) {}
getSomeData(): Observable<any> {
return new Observable((observer) => {
if (this.savedResponse) {
observer.next(this.savedResponse);
observer.complete();
} else { /* make http request & process */
this.http.get('some/endpoint').subscribe(data => {
this.savedResponse = data;
observer.next(this.savedResponse);
observer.complete();
}); /* make sure to handle http error */
}
});
}
}
您可以通过在服务中放置一个随机数变量来验证单例。 console.log 应该从任何地方打印相同的数字!
/* singleton will have the same random number in all instances */
private random = Math.floor((Math.random() * 1000) + 1);
优点:此服务即使在本次更新后 returns 在两种情况下(http 或缓存)都可以观察到。
注意:确保此服务的提供程序没有单独添加到每个组件中。
我的解决方案是创建一个 HttpInterceptor,这样我就不需要在我的所有服务调用中添加代码
@Injectable({
providedIn: 'root'
})
export class DuplicateCallInterceptor implements HttpInterceptor {
private activeCalls: Map<string, Subject<any>> = new Map();
intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
if (this.activeCalls.has(request.url)) {
const subject = this.activeCalls.get(request.url);
return subject.asObservable();
}
this.activeCalls.set(request.url, new Subject<any>());
return next.handle(request)
.pipe(
filter(res => res.type === HttpEventType.Response),
tap(res => {
const subject = this.activeCalls.get(request.url);
subject.next(res);
subject.complete();
this.activeCalls.delete(request.url);
})
)
}
}