如何从 AsyncSubject(消费者模式)订阅一次元素
How to subscribe exactly once to an element from AsyncSubject (consumer pattern)
在 rxjs5
中,我有一个 AsyncSubject 并想多次订阅它,但只有一个订阅者应该收到 next()
事件。所有其他人(如果他们尚未取消订阅)应立即获得 complete()
事件而无需 next()
.
示例:
let fired = false;
let as = new AsyncSubject();
const setFired = () => {
if (fired == true) throw new Error("Multiple subscriptions executed");
fired = true;
}
let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);
// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered
setTimeout(() => {
as.next(undefined);
as.complete();
}, 500);
您可以通过编写一个包含初始 AsyncSubject
的小 class 轻松实现这一点
import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'
class SingleSubscriberObservable<T> {
private newSubscriberSubscribed = new Subject();
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSubscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
}
然后您可以在示例中尝试一下:
const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)
let fired = false;
function setFired(label:string){
return ()=>{
if(fired == true) throw new Error("Multiple subscriptions executed");
console.log("FIRED", label);
fired = true;
}
}
function logDone(label: string){
return ()=>{
console.log(`${label} Will stop subscribing to source observable`);
}
}
const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));
setTimeout(()=>{
as.next(undefined);
as.complete();
}, 500)
这里的关键是这部分:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSusbscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
每次有人呼叫订阅时,我们都会向 newSubscriberSubscribed
主题发出信号。
当我们订阅底层 Observable 时,我们使用
takeUntil(this.newSubscriberSubscribed)
这意味着当下一个用户呼叫时:
this.newSubscriberSubscribed.next()
之前 returned 的 observable 将完成。
所以这将导致您所问的是,只要有新订阅出现,之前的订阅就会完成。
应用程序的输出将是:
First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable
编辑:
如果您想在第一个订阅者保持订阅并且所有未来订阅立即完成的情况下执行此操作(这样当最早的订阅者仍然订阅时,没有其他人可以订阅)。你可以这样做:
class SingleSubscriberObservable<T> {
private isSubscribed: boolean = false;
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
if(this.isSubscribed){
return Observable.empty().subscribe(next, error, complete);
}
this.isSubscribed = true;
var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
return new Subscription(()=>{
unsubscribe.unsubscribe();
this.isSubscribed = false;
});
}
}
我们保留一个标志 this.isSusbscribed
来跟踪当前是否有人订阅。我们还 return 一个自定义订阅,我们可以使用它在取消订阅时将此标志设置回 false。
每当有人尝试订阅时,如果我们改为将他们订阅到一个空的 Observable
,这将立即完成。输出看起来像:
Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable
最简单的方法是将 AsyncSubject 包装在另一个对象中,该对象仅处理调用 1 个订阅者的逻辑。假设您只想调用第一个订阅者,下面的代码应该是一个很好的起点
let as = new AsyncSubject();
const createSingleSubscriberAsyncSubject = as => {
// define empty array for subscribers
let subscribers = [];
const subscribe = callback => {
if (typeof callback !== 'function') throw new Error('callback provided is not a function');
subscribers.push(callback);
// return a function to unsubscribe
const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
return unsubscribe;
};
// the main subscriber that will listen to the original AS
const mainSubscriber = (...args) => {
// you can change this logic to invoke the last subscriber
if (subscribers[0]) {
subscribers[0](...args);
}
};
as.subscribe(mainSubscriber);
return {
subscribe,
// expose original AS methods as needed
next: as.next.bind(as),
complete: as.complete.bind(as),
};
};
// use
const singleSub = createSingleSubscriberAsyncSubject(as);
// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));
// remove first subscriber
unsub1();
setTimeout(() => {
as.next(undefined);
as.complete();
// only 'subscriber 2' is printed
}, 500);
在 rxjs5
中,我有一个 AsyncSubject 并想多次订阅它,但只有一个订阅者应该收到 next()
事件。所有其他人(如果他们尚未取消订阅)应立即获得 complete()
事件而无需 next()
.
示例:
let fired = false;
let as = new AsyncSubject();
const setFired = () => {
if (fired == true) throw new Error("Multiple subscriptions executed");
fired = true;
}
let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);
// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered
setTimeout(() => {
as.next(undefined);
as.complete();
}, 500);
您可以通过编写一个包含初始 AsyncSubject
import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'
class SingleSubscriberObservable<T> {
private newSubscriberSubscribed = new Subject();
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSubscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
}
然后您可以在示例中尝试一下:
const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)
let fired = false;
function setFired(label:string){
return ()=>{
if(fired == true) throw new Error("Multiple subscriptions executed");
console.log("FIRED", label);
fired = true;
}
}
function logDone(label: string){
return ()=>{
console.log(`${label} Will stop subscribing to source observable`);
}
}
const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));
setTimeout(()=>{
as.next(undefined);
as.complete();
}, 500)
这里的关键是这部分:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSusbscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
每次有人呼叫订阅时,我们都会向 newSubscriberSubscribed
主题发出信号。
当我们订阅底层 Observable 时,我们使用
takeUntil(this.newSubscriberSubscribed)
这意味着当下一个用户呼叫时:
this.newSubscriberSubscribed.next()
之前 returned 的 observable 将完成。
所以这将导致您所问的是,只要有新订阅出现,之前的订阅就会完成。
应用程序的输出将是:
First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable
编辑:
如果您想在第一个订阅者保持订阅并且所有未来订阅立即完成的情况下执行此操作(这样当最早的订阅者仍然订阅时,没有其他人可以订阅)。你可以这样做:
class SingleSubscriberObservable<T> {
private isSubscribed: boolean = false;
constructor(private sourceObservable: Observable<T>) {}
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
if(this.isSubscribed){
return Observable.empty().subscribe(next, error, complete);
}
this.isSubscribed = true;
var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
return new Subscription(()=>{
unsubscribe.unsubscribe();
this.isSubscribed = false;
});
}
}
我们保留一个标志 this.isSusbscribed
来跟踪当前是否有人订阅。我们还 return 一个自定义订阅,我们可以使用它在取消订阅时将此标志设置回 false。
每当有人尝试订阅时,如果我们改为将他们订阅到一个空的 Observable
,这将立即完成。输出看起来像:
Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable
最简单的方法是将 AsyncSubject 包装在另一个对象中,该对象仅处理调用 1 个订阅者的逻辑。假设您只想调用第一个订阅者,下面的代码应该是一个很好的起点
let as = new AsyncSubject();
const createSingleSubscriberAsyncSubject = as => {
// define empty array for subscribers
let subscribers = [];
const subscribe = callback => {
if (typeof callback !== 'function') throw new Error('callback provided is not a function');
subscribers.push(callback);
// return a function to unsubscribe
const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
return unsubscribe;
};
// the main subscriber that will listen to the original AS
const mainSubscriber = (...args) => {
// you can change this logic to invoke the last subscriber
if (subscribers[0]) {
subscribers[0](...args);
}
};
as.subscribe(mainSubscriber);
return {
subscribe,
// expose original AS methods as needed
next: as.next.bind(as),
complete: as.complete.bind(as),
};
};
// use
const singleSub = createSingleSubscriberAsyncSubject(as);
// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));
// remove first subscriber
unsub1();
setTimeout(() => {
as.next(undefined);
as.complete();
// only 'subscriber 2' is printed
}, 500);