使 ReplaySubject return 仅订阅的最后一个值
Make a ReplaySubject return only the last value on subscribe
我有一个奇怪的用例,我需要跟踪所有以前发出的事件。
多亏了 ReplaySubject,它到目前为止工作得很好。在每个新订阅者上,此主题都会重新发出所有以前的事件。
现在,对于特定场景,我需要能够仅提供最新发布的事件(有点像 BehaviorSubject),但保持源相同的事件。
这是我要实现的目标的片段:stackblitz
import { ReplaySubject, BehaviorSubject, from } from "rxjs";
class EventManager {
constructor() {
this.mySubject = new ReplaySubject();
}
publish(value) {
this.mySubject.next(value);
}
fullSubscribe(next, error, complete) {
return this.mySubject.subscribe(next, error, complete);
}
subscribe(next, error, complete) {
return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
}
}
const myEventManager = new EventManager();
myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");
myEventManager.fullSubscribe(v => {
console.log("SUB 1", v);
});
myEventManager.subscribe(v => {
console.log("SUB 2", v);
});
谢谢
为什么不在 EventManager
上创建 ReplaySubject
和 BehaviorSubject
的实例?
import { ReplaySubject, BehaviorSubject, from } from "rxjs";
class EventManager {
constructor() {
this.replaySubject = new ReplaySubject();
this.behaviorSubject = new BehaviorSubject();
}
publish(value) {
this.replaySubject.next(value);
this.behaviorSubject.next(value);
}
fullSubscribe(next, error, complete) {
return this.replaySubject.subscribe(next, error, complete);
}
subscribe(next, error, complete) {
return this.behaviorSubject.subscribe(next, error, complete);
}
}
如果您跟踪已发布的活动数量,您可以使用 skip
:
subscribe(next, error?, complete?) {
return this.mySubject.pipe(
skip(this.publishCount - 1)
).subscribe(next, error, complete);
}
这是一个 StackBlitz 演示。
您可以通过操纵 BehaviorSubject
.
来达到类似 ReplaySubject
的行为,而不是强制 ReplaySubject
表现得像 BehaviorSubject
import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';
class EventManager {
constructor() {
this.mySubject = new BehaviorSubject();
this.allEmittedValues = this.mySubject.pipe(
scan((xs, x) => [...xs, x], []),
shareReplay(1)
);
// Necessary since we need to start accumulating allEmittedValues
// immediately.
this.allEmittedValues.subscribe();
}
dispose() {
// ends all subscriptions
this.mySubject.complete();
}
publish(value) {
this.mySubject.next(value);
}
fullSubscribe(next, error, complete) {
// First, take the latest value of the accumulated array of emits and
// unroll it into an observable
const existingEmits$ = this.allEmittedValues.pipe(
take(1),
concatMap((emits) => from(emits))
);
// Then, subscribe to the main subject, skipping the replayed value since
// we just got it at the tail end of existingEmits$
const futureEmits$ = this.mySubject.pipe(skip(1));
return concat(existingEmits$, futureEmits$).subscribe(
next,
error,
complete
);
}
subscribe(next, error, complete) {
return this.mySubject.subscribe(next, error, complete);
}
}
我有一个奇怪的用例,我需要跟踪所有以前发出的事件。
多亏了 ReplaySubject,它到目前为止工作得很好。在每个新订阅者上,此主题都会重新发出所有以前的事件。
现在,对于特定场景,我需要能够仅提供最新发布的事件(有点像 BehaviorSubject),但保持源相同的事件。
这是我要实现的目标的片段:stackblitz
import { ReplaySubject, BehaviorSubject, from } from "rxjs";
class EventManager {
constructor() {
this.mySubject = new ReplaySubject();
}
publish(value) {
this.mySubject.next(value);
}
fullSubscribe(next, error, complete) {
return this.mySubject.subscribe(next, error, complete);
}
subscribe(next, error, complete) {
return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
}
}
const myEventManager = new EventManager();
myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");
myEventManager.fullSubscribe(v => {
console.log("SUB 1", v);
});
myEventManager.subscribe(v => {
console.log("SUB 2", v);
});
谢谢
为什么不在 EventManager
上创建 ReplaySubject
和 BehaviorSubject
的实例?
import { ReplaySubject, BehaviorSubject, from } from "rxjs";
class EventManager {
constructor() {
this.replaySubject = new ReplaySubject();
this.behaviorSubject = new BehaviorSubject();
}
publish(value) {
this.replaySubject.next(value);
this.behaviorSubject.next(value);
}
fullSubscribe(next, error, complete) {
return this.replaySubject.subscribe(next, error, complete);
}
subscribe(next, error, complete) {
return this.behaviorSubject.subscribe(next, error, complete);
}
}
如果您跟踪已发布的活动数量,您可以使用 skip
:
subscribe(next, error?, complete?) {
return this.mySubject.pipe(
skip(this.publishCount - 1)
).subscribe(next, error, complete);
}
这是一个 StackBlitz 演示。
您可以通过操纵 BehaviorSubject
.
ReplaySubject
的行为,而不是强制 ReplaySubject
表现得像 BehaviorSubject
import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';
class EventManager {
constructor() {
this.mySubject = new BehaviorSubject();
this.allEmittedValues = this.mySubject.pipe(
scan((xs, x) => [...xs, x], []),
shareReplay(1)
);
// Necessary since we need to start accumulating allEmittedValues
// immediately.
this.allEmittedValues.subscribe();
}
dispose() {
// ends all subscriptions
this.mySubject.complete();
}
publish(value) {
this.mySubject.next(value);
}
fullSubscribe(next, error, complete) {
// First, take the latest value of the accumulated array of emits and
// unroll it into an observable
const existingEmits$ = this.allEmittedValues.pipe(
take(1),
concatMap((emits) => from(emits))
);
// Then, subscribe to the main subject, skipping the replayed value since
// we just got it at the tail end of existingEmits$
const futureEmits$ = this.mySubject.pipe(skip(1));
return concat(existingEmits$, futureEmits$).subscribe(
next,
error,
complete
);
}
subscribe(next, error, complete) {
return this.mySubject.subscribe(next, error, complete);
}
}