XStreams 中忽略的第二个合并分支

second merged branch ignored in XStreams

我正在使用 storage and state 插件开发 Cycle JS 应用程序。

该应用使用 xstream 作为响应式库。

我陷入了一种奇怪的行为。

我的流如下图

在第一次会话 var 更新时,我得到了这个调试结果:

问题: "Debug 2" 分支未执行

如果我更新会话项(以便产生新的存储事件),那么两个分支都会按预期执行

如果在 "debug 0" 添加 .remember() 也会发生同样的好行为

更奇怪的是,如果我移除过滤器,流程会按预期工作

没有过滤器(也没有记住)从第一个事件开始流程就给出了这个结果

我怀疑在附加第二个分支之前在 "debug 0" 观察到了一些东西,因此第一个事件已经被消耗掉了。 但是,如果两个分支 xs.merged 在一起,怎么会发生这种情况呢?如何一个分支执行而第二个分支不执行?这两个分支没有过滤器或其他处理,它们只是映射到一个 reducer 函数。 关于如何调试和解决这种情况的任何建议?

问题的发生是因为 storage.session.getItem 在订阅 后立即发出 ,并且 merge(a$, b$) 在订阅 [=] 之前订阅了 a$ 13=],所以 a$ 获得了事件,但是当 b$ 被订阅时,已经来不及了。这个问题是众所周知的,被称为故障(在反应式编程中),通常发生在有菱形流图时,它是正是你的情况。

我有两篇关于故障的博文,可以为您提供更多背景信息:Primer on RxJS schedulers and Rx glitches aren't actually a problem。它提到了 RxJS,但 xstream 在实现方面与 RxJS 非常接近。 xstream 和 RxJS 的区别是 xstream 流总是多播("shared"),RxJS 有很多调度器类型,但是 xstream 只有一种。 RxJS 中的默认调度程序与 xstream 具有相同的行为。

解决方法是在到达钻石之前应用 .remember()。这是因为需要为该流的其他消费者缓存发出的值。 .remember() 只是将 Stream 转换为 MemoryStream。我认为源流最初是一个 MemoryStream,映射一个 MemoryStream 会创建其他 MemoryStreams,但是 filter 是一个破坏它的运算符。 filter 总是 returns 一个 Stream,原因是 MemoryStreams 应该总是有一个当前值,但是 filter 可以 删除 值,所以过滤后的流可能没有任何当前值。

作为 Cycle.js 的作者,我认为 cyclejs/storage 的设计方式并不是最好的,我认为我们会找到设计这些 API 的方法,从而最大程度地减少 MemoryStream 与 Stream 的混淆.但就目前而言,了解这两者之间的区别并规划您的应用程序以避免钻石(和故障)或在正确的位置使用 .remember() 非常重要。