订阅推送通知服务后添加管道

Adding pipes after subscribing to a push notification service

情况:
我遇到了 rxjs Observable 系统的用例,我可能需要在它启动后向 Subscription 添加 piped 命令。

就我而言,我正在处理的应用程序必须被动地收听推送通知系统。可以通过该系统推出许多消息,我的系统需要对此做出响应。 但是,有一个可以预见的情况,未来要实现的动态加载视图需要在推送通知系统中添加一个监听器。

问题:
鉴于我的应用程序处于 Subscription 已经存在的状态,我可以在 .subscribe(() => {}) 被调用后添加一个额外的管道吗?

// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

...如果我这样做,那么会发生什么?

解法:
@user2216584 和@SerejaBogolubov 的两个回答都对这个问题有一个方面的回答。

我的高级推送通知侦听器服务需要做两件事:

  1. 坚持订阅,
  2. 能够从听众列表中挑选。

复杂的是每个侦听器需要侦听不同的消息。换句话说,如果我在 foo_DEV 上收到一条消息,应用程序需要做一些不同于推送通知系统在 bar_DEV.

上推送消息的事情。

所以,这是我想出的:

export interface PushNotificationListener {
  name: string,
  onMessageReceived: (msg: PushNotificationMessage) => any,
  messageSubject$: Subject<PushNotificationMessage>
}

export class PushNotificationListenerService {
  private connection$: Observable<PushNotificationConnection>;
  private subscription$: Subscription;

  private listeners: PushNotificationListener[] = [];

  constructor(
    private connectionManager: PushNotificationConnectionManager
  ) {
  }

  connect() {
    // Step 1 - Open the socket connection!
    this.connection$ = this.connectionManager.connect(
      // The arguments for setting up the websocket are unimportant here.
      // The underlying implementation is similarly unimportant.
    );
  } 

  setListener(
    name: string,
    onMessageReceived: (msg: PushNotificationMessage) => any
  ) {
    // Step 3...or maybe 2...(shrug)...
    // Set listeners that the subscription to the high-order connection
    // will employ.
    const newListener: PushNotificationListener = {
      name: name,
      onMessageReceived: onMessageReceived,
      messageSubject$: null
    };

    this.listeners.push(newListener);
  }

  listen() {
    // Step 2 - Listen for changes to the high-order connection observable.
    this.subscription$ = this.connection$
      .subscribe((connection: PushNotificationConnection) => {
        console.info('Push notification connection established');

        for (let listener of this.listeners) {
         listener.messageSubject$ = connection.subscribe(listener.name);
         listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
           listener.onMessageReceived(message);
         }
        }
      },
      (error: any) => {
        console.warn('Push notification connection error', error);
      }
  }
}

通过仔细研究构成我的推送通知系统核心的内部代码,我发现我们已经有了更高阶的Observable。 websocket 代码创建了一个可观察对象 (connectionManager.connect()),它需要缓存在服务中并被订阅。由于该代码特定于我工作的地方,因此我不能再多说了。

但是,缓存侦听器也很重要! .listen() 中的 subscribe 调用只是在连接状态更改时遍历所有附加的侦听器,因此我可以通过 .addListener() 即兴添加侦听器,并且由于 rxjs 的 [=12] =] 系统本身就可以工作,AND 事实上,我是在一个范围内的听众列表中工作,我有一个系统,我可以动态设置听众,即使 .connect() 在配置任何侦听器之前调用。

这段代码可能仍然可以从 redesign/refactoring 中受益,但我有一些有用的东西,这是任何好的编码的重要的第一步。谢谢大家!

[我正在编辑我的答案,因为之前的答案是根据作者分享的第一个代码;如评论中所述,作者有changed/corrected代码]-

我怀疑以下代码是否会影响订阅中的任何内容 -

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

您可以在最初设置可观察对象时尝试高阶函数,如果高阶函数在范围内,您可以重新分配它。由于以下原因,我也怀疑它是否会起作用 -

  1. 设置 Observable 后,observable 会保留传递的函数的引用,该函数将在订阅时调用 [https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]。现在,如果您重新分配高阶函数,那么可观察函数仍然指向旧引用。通过重新分配高阶函数,您没有更改最初设置可观察对象时设置的原始函数引用。

  2. 假设由于某种原因,高阶重新分配有效,在这种情况下,也很有可能在您的旧 higher-order 函数执行之前您可能已经重新分配了您的高阶函数(因为如果 source observable 对后端进行异步调用,在等待代码时,javascript 事件循环可能已经重新分配了高阶函数,当异步调用返回时它将执行新分配的高阶函数)。也许这段代码会阐明我的观点-

让 higherOrderFunc = map(x => x * 2);

this.something
    .pipe(
          mergeMap(_ => //call to backend; async call),
          higherOrderFunc,
         ).subscribe();
higherOrderFunc = map(x => x * 3); // this will execute before async call completes

好吧,你可以很容易地做到这一点。比如说,你想要一些 runtime-deferred map。比起像 map(this.myMapper) 这样的操作,其中 myMapper 是在适当范围内可见的私有字段。通过改变该私有字段,您可以有点 add/remove 额外的行为。例如,map(x => x) 表示没有任何映射。

但是,在我看来,您在滥用 rxjs。很可能您真正需要的是正确的高阶可观察对象(发出可观察对象的可观察对象,"stream of streams")。那将是更 rxjsic 和更清洁的解决方案。所以三思而后行。