为什么每次订阅都会调用管道主题中的运算符?

Why does operator in piped Subject is called for every subscription?

我需要从 Subject 过滤值并对返回的数据做一些副作用。
像这样:

const subject2 = subject.pipe(
  filter((value: number) => {
    console.log(`filter: ${value}`);
    return value % 2 === 0; // filter even nubmers
  }),
  tap((value) => console.log(`after filter: ${value}`))
);

我看到来自 filter() 的函数被调用给每个发送给 subject2 订阅者的值(即与 subject2 订阅者长度一样多的次数)。但我假设它会在每次 next() 调用时被调用一次。

我还看到,如果我订阅 subject2 并通过管道传输其值,则不会出现重复。

谁能解释一下幕后发生的事情以及过滤主题值的正确模式是什么?

Stackblitz 上的示例:
https://stackblitz.com/edit/typescript-e4stc4?devtoolsheight=100&file=index.ts

在幕后,Subject 的 next 方法是这样实现的:

for (const observer of this.observers) {
  observer.next(value);
}

因此,当您向 Subject 发送消息时,每个“观察者”(或“订阅者”)都会收到自己的通知。运算符只是在将结果传递给观察者之前处理 value 的函数。

例如,如果您这样声明运算符:

const myFilter = filter((value: number) => value % 2 === 0);
const myTap = tap((value) => console.log(`after filter: ${value}`));

那么自定义 Subject 中的 next 函数可以这样实现:

for (const observer of this.observers) {
  observer.next(myTap(myFilter(value)));
}

(此代码实际上不起作用 - 它是一种简化,用于显示当您在主题上调用 next 时值如何到达订阅者)

要解决您的问题,您可以通过将 share() 作为链的最后一个元素来减少源 Subject 的观察者数量,如下所示:

const subject2 = subject.pipe(
  filter((value: number) => {
    console.log(`filter: ${value}`);
    return value % 2 === 0; // filter even nubmers
  }),
  tap((value) => console.log(`after filter: ${value}`)),
  share()
);

share 的实现使其作为源 Observable 的单个观察者,无论有多少观察者订阅了它。