RxJS:我应该如何通过 socket.js 事件更新 observable 中的元素?

RxJS: How should I update elements in observable via socket.js event?

正在尝试重新实现公司的主要应用程序,从 Promises 和命令式风格切换到 RxJS 和函数式风格。并发现自己处于特定情况。

问题:我们在系统中有一个可以过滤的帖子列表。 因此,我创建了一个非常简单的过滤器组件和一个列表组件,我在其中接收如下数据:

this.items = this.filterChange
    .startWith(this.filterComponent.initialValue);
    .debounceTime(400)
    .switchMap((data) => this.process(data))
    .share();

但是我需要从更新列表中的单个元素的套接字插入另一个事件。我们的 process 函数可以理解这两个事件并对它们做出反应,但问题是:

merge(this.filterChange.debounceTime(400), this.socketUpdates)
    .startWith(this.filterComponent.initialValue);
    .switchMap((data) => this.process(data))
    .share();

如果更新在搜索更改后立即发生,我们将丢失搜索更改结果,在这种情况下我该怎么办?

PS: 我考虑过 combineLatest + pariwise 的一种选择,即拥有改变的能力并对此做出反应。有没有更优雅的方案?

如果所需的行为是在搜索输入消除抖动之前阻止来自套接字的任何更新,那么您应该使一个流依赖于另一个流,而不是合并。这可能是一种适用于您的用例的方法:

this.filterChange.switchMap( data => {
    return Observable.of( data )
           .debounceTime( 400 )
           .concat( this.socketUpdates );
  })
  .startWith(this.filterComponent.initialValue);
  .switchMap((data) => this.process(data))
  .share();

我不确切知道 debounceTime 运算符的用例是什么,如果你要对文本输入或其他一些数据进行去抖动,但你应该能够将它移出 switchMap 如果你喜欢。上面的这个实现可能真的 "chatty" 订阅了 socketUpdates 流,这也可能对你产生影响。

this.filterChange.debounceTime( 400 ).switchMap( data => {
    return Observable.of( data ).concat( this.socketUpdates );
  })
  .startWith(this.filterComponent.initialValue);
  .switchMap((data) => this.process(data))
  .share();

这实际上是 RxJS 的一个常见错误,你在应该使用 concatMap(或者 mergeMap)的时候使用了 switchMap

switchMap 只有与 non-sideeffectful read-only requests/messages 一起才真正安全。如果您正在尝试通过网络更新记录,并且您关心完成和处理结果的顺序,您希望使用 concatMap。如果你不关心结果的顺序,你应该使用 mergeMap.

  • switchMap - 当任何待处理的 Observable 获得一个新值时将取消订阅,从而丢弃它的响应。
  • mergeMap - 将开始并 运行 它立即创建并完成的所有 Observable,而不关心结果合并时返回的顺序。
  • concatMap - 一次只有 运行 个 Observable,按顺序,什么也不丢。
merge(this.filterChange.debounceTime(400), this.socketUpdates)
    .startWith(this.filterComponent.initialValue);
    .concatMap((data) => this.process(data))
    .share();