如何最好地处理模板上同一可观察对象的多个订阅?
How to best handle multiple subscriptions to the same observable on the template?
假设我有一个名为 'todoList$' 的可观察对象。使用 'async' 运算符,我可以从中自动 subscribe/unsubscribe 。下面代码中的问题是对同一个可观察对象有两个相同的订阅:
<ng-container *ngIf="(todoList$ | async).length > 0>
<div *ngFor="let todo of todoList$ | async">
...
这不是很 DRY,因此,我们为可以更有效处理的订阅分配内存。
由于 ngIf 条件中的语法,我不相信我可以使用 'as' 关键字为可观察输出创建模板变量。 相反,当我使用组件文件中的 RxJs 'share' 运算符时有效:
todoList$ = this.store.select(todoList).pipe(tap(x => {console.log('testing')}), share());
//testing
没有共享运算符,“testing”被打印两次。这让我相信 share() 运算符可以解决这个问题。如果是,不确定 why/how?由于这可能是一个普遍的问题/代码异味,处理同一模板中相同的多个订阅的最佳方法是什么?
我承认 Whosebug 上有一些类似的问题。但是 none 已经给了我想要的东西。
您实际上仍然可以在 *ngIf
指令中使用 as
签名来只有一个有效订阅。尝试以下
<ng-container *ngIf="(todoList$ | async) as todoList">
<ng-container *ngIf="todoList.length > 0">
<div *ngFor="let todo of todoList">
...
在这种情况下使用 *ngIf
。希望对你有所帮助。
<ng-container *ngIf="(todoList$ | async) as todoList">
<ng-container *ngIf="todoList && todoList != undefined && todoList.length">
<div *ngFor="let todo of todoList">
...
If it does, not exactly sure why/how?
让我们看看如何 share()
is defined:
function shareSubjectFactory() {
return new Subject<any>();
}
return (source: Observable<T>) => refCount()(multicast(shareSubjectFactory)(source)) as Observable<T>;
首先,
(source: Observable<T>) => refCount()(multicast(shareSubjectFactory)(source))
与
相同
(source: Observable<T>) => source.pipe(
multicast(shareSubjectFactory),
refCount()
)
multicast
将 return 变成 ConnectableObservable
,它仍然是 Observable
,但除其他外,它公开了一个 connect
方法。
// Inside `multicast` operator
const connectable: any = Object.create(source, connectableObservableDescriptor);
connectable.source = source;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<R>> connectable;
另一个有趣的事情是,当订阅时,订阅者将被添加到Subject
的订阅者列表和主要来源在调用 connect
之前不会 被订阅:
_subscribe(subscriber: Subscriber<T>) {
return this.getSubject().subscribe(subscriber);
}
protected getSubject(): Subject<T> {
const subject = this._subject;
if (!subject || subject.isStopped) {
this._subject = this.subjectFactory();
}
return this._subject!;
}
例如:
const src$ = privateSrc.pipe(
tap(() => console.log('from src')),
share(),
tap(() => console.log('from share()')),
)
订阅src$
时:
// Subscriber #1
src$.subscribe(/* ... */)
订阅者将被添加到 Subject
的订阅者列表中,来源 src$
将被订阅。为什么?因为 share
也使用 refCount
,如果在以前没有活跃订阅者的情况下注册了新订阅者,则 订阅 源并且将 取消订阅 如果没有更多活跃订阅者,则从源中获取。
我们再来看一个例子:
const src$ = (new Observable(s => {
console.warn('[SOURCE] SUBSCRIBED')
setTimeout(() => {
s.next(1);
}, 1000);
})).pipe(share());
// First subscriber,
// because it's the first one, `refCount` will to its job and the source will be subscribed
// and this subscriber will be added to the `Subject`'s subscribers list
// note that the source sends the data asynchronously
src$.subscribe(/* ... */)
// The second subscriber
// since the source is already subscribed, `refCount` won't subscribe to it again
// instead, this new subscriber will be added to `Subject`'s list
src$.subscribe(/* ... */)
在 1s
之后,源将发送值 1
,主题将收到该值并将其发送给其注册订阅者。
这是 how refCount
的魔法:
// When a new subscriber is registered
(<any> connectable)._refCount++;
// `RefCountSubscriber` will make sure that if no more subscribers are left
// the source will be unsubscribed
const refCounter = new RefCountSubscriber(subscriber, connectable);
// Add the subscriber to the `Subject`'s list
const subscription = connectable.subscribe(refCounter);
if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
}
return subscription;
而ConnectableObservable.connect
定义为as follows:
connect(): Subscription {
let connection = this._connection;
if (!connection) {
// If the source wasn't subscribed before
this._isComplete = false;
connection = this._connection = new Subscription();
// Subscribing to the source
// Every notification send by the source will be first received by `Subject`
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
/* ... */
}
return connection;
}
因此,如果我们有一个 src$
observable 需要在模板中多次订阅,我们可以应用上述概念。
但是,我们应该注意一个重要方面。
如果我们的模板是这样的:
<!-- #1 -->
<div *ngIf="src$ | async"></div>
<!-- ... -->
<!-- #2 -->
<div *ngIf="src$ | async"></div>
和src$
:
src$ = store.pipe(select(/* ... */), share())
那么,如果store
已经有值,则同步获取,也就是说当#1
被注册时,[=41] =] 将被订阅并发送该值,但请注意那时 #2
尚未 订阅,因此它不会收到任何东西。
如果 source
是异步的,那么我们应该没有问题,因为模板中的订阅很可能是 同步的。
但是,当源是同步时,您可以这样解决这个问题:
src$ = store.pipe(
select(/* ... */),
subscribeOn(asyncScheduler),
share()
)
subscribeOn(asyncScheduler)
大致 与使用 setTimeout(() => {}, 0)
延迟订阅源相同。但是,这允许订阅 #2
,这样当源最终被订阅时,两个订阅者都将收到该值。
作为一般规则,我在模板中的每个 Observable
末尾使用 shareReplay({ refCount: true, bufferSize: 1 })
运算符。我还将它添加到基础 observables,我用它来分支其他 observables,然后在模板中使用。
这将确保订阅在每个订阅者之间共享,并且通过使用 shareReplay
您可以通过使用 take(1)
.
获得组件内部最后发出的结果。
{ refCount: true, bufferSize: 1 }
的原因是,如果您只使用 shareReplay(1)
,它可能会导致订阅泄漏,无论您是否使用 async
管道。
回到你的例子,Michael D
提供的答案还不错,这样做是有道理的。但是,它确实需要模板中的一些逻辑,我个人不赞成。
因此,只要您使用 shareReplay
,在您的模板中使用多个 async
调用确实没有任何缺点,您甚至可以使它们在您的整个过程中具有描述性和可重用性通过在您的组件中定义它们来模板:
export class TodoComponent {
readonly todoList$ = this.store.select(todoList).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
readonly hasTodos$ = this.todoList$.pipe(
map((todos) => todos?.length > 0),
shareReplay({ refCount: true, bufferSize: 1 })
);
}
然后您可以保持模板的描述性:
<ng-container *ngIf="hasTodos$ | async>
<div *ngFor="let todo of todoList$ | async">
<!-- -->
别忘了你的trackBy
!
如果您不喜欢重复代码,您甚至可以创建自定义运算符并使用它:
export function shareRef<T>() {
return (source: Observable<T>) => source.pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
}
将您的 observable 更改为:
readonly todoList$ = this.store.select(todoList).pipe(
shareRef()
);
另一个选择(我觉得更简单)
<ng-container *ngIf="todoList$|async as todoList;else loading">
<div *ngFor="let todo of todoList">
{{todo}}
</div>
<div *ngIf="!todoList.length">Empty</div>
</ng-container>
<ng-template #loading>loading...</ng-template>
另一个,使用中间对象(*)
<ng-container *ngIf="{data:todoList$|async} as todoList">
<div *ngIf="!todoList.data">loading...</div>
<div *ngFor="let todo of todoList.data">
{{todo}}
</div>
<div *ngIf="!todoList.data.length">Empty</div>
</ng-container>
(*) 看到第一个 *ngIf return 始终为真,但是在 ng-container 下我们在 todoList.data 中有数据。
假设我有一个名为 'todoList$' 的可观察对象。使用 'async' 运算符,我可以从中自动 subscribe/unsubscribe 。下面代码中的问题是对同一个可观察对象有两个相同的订阅:
<ng-container *ngIf="(todoList$ | async).length > 0>
<div *ngFor="let todo of todoList$ | async">
...
这不是很 DRY,因此,我们为可以更有效处理的订阅分配内存。
由于 ngIf 条件中的语法,我不相信我可以使用 'as' 关键字为可观察输出创建模板变量。 相反,当我使用组件文件中的 RxJs 'share' 运算符时有效:
todoList$ = this.store.select(todoList).pipe(tap(x => {console.log('testing')}), share());
//testing
没有共享运算符,“testing”被打印两次。这让我相信 share() 运算符可以解决这个问题。如果是,不确定 why/how?由于这可能是一个普遍的问题/代码异味,处理同一模板中相同的多个订阅的最佳方法是什么?
我承认 Whosebug 上有一些类似的问题。但是 none 已经给了我想要的东西。
您实际上仍然可以在 *ngIf
指令中使用 as
签名来只有一个有效订阅。尝试以下
<ng-container *ngIf="(todoList$ | async) as todoList">
<ng-container *ngIf="todoList.length > 0">
<div *ngFor="let todo of todoList">
...
在这种情况下使用 *ngIf
。希望对你有所帮助。
<ng-container *ngIf="(todoList$ | async) as todoList">
<ng-container *ngIf="todoList && todoList != undefined && todoList.length">
<div *ngFor="let todo of todoList">
...
If it does, not exactly sure why/how?
让我们看看如何 share()
is defined:
function shareSubjectFactory() {
return new Subject<any>();
}
return (source: Observable<T>) => refCount()(multicast(shareSubjectFactory)(source)) as Observable<T>;
首先,
(source: Observable<T>) => refCount()(multicast(shareSubjectFactory)(source))
与
相同(source: Observable<T>) => source.pipe(
multicast(shareSubjectFactory),
refCount()
)
multicast
将 return 变成 ConnectableObservable
,它仍然是 Observable
,但除其他外,它公开了一个 connect
方法。
// Inside `multicast` operator
const connectable: any = Object.create(source, connectableObservableDescriptor);
connectable.source = source;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<R>> connectable;
另一个有趣的事情是,当订阅时,订阅者将被添加到Subject
的订阅者列表和主要来源在调用 connect
之前不会 被订阅:
_subscribe(subscriber: Subscriber<T>) {
return this.getSubject().subscribe(subscriber);
}
protected getSubject(): Subject<T> {
const subject = this._subject;
if (!subject || subject.isStopped) {
this._subject = this.subjectFactory();
}
return this._subject!;
}
例如:
const src$ = privateSrc.pipe(
tap(() => console.log('from src')),
share(),
tap(() => console.log('from share()')),
)
订阅src$
时:
// Subscriber #1
src$.subscribe(/* ... */)
订阅者将被添加到 Subject
的订阅者列表中,来源 src$
将被订阅。为什么?因为 share
也使用 refCount
,如果在以前没有活跃订阅者的情况下注册了新订阅者,则 订阅 源并且将 取消订阅 如果没有更多活跃订阅者,则从源中获取。
我们再来看一个例子:
const src$ = (new Observable(s => {
console.warn('[SOURCE] SUBSCRIBED')
setTimeout(() => {
s.next(1);
}, 1000);
})).pipe(share());
// First subscriber,
// because it's the first one, `refCount` will to its job and the source will be subscribed
// and this subscriber will be added to the `Subject`'s subscribers list
// note that the source sends the data asynchronously
src$.subscribe(/* ... */)
// The second subscriber
// since the source is already subscribed, `refCount` won't subscribe to it again
// instead, this new subscriber will be added to `Subject`'s list
src$.subscribe(/* ... */)
在 1s
之后,源将发送值 1
,主题将收到该值并将其发送给其注册订阅者。
这是 how refCount
的魔法:
// When a new subscriber is registered
(<any> connectable)._refCount++;
// `RefCountSubscriber` will make sure that if no more subscribers are left
// the source will be unsubscribed
const refCounter = new RefCountSubscriber(subscriber, connectable);
// Add the subscriber to the `Subject`'s list
const subscription = connectable.subscribe(refCounter);
if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
}
return subscription;
而ConnectableObservable.connect
定义为as follows:
connect(): Subscription {
let connection = this._connection;
if (!connection) {
// If the source wasn't subscribed before
this._isComplete = false;
connection = this._connection = new Subscription();
// Subscribing to the source
// Every notification send by the source will be first received by `Subject`
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
/* ... */
}
return connection;
}
因此,如果我们有一个 src$
observable 需要在模板中多次订阅,我们可以应用上述概念。
但是,我们应该注意一个重要方面。
如果我们的模板是这样的:
<!-- #1 -->
<div *ngIf="src$ | async"></div>
<!-- ... -->
<!-- #2 -->
<div *ngIf="src$ | async"></div>
和src$
:
src$ = store.pipe(select(/* ... */), share())
那么,如果store
已经有值,则同步获取,也就是说当#1
被注册时,[=41] =] 将被订阅并发送该值,但请注意那时 #2
尚未 订阅,因此它不会收到任何东西。
如果 source
是异步的,那么我们应该没有问题,因为模板中的订阅很可能是 同步的。
但是,当源是同步时,您可以这样解决这个问题:
src$ = store.pipe(
select(/* ... */),
subscribeOn(asyncScheduler),
share()
)
subscribeOn(asyncScheduler)
大致 与使用 setTimeout(() => {}, 0)
延迟订阅源相同。但是,这允许订阅 #2
,这样当源最终被订阅时,两个订阅者都将收到该值。
作为一般规则,我在模板中的每个 Observable
末尾使用 shareReplay({ refCount: true, bufferSize: 1 })
运算符。我还将它添加到基础 observables,我用它来分支其他 observables,然后在模板中使用。
这将确保订阅在每个订阅者之间共享,并且通过使用 shareReplay
您可以通过使用 take(1)
.
{ refCount: true, bufferSize: 1 }
的原因是,如果您只使用 shareReplay(1)
,它可能会导致订阅泄漏,无论您是否使用 async
管道。
回到你的例子,Michael D
提供的答案还不错,这样做是有道理的。但是,它确实需要模板中的一些逻辑,我个人不赞成。
因此,只要您使用 shareReplay
,在您的模板中使用多个 async
调用确实没有任何缺点,您甚至可以使它们在您的整个过程中具有描述性和可重用性通过在您的组件中定义它们来模板:
export class TodoComponent {
readonly todoList$ = this.store.select(todoList).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
readonly hasTodos$ = this.todoList$.pipe(
map((todos) => todos?.length > 0),
shareReplay({ refCount: true, bufferSize: 1 })
);
}
然后您可以保持模板的描述性:
<ng-container *ngIf="hasTodos$ | async>
<div *ngFor="let todo of todoList$ | async">
<!-- -->
别忘了你的trackBy
!
如果您不喜欢重复代码,您甚至可以创建自定义运算符并使用它:
export function shareRef<T>() {
return (source: Observable<T>) => source.pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
}
将您的 observable 更改为:
readonly todoList$ = this.store.select(todoList).pipe(
shareRef()
);
另一个选择(我觉得更简单)
<ng-container *ngIf="todoList$|async as todoList;else loading">
<div *ngFor="let todo of todoList">
{{todo}}
</div>
<div *ngIf="!todoList.length">Empty</div>
</ng-container>
<ng-template #loading>loading...</ng-template>
另一个,使用中间对象(*)
<ng-container *ngIf="{data:todoList$|async} as todoList">
<div *ngIf="!todoList.data">loading...</div>
<div *ngFor="let todo of todoList.data">
{{todo}}
</div>
<div *ngIf="!todoList.data.length">Empty</div>
</ng-container>
(*) 看到第一个 *ngIf return 始终为真,但是在 ng-container 下我们在 todoList.data 中有数据。