Make a ReplaySubject return only the last value on subscribe

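I have an unusual use case where I need to keep track of all previously emitted events.

Thanks to ReplaySubject, this has worked well so far: for every new subscriber, the subject re-emits all previous events.

Now, for one specific scenario, I need to be able to deliver only the most recently published event (a bit like a BehaviorSubject) while keeping the same source of events.

Here is a snippet of what I am trying to achieve (StackBlitz):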

import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.mySubject = new ReplaySubject();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
  }
}

const myEventManager = new EventManager();

myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");

myEventManager.fullSubscribe(v => {
  console.log("SUB 1", v);
});

myEventManager.subscribe(v => {
  console.log("SUB 2", v);
});

Thanks.

Why not simply keep an instance of both a ReplaySubject and a BehaviorSubject on the EventManager?

import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.replaySubject = new ReplaySubject();
    this.behaviorSubject = new BehaviorSubject();
  }

  publish(value) {
    this.replaySubject.next(value);
    this.behaviorSubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.replaySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.behaviorSubject.subscribe(next, error, complete);
  }
}

If you keep track of the number of events published so far, you can use skip:

  subscribe(next, error?, complete?) {
    return this.mySubject.pipe(
      skip(this.publishCount - 1)
    ).subscribe(next, error, complete);
  }

Here is a StackBlitz demo.

Instead of forcing a ReplaySubject to behave like a BehaviorSubject, you can get ReplaySubject-like behavior by manipulating a BehaviorSubject.

import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay, take, concatMap, skip } from 'rxjs/operators';

class EventManager {
  constructor() {
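    // Created without an initial value, so the first value emitted (and the
    // first entry accumulated by scan below) is undefined.
    this.mySubject = new BehaviorSubject();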
    this.allEmittedValues = this.mySubject.pipe(
      scan((xs, x) => [...xs, x], []),
      shareReplay(1)
    );

    // Necessary since we need to start accumulating allEmittedValues
    // immediately.
    this.allEmittedValues.subscribe();
  }

  dispose() {
    // ends all subscriptions
    this.mySubject.complete();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    // First, take the latest value of the accumulated array of emits and
    // unroll it into an observable
    const existingEmits$ = this.allEmittedValues.pipe(
      take(1),
      concatMap((emits) => from(emits))
    );
    // Then, subscribe to the main subject, skipping the replayed value since
    // we just got it at the tail end of existingEmits$
    const futureEmits$ = this.mySubject.pipe(skip(1));

    return concat(existingEmits$, futureEmits$).subscribe(
      next,
      error,
      complete
    );
  }

  subscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }
}