运行 每个 Observable 的值发射动作一次
Run action once per Observable's value emission
假设我们有如下代码(Stackblitz):
public counter: number = 0; // Some arbitrary global state
public ngOnInit(): void {
const subj: Subject<number> = new Subject<number>();
const obs: Observable<number> = subj.asObservable().pipe(
tap((): void => {
++this.counter;
}));
// this.counter increments for each subscription.
// But supposed to increment once per obs new value emition.
obs.subscribe();
obs.subscribe();
obs.subscribe();
subj.next(1);
// Now this.counter === 3
}
执行上述后this.counter
等于3
,每订阅一次就加1。
但是我们如何才能以反应方式在每个 Observable
的新价值排放中增加一次?
我知道我们应该避免反应流之外的状态,但在现实世界的应用程序中仍然时不时需要它。
请尝试这样的事情一次。我无法在任何地方 运行 它。如果出现错误请告诉我
public counter: number = 0;
public items: any = [];
public ngOnInit(): void {
const obs: Observable<number> = of(1).pipe(
tap((value): void => {
if(! Array.prototype.includes(value)) {
this.items.push(value);
++this.counter;
}
}));
// this.counter increments for each subscription.
// How to make it increment once per obs new value emission.
obs.subscribe(1);
obs.subscribe(2);
obs.subscribe(1);
// Now this.counter === 3
}
您看到数字 3
的原因有点不同。当您使用 of(1)
时,它会发出一个 next
通知,紧随其后的是 complete
通知。每次调用 obs.subscribe()
.
时它都会执行此操作
所以如果你想看到 1
你需要创建一个中间 Subject,你在其中进行所有订阅,然后这个中间 Subject 订阅源 Observable(使用 publish()
运算符最容易创建适合您的主题):
const obs: ConnectableObservable<number> = of(1).pipe(
tap((): void => {
++this.counter;
}),
publish(),
) as ConnectableObservable<number>;
obs.subscribe();
obs.subscribe();
obs.subscribe();
obs.connect();
您更新的演示:https://stackblitz.com/edit/rxjs-update-global-state-ejdkie?file=app/app.component.ts
请注意 of(1)
无论如何都会发出 complete
通知,这意味着 publish()
中的主题也会完成。这可能(也可能不是)对您来说是个问题,但这实际上取决于您的用例。
编辑:这是更新的演示,其中源是主题:https://stackblitz.com/edit/rxjs-update-global-state-bws36m?file=app/app.component.ts
假设我们有如下代码(Stackblitz):
public counter: number = 0; // Some arbitrary global state
public ngOnInit(): void {
const subj: Subject<number> = new Subject<number>();
const obs: Observable<number> = subj.asObservable().pipe(
tap((): void => {
++this.counter;
}));
// this.counter increments for each subscription.
// But supposed to increment once per obs new value emition.
obs.subscribe();
obs.subscribe();
obs.subscribe();
subj.next(1);
// Now this.counter === 3
}
执行上述后this.counter
等于3
,每订阅一次就加1。
但是我们如何才能以反应方式在每个 Observable
的新价值排放中增加一次?
我知道我们应该避免反应流之外的状态,但在现实世界的应用程序中仍然时不时需要它。
请尝试这样的事情一次。我无法在任何地方 运行 它。如果出现错误请告诉我
public counter: number = 0;
public items: any = [];
public ngOnInit(): void {
const obs: Observable<number> = of(1).pipe(
tap((value): void => {
if(! Array.prototype.includes(value)) {
this.items.push(value);
++this.counter;
}
}));
// this.counter increments for each subscription.
// How to make it increment once per obs new value emission.
obs.subscribe(1);
obs.subscribe(2);
obs.subscribe(1);
// Now this.counter === 3
}
您看到数字 3
的原因有点不同。当您使用 of(1)
时,它会发出一个 next
通知,紧随其后的是 complete
通知。每次调用 obs.subscribe()
.
所以如果你想看到 1
你需要创建一个中间 Subject,你在其中进行所有订阅,然后这个中间 Subject 订阅源 Observable(使用 publish()
运算符最容易创建适合您的主题):
const obs: ConnectableObservable<number> = of(1).pipe(
tap((): void => {
++this.counter;
}),
publish(),
) as ConnectableObservable<number>;
obs.subscribe();
obs.subscribe();
obs.subscribe();
obs.connect();
您更新的演示:https://stackblitz.com/edit/rxjs-update-global-state-ejdkie?file=app/app.component.ts
请注意 of(1)
无论如何都会发出 complete
通知,这意味着 publish()
中的主题也会完成。这可能(也可能不是)对您来说是个问题,但这实际上取决于您的用例。
编辑:这是更新的演示,其中源是主题:https://stackblitz.com/edit/rxjs-update-global-state-bws36m?file=app/app.component.ts