即使没有订阅者也保持 observable 连接

Keep observable connected even with no subscribers

编辑 2:shareReplay(1) 的效果更好,如 .

的更新所述

编辑 1:最终效果最好的是:

@Injectable()
export class MyGlobalService {

  private resource$;
  private resource$Connected;

  constructor(private http: Http) {
    this.resource$Connected = false;
    this.resource$ = this.http
      .get('/api/resource')
      .map((res: Response) => res.json())
      .publishReplay(1);
  } 

  getResource(): Observable<any> {
    if (!this.resource$Connected) {
      this.resource$.connect();
      this.resource$Connected = true;
    }
    return this.resource$;
  }

}

它只进行一次 AJAX 调用,直到某些消费者需要资源时才会调用。


原始问题:

我正在尝试缓存 angular HTTP 调用并将最新结果多播给所有当前和未来的订阅者。 ajax 结果在应用程序生命周期内不会改变,因此我不想对我已有的资源进行任何额外调用。因此,即使所有订阅者都取消订阅,我也希望它继续保持 "connected"。这可能吗?

我最初尝试的是这样的:

// in a global service 

getResource(): Observable<any> {
  return this.http
    .get('/api/resource')
    .map((res: Response) => res.json())
    .publishLast()
    .refCount();
}

这适用于同一组件中的多个 async 管道,但如果该组件被销毁(因此 refCount 变为 0),HTTP 请求将在稍后实例化时重复组件。

为了解决这个问题,我开始手动缓存结果:

resourceResults: any;

getResource(): Observable<any> {
  if (resourceResults) {
    return Observable.of(this.resourceResults);
  }
  return this.http
    .get('/api/resource')
    .map((res: Response) => res.json())
    .do(x => this.resourceResults = x)
    .publishLast()
    .refCount();
}

这很好用,但我觉得有更多的 rx 方法可以做到。

我试过使用 connect(),但这似乎遇到了与我的第一个示例相同的问题。一旦所有订阅者都取消订阅,使用 connect() 会导致 HTTP 请求再次发生

resource$ = this.http
  .get('/api/resource')
  .map((res: Response) => res.json())
  .publishLast()
  .refCount();

getResource(): Observable<any> {
  this.resource$.connect();
  return this.resource$;
}

有什么想法吗?

publishReplay/connect 有效。这里是 the working plunker:

import {Injectable} from '@angular/core';
import {Http, Response} from '@angular/http';
import {Observable} from "rxjs/Observable";

@Injectable()
export class YourService {
    resource:Observable<any>;

    constructor(private http: Http) {
        this.resource = this.http.get('https://api.github.com/users/karser')
            .map((res: Response) => res.json())
            .do(res => console.log('response', res))
            .publishReplay(1);
        this.resource.connect();
    }
}

输出:

Subscribing
response Object {login: "karser", id: 1675033…}
Unscibscribed
Subscribing once again

更新:RxJS 5.4 shareReplay operator which apparently does the same thing. See the updated plunkr

this.http.get('https://api.github.com/users/karser')
    .map((res: Response) => res.json())
    .shareReplay(1);

来自pull request

shareReplay returns an observable that is the source multicasted over a ReplaySubject. That replay subject is recycled on error from the source, but not on completion of the source. This makes shareReplay ideal for handling things like caching AJAX results, as it's retryable. It's repeat behavior, however, differs from share in that it will not repeat the source observable, rather it will repeat the source observable's values.

首先,如果您想在整个应用程序生命周期中共享此响应,您应该将其放入服务中并确保所有组件都共享同一个实例。

理论上您可以使源 Observable 永远不完成。更准确地说,你可以让链条永远不会传播完整的信号,但不推荐这样做,如果你试图将这个 Observable 与 forkJoin()toArray() 等运算符一起使用,可能会导致意外行为类似的需要源 Observable 才能正确完成。

相反,您可以使用 publishReplay(1) 来保留源 Observable 发出的最后一个值,refCount() 在查询缓存时保持对源 Observable 的单个订阅,然后 take(1) 只接受一个值并完成。

const cache = this.http.get('/api/resource')
  .map((res: Response) => res.json())
  .publishReplay(1)
  .refCount()
  .take(1);

因此您可以一次订阅多个观察者,并且只会执行一个 HTTP 请求。然后任何后续订阅将命中已经具有缓存值的 .publishReplay(1),该缓存值会立即传播,并且 take(1) 会立即完成链。因此不会订阅 this.http.get('/api/resource'),因此不会发出任何请求。

不要使用 refCount。手动创建可连接的可观察对象,进行连接,订阅它,可连接的 ReplaySubject 将保留最后发布的值。类似的东西:

let obs$ = Rx.Observable.interval(5000)
  .multicast(new Rx.ReplaySubject(1));

obs$.connect();

// unsuscribe when service is destroyed???
let mainSubecription = obs$.subscribe(x=>console.log(x));

// create subscription (like async pipe)
let subscription;
setTimeout(()=> {
  subscription = obs$.subscribe(x=>console.log('second', x));
}, 15000);


// remove subscription (like async pipe)
setTimeout(()=> {
 subscription.unsubscribe();
}, 40000);

// the main subscription still gets data