运行 每个 Observable 的值发射动作一次

Run action once per Observable's value emission

假设我们有如下代码(Stackblitz):

  public counter: number = 0; // Some arbitrary global state

  public ngOnInit(): void {
    const subj: Subject<number> = new Subject<number>();

    const obs: Observable<number> = subj.asObservable().pipe(
      tap((): void => {
        ++this.counter;
      }));

    // this.counter increments for each subscription.
    // But supposed to increment once per obs new value emition.
    obs.subscribe();
    obs.subscribe();
    obs.subscribe();

    subj.next(1);

    // Now this.counter === 3
  }

执行上述后this.counter等于3,每订阅一次就加1。

但是我们如何才能以反应方式在每个 Observable 的新价值排放中增加一次?

我知道我们应该避免反应流之外的状态,但在现实世界的应用程序中仍然时不时需要它。

请尝试这样的事情一次。我无法在任何地方 运行 它。如果出现错误请告诉我

  public counter: number = 0;
  public items: any = [];

  public ngOnInit(): void {
    const obs: Observable<number> = of(1).pipe(
      tap((value): void => {
        if(! Array.prototype.includes(value)) {
           this.items.push(value);
            ++this.counter;
         }
      }));

    // this.counter increments for each subscription.
    // How to make it increment once per obs new value emission.
    obs.subscribe(1);
    obs.subscribe(2);
    obs.subscribe(1);

    // Now this.counter === 3
  }

您看到数字 3 的原因有点不同。当您使用 of(1) 时,它会发出一个 next 通知,紧随其后的是 complete 通知。每次调用 obs.subscribe().

时它都会执行此操作

所以如果你想看到 1 你需要创建一个中间 Subject,你在其中进行所有订阅,然后这个中间 Subject 订阅源 Observable(使用 publish() 运算符最容易创建适合您的主题):

const obs: ConnectableObservable<number> = of(1).pipe(
  tap((): void => {
    ++this.counter;
  }),
  publish(),
) as ConnectableObservable<number>;

obs.subscribe();
obs.subscribe();
obs.subscribe();

obs.connect();

您更新的演示:https://stackblitz.com/edit/rxjs-update-global-state-ejdkie?file=app/app.component.ts

请注意 of(1) 无论如何都会发出 complete 通知,这意味着 publish() 中的主题也会完成。这可能(也可能不是)对您来说是个问题,但这实际上取决于您的用例。

编辑:这是更新的演示,其中源是主题:https://stackblitz.com/edit/rxjs-update-global-state-bws36m?file=app/app.component.ts