
Observable subject event listener

我正在研究 Observables 及其与 EventEmitter 的区别,然后偶然发现了 Subjects(我可以看到 Angulars EventEmitter 是基于 subjects 的)。

看起来 Observables 是单播的,而 Subjects 是多播的(然后 EE 只是一个将 .next 包装在 emit 中以提供正确接口的主题)。

Observables 似乎很容易实现

class Observable {
    constructor(subscribe) {
        this._subscribe = subscribe;

    subscribe(next, complete, error) {
        const observer = new Observer(next, complete, error);

        // return way to unsubscribe
        return this._subscribe(observer);


其中 Observer 只是一个包装器,它添加了一些 try catch 和监视器 isComplete 以便它可以清理并停止观察。


class Subject {
    subscribers = new Set();

    constructor() {
        this.observable = new Observable(observer => {
            this.observer = observer;

        this.observable.subscribe((...args) => {
            this.subscribers.forEach(sub => sub(...args))

    subscribe(subscriber) {

    emit(...args) {

哪一种合并到一个 EventEmitter 中,它用 emit 包装 .next - 但捕获 Observable 的 observe 参数似乎是错误的 - 就像我刚刚破解了一个解决方案。从 Observable(单播)生成 Subject(多播)的更好方法是什么?

我试着查看 RXJS,但我看不出它是如何填充 subscribers 数组的:/

我想你也可以通过使用调试器来更好地理解。打开 StackBlitz RxJS 项目,创建最简单的示例(取决于您想要了解的内容),然后放置一些断点。据我所知,使用 StackBlitz,您可以调试 TypeScript 文件,这看起来很棒。

首先,Subject class extends Observable:

export class Subject<T> extends Observable<T> implements SubscriptionLike { /* ... */ }

现在让我们检查 Observable class.

它有 well-known pipe method:

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;

其中 pipeFromArray 定义为 as follows:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;

  if (fns.length === 1) {
    return fns[0];

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);

在澄清上面代码片段中发生的事情之前,了解 operators 是很重要的。运算符是一个函数,它 return 是另一个函数,其单个参数是 Observable<T> 并且 return 类型是 Observable<R>。有时,TR 可以相同(例如,当使用 filter()debounceTime()... 时)。

例如map就是defined like this:

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate((source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));

export function operate<T, R>(
  init: (liftedSource: Observable<T>, subscriber: Subscriber<R>) => (() => void) | void
): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    if (hasLift(source)) {
      return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
        try {
          return init(liftedSource, this);
        } catch (err) {
    throw new TypeError('Unable to lift unknown Observable type');

所以,operatereturn一个函数。注意它的参数:source: Observable<T>。 return 类型派生自 Subscriber<R>.

Observable.lift 只是创建了一个新的 Observable。这就像在喜欢的列表中创建节点。

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  // it's important to keep track of the source !
  observable.source = this;
  observable.operator = operator;
  return observable;

因此,运算符(如 map)将 return 一个函数。调用该函数的是 pipeFromArray 函数:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;

  if (fns.length === 1) {
    return fns[0];

  return function piped(input: T): R {
    // here the functions returned by the operators are being called
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);

在上面的代码片段中,fn 就是 operate 函数 returns:

return (source: Observable<T>) => {
  if (hasLift(source)) { // has `lift` method
    return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
      try {
        return init(liftedSource, this);
      } catch (err) {
  throw new TypeError('Unable to lift unknown Observable type');


const src$ = new Observable(subscriber => {subscriber.next(1), subscriber.complete()});

subscriber => {} 回调 fn 将分配给 Observable._subscribe 属性。

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;


const src2$ = src$.pipe(map(num => num ** 2))

在这种情况下,它将从 pipeFromArray:

// `pipeFromArray`
if (fns.length === 1) {
  return fns[0];

// `Observable.pipe`
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;

因此,Observable.pipe 将调用 (source: Observable<T>) => { ... },其中 sourcesrc$ Observable。通过调用该函数(其结果存储在 src2$ 中),它还将调用 Observable.lift 方法。

return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {

/* ... */

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;

此时,src$是一个Observable实例,source设置为src$operator设置为function (this: Subscriber<R>, liftedSource: Observable<T>) ....

在我看来,这都是关于链表的。创建 Observable 链时(通过添加运算符),列表是从上到下创建的。
tail 节点 调用其 subscribe 方法时,将创建另一个列表,这次是从下到上。我喜欢将第一个称为 Observable list,将第二个称为 Subscribers list


这是调用 subscribe 方法时发生的情况:

const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
  const { operator, source } = this;
      ? operator.call(subscriber, source)
      : source || config.useDeprecatedSynchronousErrorHandling
      ? this._subscribe(subscriber)
      : this._trySubscribe(subscriber)

  return subscriber;

在这种情况下 src2$ 有一个 operator,所以它会调用它。 operator 定义为:

function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {

其中 init 取决于所使用的运算符。再一次,这里是 mapinit

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate( /* THIS IS `init()` */(source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));

source其实就是src$。当调用 source.subscribe() 时,它将最终调用提供给 new Observable(subscriber => { ... }) 的回调。调用 subscriber.next(1) 将从上方调用 (value: T) => { ... },后者将调用 subscriber.next(project.call(thisArg, value, index++));project - 提供给 map 的回调)。最后,subscriber.next 指的是 console.log.

回到 Subject,这是调用 _subscribe 方法时发生的情况:

protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed(); // if unsubscribed
  this._checkFinalizedStatuses(subscriber); // `error` or `complete` notifications
  return this._innerSubscribe(subscriber);

protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    : (observers.push(subscriber), new Subscription(() => arrRemove(this.observers, subscriber)));

所以,这就是 Subject's 订阅者列表的填充方式。通过 returning new Subscription(() => arrRemove(this.observers, subscriber)),它确保订阅者取消订阅(由于 complete/error 通知或简单地 subscriber.unsubscribe()),inactive 订阅者将从 Subject 的列表中删除。