当一个新的高阶 Observable 被发射时,取消之前的内部 Observables

Cancel previous inner Observables when a new higher-order Observable is emitted

考虑以下代码:

this.msgService.getUserChatList()
    .do( (list) => { 
      this.unread = false;
      console.log(list);
     } )
    .mergeMap( chats => Observable.from(chats) )
    .mergeMap( chat => this.msgService.getLastMessage(chat['id']).map( lastMessage => this.containsUnreadMessages(lastMessage, chat['lastPresence']) ) )
    .filter( state => state === true )
    .subscribe( (unread) => {
      this.unread = true;
      console.log('result ', res);
    } );

getUserChatList(): - 每次聊天发生变化时发出一个元素 - 元素是包含所有聊天元数据的原始数组 - 从未完成

getLastMessage(): - 是一个永远不会完成的 Observable

在第二个 mergeMap 中,我调用函数 getLastMessage()。 我需要观察这个可观察的 直到 getUserChatList() 发出一个新项目,否则我会对同一聊天的最后一条消息进行多次观察。

插图:

  1. getUserChatList 发出:[chatMetaA:{}, chatMetaB:{}]
  2. 代码通过getLastMessage并开始观察chatA和chatB的lastMessage
  3. 其中一个聊天更改因此 getUserChatList 发出了一个新项目,其中包含新版本的聊天元数据:[chatMetaA:{}, chatMetaB:{}]
  4. 代码通过 getLastMessage 并开始观察 chatA 和 chatB 的 lastMessage。所以我们现在观察两次 chatA 和 chatB
  5. 的最后一条消息

它会一直持续下去...

我的问题是,一旦 getUserChatList() 发出新项目,我如何取消对 getLastMessage() 的观察?我尝试使用 switch 但无法使其正常工作

解决方案确实是使用 switchMap:

this.msgService.getUserChatList()
    .do( () => { this.unread = false } )
    .switchMap(
      chats => Observable.from(chats)
        .mergeMap( chat => this.msgService.getLastMessage(chat['id'])
        .map( lastMessage => this.containsUnreadMessages(lastMessage, chat['lastPresence']) ) )
    )
    .filter( state => state === true )
    .subscribe( (unread) => {
      this.unread = true;
      console.log('result ', res);
    } );