Angular/RxJS6:如何防止HTTP重复请求?

Angular/RxJS 6: How to prevent duplicate HTTP requests?

目前有一个场景,共享服务中的一个方法被多个组件使用。此方法对将始终具有相同响应和 returns Observable 的端点进行 HTTP 调用。是否可以与所有订阅者共享第一个响应以防止重复的 HTTP 请求?

下面是上述场景的简化版本:

class SharedService {
  constructor(private http: HttpClient) {}

  getSomeData(): Observable<any> {
    return this.http.get<any>('some/endpoint');
  }
}

class Component1 {
  constructor(private sharedService: SharedService) {
    this.sharedService.getSomeData().subscribe(
      () => console.log('do something...')
    );
  }
}

class Component2 {
  constructor(private sharedService: SharedService) {
    this.sharedService.getSomeData().subscribe(
      () => console.log('do something different...')
    );
  }
}

在尝试了几种不同的方法后,我遇到了解决我的问题的方法,无论有多少订阅者都只发出一个 HTTP 请求:

class SharedService {
  someDataObservable: Observable<any>;

  constructor(private http: HttpClient) {}

  getSomeData(): Observable<any> {
    if (this.someDataObservable) {
      return this.someDataObservable;
    } else {
      this.someDataObservable = this.http.get<any>('some/endpoint').pipe(share());
      return this.someDataObservable;
    }
  }
}

我仍然愿意接受更有效的建议!

好奇者:share()

根据您简化的场景,我构建了一个工作示例,但有趣的部分是理解正在发生的事情。

首先,我构建了一个服务来模拟 HTTP 并避免进行真正的 HTTP 调用:

export interface SomeData {
  some: {
    data: boolean;
  };
}

@Injectable()
export class HttpClientMockService {
  private cpt = 1;

  constructor() {}

  get<T>(url: string): Observable<T> {
    return of({
      some: {
        data: true,
      },
    }).pipe(
      tap(() => console.log(`Request n°${this.cpt++} - URL "${url}"`)),
      // simulate a network delay
      delay(500)
    ) as any;
  }
}

进入 AppModule 我已经将真正的 HttpClient 替换为使用模拟的:

    { provide: HttpClient, useClass: HttpClientMockService }

现在,共享服务:

@Injectable()
export class SharedService {
  private cpt = 1;

  public myDataRes$: Observable<SomeData> = this.http
    .get<SomeData>("some-url")
    .pipe(share());

  constructor(private http: HttpClient) {}

  getSomeData(): Observable<SomeData> {
    console.log(`Calling the service for the ${this.cpt++} time`);
    return this.myDataRes$;
  }
}

如果从 getSomeData 方法你 return 一个新实例,你将有 2 个不同的可观察值。无论您是否使用共享。所以这里的想法是“准备”请求。 CFmyDataRes$。这只是请求,后跟 share。但它只被声明一次,并且 return 从 getSomeData 方法引用该引用。

现在,如果您从 2 个不同的组件订阅可观察对象(服务调用的结果),您将在控制台中看到以下内容:

Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time

如您所见,我们对该服务进行了 2 次调用,但只发出了一次请求。

是啊!

如果您想确保一切都按预期工作,只需注释掉带有 .pipe(share()):

的行
Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
Request n°2 - URL "some-url"

但是...远非理想。

模拟服务中的 delay 模拟网络延迟很酷。 但也隐藏了一个潜在的错误

从 stackblitz 重现,转到组件 second 并取消对 setTimeout 的注释。它会在 1 秒后调用该服务。

我们注意到现在,即使我们使用服务中的 share,我们也有以下内容:

Calling the service for the 1 time
Request n°1 - URL "some-url"
Calling the service for the 2 time
Request n°2 - URL "some-url"

为什么?因为当第一个组件订阅可观察对象时,由于延迟(或网络延迟),500 毫秒内没有任何反应。所以订阅在那段时间仍然有效。一旦 500 毫秒的延迟完成,observable 就完成了(它不是一个长寿命的 observable,就像一个 HTTP 请求 return 只有一个值,这个也是因为我们正在使用 of)。

