我如何 return 具有回调中值的可观察对象?

How can I return an observable with a value that's in a callback?

我正在编写一个服务,我打算存储 Place 对象的本地副本,并仅在它们未存储在本地时从后端获取它们。但是,我在实现此功能时遇到了麻烦。如果来自 place() 的值是 undefined,我可以将我的页面设置为调用 fetchPlace(),但我打算将 fetchPlace() 保密,这样我可以稍后实现一个系统来检查最近是否发出请求,以便在用户快速切换页面时服务器不会被请求淹没。

places.service.ts

export class PlacesService {
  private _places = new BehaviorSubject<Place[]>([]);
  get places() {
    return this._places.asObservable();
  }

  constructor(private _http: HttpClient) {}

  place(placeId: number): Observable<Place> {
    return this._places.pipe(
      take(1),
      map((places: Place[]) => {
        console.log(places);
        let place = places.find((place: Place) => place.id === placeId);

        if (place === undefined) {
          console.log('Time to send a request!');
          this.fetchPlace(placeId).subscribe(
            (fetchedPlace: Place) => {
              console.log('We got one!');
              place = fetchedPlace;
              console.log(place);
            },
            (error) => {
              console.error('Looks like a 404.');
            },
          );
        }

        console.log('Okay, returning place now!');
        return place;
      }),
    );
  }

  private fetchPlace(placeId: number): Observable<Place> {
    return this._http
      .get<Place.ResponseBody>(`http://localhost:8000/v1/places/${placeId}/`)
      .pipe(map((response: Place.ResponseBody) => Place.create(response)));
  }
}

上面代码的问题在于,当变量place未定义时,对fetchPlace()的订阅被异步调用,所以placeplace 的值被 fetchedPlace 覆盖之前返回。我想要某种方法从 place() 函数返回一个包含 place 的可观察对象。

为完成起见,下面是上面代码的调用方式和控制台输出:

地点-detail.page.ts

ngOnInit() {
  this._route.paramMap.subscribe((paramMap: ParamMap) => {
    if (!paramMap.has('placeId')) {
      this._navCtrl.navigateBack('/places/discover');
      return;
    }

    const placeId = +paramMap.get('placeId');
    this._placesSub = this._placesSrv.place(placeId).subscribe(
      (place: Place) => {
        if (place === undefined) {
          console.log('Got here.');
        } else {
          this._isBookable = place.user !== this._authSrv.user;
          this._place = place;
        }
      },
      (error) => {
        console.error(error);
      }
    );
  });
}

控制台

Angular is running in development mode. Call enableProdMode() to enable production mode. core.js:26833
Native: tried calling StatusBar.styleDefault, but Cordova is not available. Make sure to include cordova.js or run in a device/simulator common.js:284
Native: tried calling SplashScreen.hide, but Cordova is not available. Make sure to include cordova.js or run in a device/simulator common.js:284
[WDS] Live Reloading enabled. client:52
Array []
places.service.ts:81:16
Time to send a request! places.service.ts:85:18
Okay, returning place now! places.service.ts:98:16
Got here. place-detail.page.ts:75:20
We got one! places.service.ts:88:22
Object { _id: 1, _user: 2, _title: "Manhattan Mansion", _description: "In the heart of New York City.", _imgUrl: "https://www.idesignarch.com/wp-content/uploads/New-York-Fifth-Avenue-Mansion_1.jpg", _price: "149.99", _availableFrom: Date Fri Dec 31 2021 18:00:00 GMT-0600 (Central Standard Time), _availableTo: Date Sat Dec 30 2023 18:00:00 GMT-0600 (Central Standard Time) }
places.service.ts:90:22

注意到您是如何在可观察流的流中调用 subscribe() 的吗?通常,我们希望避免这种情况,因为每次 observable 发出时,您都会创建一个新的内部订阅,并且没有真正清理这些订阅的好方法。

您要查找的是“Higher Order Mapping Operator", in this case switchMap.

使用 switchMap,您将传入的发射映射到 Observable。然后,switchMap 将订阅这个“内部可观察对象”并发出它的排放量。当收到新的发射时,先前的内部可观察对象将被取消订阅,新的将被订阅。因此,从本质上讲,它允许您“切换”可观察的来源。

在您的情况下,您有两个可能的来源,您现有的项目 (place.find(...)) 或新获取的结果 (this.fetchPlace(placeId))。

由于 switchMap 中的代码必须 return 一个可观察对象,returning fetchPlace(placeId) 没问题,因为它 return 是一个可观察对象。但是,现有项目不是可观察的,因此我们必须用 of 将其包装成一个。

使用 switchMap:

您的代码可能如下所示
place(placeId: number): Observable<Place> {
    return this._places.pipe(
      switchMap((places: Place[]) => {
        const place = places.find(place => place.id === placeId);

        return place ? of(place) : this.fetchPlace(placeId);
      }),
    );
  }

另外,请注意我删除了 take(1)。我想你不想要那个。原因如下:使用 observables 的主要优点是消费者总是可以推送最新的值。 take(1) 基本上只传递一个值。所以,如果你像你提到的那样实现你的缓存过期,如果组件 A 订阅 place(1) 并且它变得陈旧,然后组件 B 订阅导致重新获取,你希望组件 A 接收新获取的值,对吗?

看起来你想要缓存 API 调用,或者在 RxJs 世界中 - 与未来的订阅者共享源 Observable(而不是再次订阅它)。这是一个干净的例子:

service.ts:

cache: { [key:string]: Observable<any> } = {};

constructor(private http: HttpClient) {    }

memoisedGet(url: string) {    
  const source = this.http.get(url).pipe( 
    publishReplay(1),
    refCount()
  );
  this.cache[url] = this.cache[url] || source;

  return this.cache[url];
}  

component.ts:

memoisedGet(url: string = 'https://jsonplaceholder.typicode.com/todos/1') {
  this.myService.memoisedGet(url).subscribe(console.log);
}

有了这个,无论调用多少次 memoisedGet(),它都会触发一次 HTTP 调用,每次调用都会带来最后的结果。