可观察的主题事件监听器

Observable subject event listener

我正在研究 Observables 及其与 EventEmitter 的区别,然后偶然发现了 Subjects(我可以看到 Angulars EventEmitter 是基于 subjects 的)。

看起来 Observables 是单播的,而 Subjects 是多播的(然后 EE 只是一个将 .next 包装在 emit 中以提供正确接口的主题)。

Observables 似乎很容易实现

class Observable {
    constructor(subscribe) {
        this._subscribe = subscribe;
    }

    subscribe(next, complete, error) {
        const observer = new Observer(next, complete, error);

        // return way to unsubscribe
        return this._subscribe(observer);
    }

}

其中 Observer 只是一个包装器,它添加了一些 try catch 和监视器 isComplete 以便它可以清理并停止观察。

对于我提出的主题:

class Subject {
    subscribers = new Set();

    constructor() {
        this.observable = new Observable(observer => {
            this.observer = observer;
        });

        this.observable.subscribe((...args) => {
            this.subscribers.forEach(sub => sub(...args))
        });
    }

    subscribe(subscriber) {
        this.subscribers.add(subscriber);
    }

    emit(...args) {
        this.observer.next(...args);
    }
}

哪一种合并到一个 EventEmitter 中,它用 emit 包装 .next - 但捕获 Observable 的 observe 参数似乎是错误的 - 就像我刚刚破解了一个解决方案。从 Observable(单播)生成 Subject(多播)的更好方法是什么?

我试着查看 RXJS,但我看不出它是如何填充 subscribers 数组的:/

我想你也可以通过使用调试器来更好地理解。打开 StackBlitz RxJS 项目,创建最简单的示例(取决于您想要了解的内容),然后放置一些断点。据我所知,使用 StackBlitz,您可以调试 TypeScript 文件,这看起来很棒。


首先,Subject class extends Observable:

export class Subject<T> extends Observable<T> implements SubscriptionLike { /* ... */ }

现在让我们检查 Observable class.

它有 well-known pipe method:

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

其中 pipeFromArray 定义为 as follows:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

在澄清上面代码片段中发生的事情之前,了解 operators 是很重要的。运算符是一个函数,它 return 是另一个函数,其单个参数是 Observable<T> 并且 return 类型是 Observable<R>。有时,TR 可以相同(例如,当使用 filter()debounceTime()... 时)。

例如map就是defined like this:

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate((source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

export function operate<T, R>(
  init: (liftedSource: Observable<T>, subscriber: Subscriber<R>) => (() => void) | void
): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    if (hasLift(source)) {
      return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
        try {
          return init(liftedSource, this);
        } catch (err) {
          this.error(err);
        }
      });
    }
    throw new TypeError('Unable to lift unknown Observable type');
  };
}

所以,operatereturn一个函数。注意它的参数:source: Observable<T>。 return 类型派生自 Subscriber<R>.

Observable.lift 只是创建了一个新的 Observable。这就像在喜欢的列表中创建节点。

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  
  // it's important to keep track of the source !
  observable.source = this;
  observable.operator = operator;
  return observable;
}

因此,运算符(如 map)将 return 一个函数。调用该函数的是 pipeFromArray 函数:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    // here the functions returned by the operators are being called
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

在上面的代码片段中,fn 就是 operate 函数 returns:

return (source: Observable<T>) => {
  if (hasLift(source)) { // has `lift` method
    return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
      try {
        return init(liftedSource, this);
      } catch (err) {
        this.error(err);
      }
    });
  }
  throw new TypeError('Unable to lift unknown Observable type');
};

也许最好也看看例子。我建议您自己使用调试器进行尝试。

const src$ = new Observable(subscriber => {subscriber.next(1), subscriber.complete()});

subscriber => {} 回调 fn 将分配给 Observable._subscribe 属性。

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

接下来,让我们尝试添加一个运算符:

const src2$ = src$.pipe(map(num => num ** 2))

在这种情况下,它将从 pipeFromArray:

调用此块
// `pipeFromArray`
if (fns.length === 1) {
  return fns[0];
}

// `Observable.pipe`
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

因此,Observable.pipe 将调用 (source: Observable<T>) => { ... },其中 sourcesrc$ Observable。通过调用该函数(其结果存储在 src2$ 中),它还将调用 Observable.lift 方法。

return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
});

/* ... */

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

此时,src$是一个Observable实例,source设置为src$operator设置为function (this: Subscriber<R>, liftedSource: Observable<T>) ....

在我看来,这都是关于链表的。创建 Observable 链时(通过添加运算符),列表是从上到下创建的。
tail 节点 调用其 subscribe 方法时,将创建另一个列表,这次是从下到上。我喜欢将第一个称为 Observable list,将第二个称为 Subscribers list

src2$.subscribe(console.log)

这是调用 subscribe 方法时发生的情况:

const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
  
  const { operator, source } = this;
  subscriber.add(
    operator
      ? operator.call(subscriber, source)
      : source || config.useDeprecatedSynchronousErrorHandling
      ? this._subscribe(subscriber)
      : this._trySubscribe(subscriber)
  );

  return subscriber;

在这种情况下 src2$ 有一个 operator,所以它会调用它。 operator 定义为:

function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
}

其中 init 取决于所使用的运算符。再一次,这里是 mapinit

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate( /* THIS IS `init()` */(source, subscriber) => {
    
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

source其实就是src$。当调用 source.subscribe() 时,它将最终调用提供给 new Observable(subscriber => { ... }) 的回调。调用 subscriber.next(1) 将从上方调用 (value: T) => { ... },后者将调用 subscriber.next(project.call(thisArg, value, index++));project - 提供给 map 的回调)。最后,subscriber.next 指的是 console.log.

回到 Subject,这是调用 _subscribe 方法时发生的情况:

protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed(); // if unsubscribed
  this._checkFinalizedStatuses(subscriber); // `error` or `complete` notifications
  return this._innerSubscribe(subscriber);
}

protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    ? EMPTY_SUBSCRIPTION
    : (observers.push(subscriber), new Subscription(() => arrRemove(this.observers, subscriber)));
}

所以,这就是 Subject's 订阅者列表的填充方式。通过 returning new Subscription(() => arrRemove(this.observers, subscriber)),它确保订阅者取消订阅(由于 complete/error 通知或简单地 subscriber.unsubscribe()),inactive 订阅者将从 Subject 的列表中删除。