rxjs publishReplay:错误重置

rxjs publishReplay: reset on Error

我正在使用 publishReplay(1).refCount() 在 Angular5 中缓存 http 请求。这工作正常 - 除了错误情况。

我吞下并报告来自 http 源的异常,这就是我在下面的代码中传递 Observable.of(undefined) 的原因。在那种情况下,它当然会缓存 undefined - 但后续订阅者不应该获得该值,而是应该再次请求 http 资源,就像没有缓存一样。

我当前的代码:

result: Observable<any>;
list() {

  if (this.result === undefined) {
    this.result = this.http.get(`https://swapi.co/api/people`)
      .catch(error => {
        console.error("Something really bad happened", error);
        return Observable.of(undefined);
        // in that case, reset publishReplay
      })
      .publishReplay(1)
      .refCount();
  }

  return this.result;
}

我有一个 stackblitz(使用 shareReplay,但同样的问题)来显示行为:https://stackblitz.com/edit/angular-s5zuqa。 单击 "Load" 应该重新请求 http 源,而不是从 catch 中提供缓存的对象。

所以如果我没理解错的话,你正在找这个,对吧?这里有两点需要注意:

  1. 我们使用 shareReplay(1) 而不是 publishReplay(1).refCount()
  2. catch 已移至 多播运算符 后面。

您会在第一时间看到(伪造的)HTTP 调用错误,发出 undefined(因为我们发现了它)。下一次,我们再次 运行 请求,这次得到的结果 (42) 是为第三次 运行 缓存的,不会引起另一个请求。

// Just to keep track of how often we sent the request already
let counter = 1;

// A fake HTTP request which errors the first time, then succeeds
const request$ = Rx.Observable.of(null)
  .do(() => console.log('HTTP Request (#' + counter + ')!'))
  .delay(250)
  .switchMap(() => counter++ === 1 
    ? Rx.Observable.throw('Error!')
    : Rx.Observable.of(42)
  );

// =========

const result$ = request$
  .shareReplay(1)
  .catch(() => Rx.Observable.of(undefined))
;

Rx.Observable.timer(0, 1000)
  .take(3)
  .do(() => console.log('Subscribing to result$…'))
  .switchMap(() => result$)
  .subscribe(value => console.log('Received value: ', value));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>

谨慎使用shareReplay(1)。它在取消订阅和引用计数方面存在并发症。

正在向 shareReplay() 添加一个新的配置参数来解决这种情况。

这个问题在这里得到了更好的解释:

https://github.com/ReactiveX/rxjs/issues/3336

截至今天,新功能即将推出,但尚未发布。

https://github.com/ReactiveX/rxjs/pull/4059