如何从所有消息的可观察对象中创建未读消息的可观察对象

How to create an observable of the unread messages from the observable of all the messages

有一个websocket回显服务,发送和接收消息:

@Injectable()
export class NotificationsService {
  private ws = new $WebSocket('wss://echo.websocket.org');

  wsSendMessage(message: string = `Message ${Math.random()}`): void {
    this.ws.send(message);
  }

  wsGetMessages(): Observable<string> {
    return Observable.from(this.ws.getDataStream())
      .map((res: any) => res.data);
  }
}

通知组件订阅此可观察对象并在通知下拉面板中显示所有消息:

  getMessages(): void {
    this.notificationsService.wsGetMessages().subscribe(
        (message: string) => {
          this.messages.unshift(message);
        },
        (err: any) => console.log(err)
    );    
  }

我需要在通知下拉面板打开时在导航栏组件中显示未读消息数并将其设置为 0。我不知道该怎么做,所以我需要你的帮助...

P.S.

现在 websocket 回显服务是:

@Injectable()
export class NotificationsService {
  private ws = new $WebSocket('wss://echo.websocket.org');

  wsSendMessage(message: string = `Message ${Math.random()}`): void {
    this.ws.send(message);
  }

  wsGetMessages(): Observable<Object> {
    return Observable.from(this.ws.getDataStream())
      .map((res: any) => res.data);
  }

  messagesReadStream(msgArray: Object[]): Observable<Object> {
    return Observable.from(msgArray);
  }
}

通知组件是:

export class Notifications implements OnInit {
  messagesAll:string[] = [];
  messagesRead:string[] = [];
  newMessagesNumber:number = 0;

  constructor(private notificationsService: NotificationsService) {
  }

  addMessage(): void {
    this.notificationsService.wsSendMessage();
  }

  getMessages(): void {
    this.notificationsService.wsGetMessages()
      .subscribe(
        (message: any) => this.messagesAll.unshift(message),
        (err: any) => console.error(err)
      );
  }

  readMessages(): void {
    this.messagesRead = this.messagesAll.slice();

    this.notificationsService.messagesReadStream(this.messagesRead)
      .switchMap(() => 
        this.notificationsService
          .wsGetMessages()
          .scan((unread, _) => unread + 1, 0)    
          .startWith(0))
      .subscribe(
        (count: number) => this.newMessagesNumber = count,
        (err: any) => console.error(err)
      );    
  }

  ngOnInit(): void {
    this.getMessages();
  }  
}

使用模板:

<button type="button" class="btn btn-primary-outline btn-sm" (click) = "addMessage()">
  Add message
</button>  

<button type="button" class="btn btn-primary-outline btn-sm" (click) = "readMessages()">
  Read messages
</button>  

<div style="margin-top: 1rem">
  <span class="circle bg-warning fw-bold">
      {{newMessagesNumber}}
  </span>  
</div>

现在,如果我单击 button 'Add message',我的计数器不会更改 0 的值。并且仅当我单击 button 'Add message',然后单击 button 'Read messages',然后再次单击 button 'Add message' 时,计数器才开始正常工作。你能解释一下,问题是什么吗?

您将需要不止一个 Observable。一个包含计数,一个用于在消息已被阅读时发出信号。前者将使用 wsGetMessage 流,而后者必须连接到通知面板打开时触发的 Observable。

const panelOpened: Observable<any> = //..Need a signalling mechanism for when the messages get read

const unreadCount: Observable<number> = messagesRead.switchMap(() => 
   this.notificationsService
         .wsGetMessages()
         //Just count all of the messages as they come in.
         .scan((unread, _) => unread + 1, 0)
         //Make sure it always gets reset to zero
         .startWith(0));

//Subscribe to updates on count changes.
unreadCount.subscribe(count => this.unreadMessageCount = count);