但是share只不过是publishrefCount而已。 Publish 允许我们多播结果,而 refCount 允许我们在没有人监听 observable 时关闭订阅。

因此,对于您的 ,如果您的某个组件的创建时间晚于发出第一个请求所需的时间,您仍然会有另一个请求。

为了避免这种情况,我想不出任何绝妙的解决方案。使用多播我们必须使用 connect 方法,但具体在哪里呢?做一个条件和一个计数器来知道它是否是第一次调用?感觉不对。

所以这可能不是最好的主意,如果有人能提供更好的解决方案,我会很高兴,但与此同时,我们可以做些什么来保持 observable 的“活力”:

      private infiniteStream$: Observable<any> = new Subject<void>().asObservable();
      
      public myDataRes$: Observable<SomeData> = merge(
        this
          .http
          .get<SomeData>('some-url'),
        this.infiniteStream$
      ).pipe(shareReplay(1))

由于 infiniteStream$ 永远不会关闭,我们合并两个结果并使用 shareReplay(1),我们现在得到预期结果:

一次 HTTP 调用,即使对服务进行了多次调用。无论第一个请求需要多长时间。

这里有一个 Stackblitz 演示来说明所有这些:https://stackblitz.com/edit/angular-n9tvx7

尽管其他人在工作之前提出的解决方案,我发现必须在每个 class 中为每个不同的 get/post/put/delete 请求手动创建字段很烦人。

我的解决方案基本上基于两个想法:HttpService 管理所有 http 请求,PendingService 管理实际通过的请求。

这个想法不是拦截请求本身(我本可以为此使用 HttpInterceptor,但为时已晚,因为已经创建了不同的请求实例),而是意图在提出请求之前提出请求。

所以基本上,所有请求都通过这个 PendingService,其中包含 Set 待处理请求。如果一个请求(由它的 url 标识)不在那个集合中,这意味着这个请求是新的,我们必须调用 HttpClient 方法(通过回调)并将它保存为一个挂起的请求我们的集合,以 url 作为键,请求可观察作为值。

如果稍后有对同一个 url 的请求,我们将使用它的 url 在集合中再次检查,如果它是我们待定集合的一部分,则意味着... pending,所以我们 return 只是我们之前保存的 observable。

每当一个挂起的请求完成时,我们都会调用一个方法将其从集合中删除。

这是一个例子,假设我们正在请求...我不知道,吉娃娃?

这就是我们的小 ChihuahasService:

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpService } from '_services/http.service';

@Injectable({
    providedIn: 'root'
})
export class ChihuahuasService {

    private chihuahuas: Chihuahua[];

    constructor(private httpService: HttpService) {
    }

    public getChihuahuas(): Observable<Chihuahua[]> {
        return this.httpService.get('https://api.dogs.com/chihuahuas');
    }

    public postChihuahua(chihuahua: Chihuahua): Observable<Chihuahua> {
        return this.httpService.post('https://api.dogs.com/chihuahuas', chihuahua);
    }

}

类似这样的东西是 HttpService:

import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { share } from 'rxjs/internal/operators';
import { PendingService } from 'pending.service';

@Injectable({
    providedIn: 'root'
})
export class HttpService {

    constructor(private pendingService: PendingService,
                private http: HttpClient) {
    }

    public get(url: string, options): Observable<any> {
        return this.pendingService.intercept(url, this.http.get(url, options).pipe(share()));
    }

    public post(url: string, body: any, options): Observable<any> {
        return this.pendingService.intercept(url, this.http.post(url, body, options)).pipe(share());
    }

    public put(url: string, body: any, options): Observable<any> {
        return this.pendingService.intercept(url, this.http.put(url, body, options)).pipe(share());
    }

    public delete(url: string, options): Observable<any> {
        return this.pendingService.intercept(url, this.http.delete(url, options)).pipe(share());
    }
    
}

最后,PendingService

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/internal/operators';

@Injectable()
export class PendingService {

