订阅推送通知服务后添加管道
Adding pipes after subscribing to a push notification service
情况:
我遇到了 rxjs Observable
系统的用例,我可能需要在它启动后向 Subscription
添加 pipe
d 命令。
就我而言,我正在处理的应用程序必须被动地收听推送通知系统。可以通过该系统推出许多消息,我的系统需要对此做出响应。 但是,有一个可以预见的情况,未来要实现的动态加载视图需要在推送通知系统中添加一个监听器。
问题:
鉴于我的应用程序处于 Subscription
已经存在的状态,我可以在 .subscribe(() => {})
被调用后添加一个额外的管道吗?
// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });
this.something.pipe(
map((something) => {
// ...Commands that I want to add to the subscription...
})
);
...如果我这样做,那么会发生什么?
解法:
@user2216584 和@SerejaBogolubov 的两个回答都对这个问题有一个方面的回答。
我的高级推送通知侦听器服务需要做两件事:
- 坚持订阅,
- 能够从听众列表中挑选。
复杂的是每个侦听器需要侦听不同的消息。换句话说,如果我在 foo_DEV
上收到一条消息,应用程序需要做一些不同于推送通知系统在 bar_DEV
.
上推送消息的事情。
所以,这是我想出的:
export interface PushNotificationListener {
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any,
messageSubject$: Subject<PushNotificationMessage>
}
export class PushNotificationListenerService {
private connection$: Observable<PushNotificationConnection>;
private subscription$: Subscription;
private listeners: PushNotificationListener[] = [];
constructor(
private connectionManager: PushNotificationConnectionManager
) {
}
connect() {
// Step 1 - Open the socket connection!
this.connection$ = this.connectionManager.connect(
// The arguments for setting up the websocket are unimportant here.
// The underlying implementation is similarly unimportant.
);
}
setListener(
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any
) {
// Step 3...or maybe 2...(shrug)...
// Set listeners that the subscription to the high-order connection
// will employ.
const newListener: PushNotificationListener = {
name: name,
onMessageReceived: onMessageReceived,
messageSubject$: null
};
this.listeners.push(newListener);
}
listen() {
// Step 2 - Listen for changes to the high-order connection observable.
this.subscription$ = this.connection$
.subscribe((connection: PushNotificationConnection) => {
console.info('Push notification connection established');
for (let listener of this.listeners) {
listener.messageSubject$ = connection.subscribe(listener.name);
listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
listener.onMessageReceived(message);
}
}
},
(error: any) => {
console.warn('Push notification connection error', error);
}
}
}
通过仔细研究构成我的推送通知系统核心的内部代码,我发现我们已经有了更高阶的Observable
。 websocket 代码创建了一个可观察对象 (connectionManager.connect()
),它需要缓存在服务中并被订阅。由于该代码特定于我工作的地方,因此我不能再多说了。
但是,缓存侦听器也很重要! .listen()
中的 subscribe
调用只是在连接状态更改时遍历所有附加的侦听器,因此我可以通过 .addListener()
即兴添加侦听器,并且由于 rxjs 的 [=12] =] 系统本身就可以工作,AND 事实上,我是在一个范围内的听众列表中工作,我有一个系统,我可以动态设置听众,即使 .connect()
在配置任何侦听器之前调用。
这段代码可能仍然可以从 redesign/refactoring 中受益,但我有一些有用的东西,这是任何好的编码的重要的第一步。谢谢大家!
[我正在编辑我的答案,因为之前的答案是根据作者分享的第一个代码;如评论中所述,作者有changed/corrected代码]-
我怀疑以下代码是否会影响订阅中的任何内容 -
this.something.pipe(
map((something) => {
// ...Commands that I want to add to the subscription...
})
);
您可以在最初设置可观察对象时尝试高阶函数,如果高阶函数在范围内,您可以重新分配它。由于以下原因,我也怀疑它是否会起作用 -
设置 Observable 后,observable 会保留传递的函数的引用,该函数将在订阅时调用 [https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]。现在,如果您重新分配高阶函数,那么可观察函数仍然指向旧引用。通过重新分配高阶函数,您没有更改最初设置可观察对象时设置的原始函数引用。
假设由于某种原因,高阶重新分配有效,在这种情况下,也很有可能在您的旧 higher-order 函数执行之前您可能已经重新分配了您的高阶函数(因为如果 source observable 对后端进行异步调用,在等待代码时,javascript 事件循环可能已经重新分配了高阶函数,当异步调用返回时它将执行新分配的高阶函数)。也许这段代码会阐明我的观点-
让 higherOrderFunc = map(x => x * 2);
this.something
.pipe(
mergeMap(_ => //call to backend; async call),
higherOrderFunc,
).subscribe();
higherOrderFunc = map(x => x * 3); // this will execute before async call completes
好吧,你可以很容易地做到这一点。比如说,你想要一些 runtime-deferred map
。比起像 map(this.myMapper)
这样的操作,其中 myMapper
是在适当范围内可见的私有字段。通过改变该私有字段,您可以有点 add/remove 额外的行为。例如,map(x => x)
表示没有任何映射。
但是,在我看来,您在滥用 rxjs
。很可能您真正需要的是正确的高阶可观察对象(发出可观察对象的可观察对象,"stream of streams")。那将是更 rxjs
ic 和更清洁的解决方案。所以三思而后行。
情况:
我遇到了 rxjs Observable
系统的用例,我可能需要在它启动后向 Subscription
添加 pipe
d 命令。
就我而言,我正在处理的应用程序必须被动地收听推送通知系统。可以通过该系统推出许多消息,我的系统需要对此做出响应。 但是,有一个可以预见的情况,未来要实现的动态加载视图需要在推送通知系统中添加一个监听器。
问题:
鉴于我的应用程序处于 Subscription
已经存在的状态,我可以在 .subscribe(() => {})
被调用后添加一个额外的管道吗?
// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });
this.something.pipe(
map((something) => {
// ...Commands that I want to add to the subscription...
})
);
...如果我这样做,那么会发生什么?
解法:
@user2216584 和@SerejaBogolubov 的两个回答都对这个问题有一个方面的回答。
我的高级推送通知侦听器服务需要做两件事:
- 坚持订阅,
- 能够从听众列表中挑选。
复杂的是每个侦听器需要侦听不同的消息。换句话说,如果我在 foo_DEV
上收到一条消息,应用程序需要做一些不同于推送通知系统在 bar_DEV
.
所以,这是我想出的:
export interface PushNotificationListener {
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any,
messageSubject$: Subject<PushNotificationMessage>
}
export class PushNotificationListenerService {
private connection$: Observable<PushNotificationConnection>;
private subscription$: Subscription;
private listeners: PushNotificationListener[] = [];
constructor(
private connectionManager: PushNotificationConnectionManager
) {
}
connect() {
// Step 1 - Open the socket connection!
this.connection$ = this.connectionManager.connect(
// The arguments for setting up the websocket are unimportant here.
// The underlying implementation is similarly unimportant.
);
}
setListener(
name: string,
onMessageReceived: (msg: PushNotificationMessage) => any
) {
// Step 3...or maybe 2...(shrug)...
// Set listeners that the subscription to the high-order connection
// will employ.
const newListener: PushNotificationListener = {
name: name,
onMessageReceived: onMessageReceived,
messageSubject$: null
};
this.listeners.push(newListener);
}
listen() {
// Step 2 - Listen for changes to the high-order connection observable.
this.subscription$ = this.connection$
.subscribe((connection: PushNotificationConnection) => {
console.info('Push notification connection established');
for (let listener of this.listeners) {
listener.messageSubject$ = connection.subscribe(listener.name);
listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
listener.onMessageReceived(message);
}
}
},
(error: any) => {
console.warn('Push notification connection error', error);
}
}
}
通过仔细研究构成我的推送通知系统核心的内部代码,我发现我们已经有了更高阶的Observable
。 websocket 代码创建了一个可观察对象 (connectionManager.connect()
),它需要缓存在服务中并被订阅。由于该代码特定于我工作的地方,因此我不能再多说了。
但是,缓存侦听器也很重要! .listen()
中的 subscribe
调用只是在连接状态更改时遍历所有附加的侦听器,因此我可以通过 .addListener()
即兴添加侦听器,并且由于 rxjs 的 [=12] =] 系统本身就可以工作,AND 事实上,我是在一个范围内的听众列表中工作,我有一个系统,我可以动态设置听众,即使 .connect()
在配置任何侦听器之前调用。
这段代码可能仍然可以从 redesign/refactoring 中受益,但我有一些有用的东西,这是任何好的编码的重要的第一步。谢谢大家!
[我正在编辑我的答案,因为之前的答案是根据作者分享的第一个代码;如评论中所述,作者有changed/corrected代码]-
我怀疑以下代码是否会影响订阅中的任何内容 -
this.something.pipe(
map((something) => {
// ...Commands that I want to add to the subscription...
})
);
您可以在最初设置可观察对象时尝试高阶函数,如果高阶函数在范围内,您可以重新分配它。由于以下原因,我也怀疑它是否会起作用 -
设置 Observable 后,observable 会保留传递的函数的引用,该函数将在订阅时调用 [https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]。现在,如果您重新分配高阶函数,那么可观察函数仍然指向旧引用。通过重新分配高阶函数,您没有更改最初设置可观察对象时设置的原始函数引用。
假设由于某种原因,高阶重新分配有效,在这种情况下,也很有可能在您的旧 higher-order 函数执行之前您可能已经重新分配了您的高阶函数(因为如果 source observable 对后端进行异步调用,在等待代码时,javascript 事件循环可能已经重新分配了高阶函数,当异步调用返回时它将执行新分配的高阶函数)。也许这段代码会阐明我的观点-
让 higherOrderFunc = map(x => x * 2);
this.something
.pipe(
mergeMap(_ => //call to backend; async call),
higherOrderFunc,
).subscribe();
higherOrderFunc = map(x => x * 3); // this will execute before async call completes
好吧,你可以很容易地做到这一点。比如说,你想要一些 runtime-deferred map
。比起像 map(this.myMapper)
这样的操作,其中 myMapper
是在适当范围内可见的私有字段。通过改变该私有字段,您可以有点 add/remove 额外的行为。例如,map(x => x)
表示没有任何映射。
但是,在我看来,您在滥用 rxjs
。很可能您真正需要的是正确的高阶可观察对象(发出可观察对象的可观察对象,"stream of streams")。那将是更 rxjs
ic 和更清洁的解决方案。所以三思而后行。