Angular 已缓存,多播可观察,在订阅之前不要进行 HTTP 调用
Angular cached, multicast observable, don't make HTTP call until subscribed
我有一个使用可观察对象公开某些全局状态的服务。必须通过进行 HTTP 调用来初始化状态。我希望状态的所有消费者都收到相同的值,当他们订阅可观察对象时收到当前值,并在状态更改时收到更新。
这种情况通常使用 Subject
处理,可以通过将其订阅到 HTTP 请求返回的可观察对象来初始化。
但是,我不希望在第一次订阅全局状态可观察对象之前执行 HTTP 请求。如果使用 Subject
的 subscribe
方法,则调用 HTTP 请求。
我在服务中有以下代码。这行得通,但似乎应该有更简单的方法来做到这一点。
private initialized = false;
private readonly subject = new ReplaySubject<string>(1);
constructor(private readonly httpClient: HttpClient) {}
get data$(): Observable<string> {
return new Observable<string>(subscriber => {
if (!this.initialized) {
this.refresh();
}
let subscription = this.subject.subscribe(value => subscriber.next(value));
return () => subscription.unsubscribe();
});
}
refresh(): void {
this.httpClient.get<string>('https://...')
.pipe(
tap(() => this.initialized = true)
)
.subscribe(value => this.subject.next(value));
}
沙盒:https://codesandbox.io/s/reverent-almeida-d2zbjx?file=/src/app/test.service.ts
我不确定您的 data$()
方法中的订阅和取消订阅在做什么。这个更简单的版本(返回subject.asObservable()
)是否也符合您的要求?
get data$(): Observable<string> {
if (!this.initialized) {
this.refresh();
}
return this.subject.asObservable();
}
在类似情况下,我倾向于创建一个像这样工作的服务。
- 它公开了触发命令执行的方法,例如执行 http 调用
- 它公开了 Observables,客户可以在他们想要对变化做出反应时订阅这些 Observables
- 内部使用 Subjects 在 public Observables 执行命令时触发通知 returns 某些事情(或错误)
所以,在代码中,它看起来像这样
export class MyService {
private _subject = new ReplySubject<any>(1);
public data$ = this._subject.asObservable();
constructor(private readonly httpClient: HttpClient) {}
public executeRemoteService(inputParams: any) {
// the request is executed as soon as a client calls this method
// the subject notifies, errors and completes mirroring what the
// http client does
this.httpClient.get<string>('https://...')
.pipe(
tap(this.data$)
)
.subscribe()
}
}
任何对 data$
感兴趣的客户端,即对全局状态的变化感兴趣,都必须订阅 data$
,这随时都可能发生。
什么时候触发服务的执行,任何组件都可以调用 executeRemoteService
方法的执行,然后触发 data$
中的通知。
以下应该可以完成您想要的。 data$ 字段应该是两个 observable 的合并。第一个将是初始 http 调用,另一个将监听来自 refreshSubject 的发射以再次调用该服务。
在第一次订阅完成之前不会执行初始调用,任何对 refreshSubject 的调用也将被忽略。
最后,使用shareReplay 将使所有后续订阅者获得最后的结果。使用 refCount 参数,以便如果订阅者计数变为零,则订阅完全取消订阅,下一个订阅者将再次发起第一次调用。如果这种行为是不可取的,你可以只使用 shareReplay(1)
它总是有最后的结果并继续听 refreshSubject.
readonly refreshSubject = new subject<void>();
readonly data$ = merge(
this.httpClient.get<string>('https://...'),
this.refreshSubject.pipe(
switchMap(() => this.httpClient.get<string>('https://...'))
)
).pipe(
shareReplay({ bufferSize: 1, refCount: true })
);
根据@DanielGimenez 的回答,我实现了下面的解决方案。它的优点是稍微更简单,更DRY。
private readonly refreshSubject = new BehaviorSubject<void>(void 0);
constructor(private readonly httpClient: HttpClient) {
this.data$ = this.refreshSubject
.pipe(
switchMap(() => this.httpClient.get<string>('https://...')),
shareReplay(1)
);
}
readonly data$: Observable<string>;
refresh(): void {
this.refreshSubject.next();
}
我有一个使用可观察对象公开某些全局状态的服务。必须通过进行 HTTP 调用来初始化状态。我希望状态的所有消费者都收到相同的值,当他们订阅可观察对象时收到当前值,并在状态更改时收到更新。
这种情况通常使用 Subject
处理,可以通过将其订阅到 HTTP 请求返回的可观察对象来初始化。
但是,我不希望在第一次订阅全局状态可观察对象之前执行 HTTP 请求。如果使用 Subject
的 subscribe
方法,则调用 HTTP 请求。
我在服务中有以下代码。这行得通,但似乎应该有更简单的方法来做到这一点。
private initialized = false;
private readonly subject = new ReplaySubject<string>(1);
constructor(private readonly httpClient: HttpClient) {}
get data$(): Observable<string> {
return new Observable<string>(subscriber => {
if (!this.initialized) {
this.refresh();
}
let subscription = this.subject.subscribe(value => subscriber.next(value));
return () => subscription.unsubscribe();
});
}
refresh(): void {
this.httpClient.get<string>('https://...')
.pipe(
tap(() => this.initialized = true)
)
.subscribe(value => this.subject.next(value));
}
沙盒:https://codesandbox.io/s/reverent-almeida-d2zbjx?file=/src/app/test.service.ts
我不确定您的 data$()
方法中的订阅和取消订阅在做什么。这个更简单的版本(返回subject.asObservable()
)是否也符合您的要求?
get data$(): Observable<string> {
if (!this.initialized) {
this.refresh();
}
return this.subject.asObservable();
}
在类似情况下,我倾向于创建一个像这样工作的服务。
- 它公开了触发命令执行的方法,例如执行 http 调用
- 它公开了 Observables,客户可以在他们想要对变化做出反应时订阅这些 Observables
- 内部使用 Subjects 在 public Observables 执行命令时触发通知 returns 某些事情(或错误)
所以,在代码中,它看起来像这样
export class MyService {
private _subject = new ReplySubject<any>(1);
public data$ = this._subject.asObservable();
constructor(private readonly httpClient: HttpClient) {}
public executeRemoteService(inputParams: any) {
// the request is executed as soon as a client calls this method
// the subject notifies, errors and completes mirroring what the
// http client does
this.httpClient.get<string>('https://...')
.pipe(
tap(this.data$)
)
.subscribe()
}
}
任何对 data$
感兴趣的客户端,即对全局状态的变化感兴趣,都必须订阅 data$
,这随时都可能发生。
什么时候触发服务的执行,任何组件都可以调用 executeRemoteService
方法的执行,然后触发 data$
中的通知。
以下应该可以完成您想要的。 data$ 字段应该是两个 observable 的合并。第一个将是初始 http 调用,另一个将监听来自 refreshSubject 的发射以再次调用该服务。
在第一次订阅完成之前不会执行初始调用,任何对 refreshSubject 的调用也将被忽略。
最后,使用shareReplay 将使所有后续订阅者获得最后的结果。使用 refCount 参数,以便如果订阅者计数变为零,则订阅完全取消订阅,下一个订阅者将再次发起第一次调用。如果这种行为是不可取的,你可以只使用 shareReplay(1)
它总是有最后的结果并继续听 refreshSubject.
readonly refreshSubject = new subject<void>();
readonly data$ = merge(
this.httpClient.get<string>('https://...'),
this.refreshSubject.pipe(
switchMap(() => this.httpClient.get<string>('https://...'))
)
).pipe(
shareReplay({ bufferSize: 1, refCount: true })
);
根据@DanielGimenez 的回答,我实现了下面的解决方案。它的优点是稍微更简单,更DRY。
private readonly refreshSubject = new BehaviorSubject<void>(void 0);
constructor(private readonly httpClient: HttpClient) {
this.data$ = this.refreshSubject
.pipe(
switchMap(() => this.httpClient.get<string>('https://...')),
shareReplay(1)
);
}
readonly data$: Observable<string>;
refresh(): void {
this.refreshSubject.next();
}