    private pending = new Map<string, Observable<any>>();

    public intercept(url: string, request): Observable<any> {
        const pendingRequestObservable = this.pending.get(url);
        return pendingRequestObservable ? pendingRequestObservable : this.sendRequest(url, request);
    }

    public sendRequest(url, request): Observable<any> {
        this.pending.set(url, request);
        return request.pipe(tap(() => {
            this.pending.delete(url);
        }));
    }
    
}

这样,即使有 6 个不同的组件调用 ChihuahasService.getChihuahuas(),实际上只会发出一个请求,我们的狗 API 不会抱怨。

我相信它可以改进(我欢迎建设性的反馈)。希望有人觉得这有用。

这里已经有很多方法可以帮助你,但我会从另一个角度给你一个方法。

在 RxJS 中有一个叫做 BehaviorSubject 的东西可以很好地实现这一点。它基本上 return 是新订阅者之后的最后一个值。因此,您可以在应用程序加载时发出 HTTP 请求,并使用该值调用 BehaviorSubject 的 next() ,并且只要订阅者在那里,它就会立即 return 获取该值而不是发出新的 HTTP 请求。您还可以通过使用更新后的值调用 next 来重新检索值(更新时)。

关于 BehaviorSubject 的更多信息:

派对迟到了,但我创建了一个 reusable decorator specifically 来解决这个用例。它与此处发布的其他解决方案相比如何?

  • 它抽象出所有样板逻辑,让您的应用程序代码保持整洁
  • 它处理带有参数的方法并确保不共享对具有不同参数的方法的调用。
  • 它提供了一种方法来配置when您想要共享底层可观察对象(参见文档)。

它是在我将用于各种 Angular 相关实用程序的保护伞下发布的。

安装它:

npm install @ngspot/rxjs --save-dev

使用它:

import { Share } from '@ngspot/rxjs/decorators';

class SharedService {
  constructor(private http: HttpClient) {}

  @Share()
  getSomeData(): Observable<any> {
    return this.http.get<any>('some/endpoint');
  }
}

单例服务 & component.ts 与以前一样工作

  1. 确保您的服务是 singleton
  2. Return 一个新的 Observable,而不是 http.get Observable
  3. 第一次发出 HTTP 请求,保存响应并更新新的 observable
  4. 下次在没有 HTTP 请求的情况下更新 observable

.

class SharedService {

    private savedResponse; //to return second time onwards

    constructor(private http: HttpClient) {}

    getSomeData(): Observable<any> {

      return new Observable((observer) => {

        if (this.savedResponse) {

          observer.next(this.savedResponse);
          observer.complete();

        } else { /* make http request & process */
          
          this.http.get('some/endpoint').subscribe(data => {
            this.savedResponse = data; 
            observer.next(this.savedResponse);
            observer.complete();
          }); /* make sure to handle http error */

        }

      });
    }
  }

您可以通过在服务中放置一个随机数变量来验证单例。 console.log 应该从任何地方打印相同的数字!

    /* singleton will have the same random number in all instances */
    private random = Math.floor((Math.random() * 1000) + 1);

优点:此服务即使在本次更新后 returns 在两种情况下(http 或缓存)都可以观察到。

注意:确保此服务的提供程序没有单独添加到每个组件中。

我的解决方案是创建一个 HttpInterceptor,这样我就不需要在我的所有服务调用中添加代码

@Injectable({
  providedIn: 'root'
})
export class DuplicateCallInterceptor implements HttpInterceptor {

  private activeCalls: Map<string, Subject<any>> = new Map();

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    if (this.activeCalls.has(request.url)) {
      const subject = this.activeCalls.get(request.url);
      return subject.asObservable();
    }
    this.activeCalls.set(request.url, new Subject<any>());
    return next.handle(request)
      .pipe(
        filter(res => res.type === HttpEventType.Response),
        tap(res => {
          const subject = this.activeCalls.get(request.url);
          subject.next(res);
          subject.complete();
          this.activeCalls.delete(request.url);
        })
      )
  }
}