angular rxjs 改变了 websocket 的异步可观察性

angular rxjs change async observable from websocket

嗨,我正在使用 angular 7 和 rxjs async

在我的组件中,我将 ngFor 与异步观察器一起使用

<item-comp [item]="item" *ngFor="let item of groupsService.selectedItems$ | async; ">

</item-comp>

在我的服务中,我有一个 BehaviorSubject,它会在用户选择一个组时发出

  public groupSelected$: BehaviorSubject<any> = new BehaviorSubject(null);

这是 selectedItems$ Observable :

public selectedItems$ = this.groupSelected$.pipe(
    switchMap((group: any) => {
        if (!group)
          return new EmptyObservable();

        return this.http.get('/api/'+ group)
          .pipe(
            map((res: any) => {
                return res.items;
              }
            )
          )
      }
    )
  )

这有效,但现在我需要能够更改特定项目以响应 websocket 消息。 我有一个 websocket 连接,用于处理项目更新的消息。有没有办法使用 rxjs 的反应式方法来做到这一点?

您可以为 websocket 更新创建高阶函数,然后将其与您的 http 请求链接起来

const onUpdate=(items)=>updateFromWebSocket.pipe(
   map(itemUpdates=>{........ return updatedItems}
   startWith(items)
)

selectedItems$.pipe(switchMap(items=>onUpdate(items)).subscribe()