RxJS:在组合流中识别流源的正确模式?

RxJS: correct patterns for identifying stream source in combined stream?

我正在寻找一些最佳实践建议,以了解如何以仅对新流进行操作的方式识别哪个流正在进入 mergecombineLatest 函数。

在 TODO 应用程序的上下文中,我有传入的添加和删除流,我想将它们组合起来,以便我的列表编辑可以在单个流中以无状态方式进行。输出是一个集成了 add(concat) 和 remove(filter) 事件的列表,否则你似乎会得到包含所有添加或删除事件的其他流,从而无限增长。

遇到的问题包括:

目前发现的似乎不太理想的方法包括:

非常欢迎任何最佳实践建议。

没有 Rxjs 方法可以识别流的来源。流上没有名称。如果你想要一个,你需要自己放。因此,如果您喜欢这两种方式中的 none,请寻找另一种方式为它们命名。如果您的问题是在对象上放置标识符的最佳做法是什么,那么答案就是在该死的对象上放置标识符。

我个人喜欢的方式是通过柯里化函数:

function label(identifier){return function (x){var obj={};obj[identifier]=x;return obj;};}

我使用如下:

source1.map(label('remove')).merge(source2.map(label('add')))

但实际上,随心所欲,如何做到这一点在我看来是一个相当小的问题。

您可以在合并之前操作精确的流,以避免不必要的 ID 处理。一般来说,对于流和函数式编程,尽量避免 if-else 结构,因为有一些工具可以帮助你解决问题。通常你可以通过查看调用的位置并更早地采取行动来避免它。

这是一个例子。列表操作只是为了让最重要的部分可见。

let listSubject: Rx.Subject<Item[]> = new Rx.BehaviourSubject<Item[]>([]);
let addObs: Rx.Observable<Item[]> = initAddObs();
let removeObs: Rx.Observable<Item[]> = initRemoveObs();

Rx.Observable
  .merge([
    addObs.map(items => (list: Item[]) => list.concat(items)),
    removeObs.map(items => (list: Item[]) => list.filter(items))
  ])
  .withLatestFrom(listSubject, (operation, list) => operation(list))
  .subscribe(listSubject)

该列表使用了 BehaviourSubject,因此无论何时订阅,任何人都可以获得最新情况。 AddObs 和 removeObs 流映射到操作,它们具有相同的签名并且可以合并。在对当前列表执行操作后,我们必须将结果回收回 listSubject。现在您可以订阅 listSubject,您将获得从那里发出的最新列表。