为什么在执行连接后不调用 complete?

Why complete is not invoked after performing a concatenation?

我需要多次查询一个设备。每个查询都需要是异步的,并且设备不支持一次同时查询。 而且,一旦被查询过,就不能马上再次查询到。至少需要暂停1秒才能正常工作。

我的两个查询,由 saveClock()saveConfig() 执行,return 一个 Promise,并且都按预期通过 returning undefined 解决。

在下面的代码中,为什么删除 take() 会阻止调用 toArray()
这是怎么回事,是否有更好的方法来实现相同的行为?

export const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .map(action => {
      // access store and create object data
      // ...
      return data;
    })
    .mergeMap(data =>
      Rx.Observable.from([
        Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
        Rx.Observable.timer(1000),
        Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)),
        Rx.Observable.of(data.id)
     ])
    )
    .concatAll()
    .take(4)
    .toArray()
    // [undefined, 0, undefined, "id"]
    .map(x => { type: COMPLETED, id: x[3] });

我看到了几件事:

您的最终 .map() 缺少括号,在当前形式中这是语法错误,但细微的更改可能会使其意外成为 labeled statement 而不是返回 object。因为在当前形式下它是一个语法错误,我想这只是 post 中的一个错误,而不是您的代码中的错误(甚至 运行),但请仔细检查!

// before
.map(x => { type: COMPLETED, id: x[3] });

// after
.map(x => ({ type: COMPLETED, id: x[3] }));

修复后,该示例 运行 带有一个简单的 redux-observable 测试用例:http://jsbin.com/hunale/edit?js,output 所以如果没有什么值得注意的,我做的与你不同,问题 似乎 未提供代码。请随意添加更多见解甚至更好,为我们在 JSBin/git 存储库中重现它。


你没有提到但非常非常值得注意的一件事是,在 redux-observable 中,你的 epics 通常是 long-lived "process managers"。这个史诗实际上只会处理其中一个保存,然后是 complete(),这可能不是您真正想要的?用户能否在每次应用程序启动时只保存一次?似乎不太可能。

相反,您需要通过将此逻辑封装在 mergeMap 中,让 top-level 流式传输您的史诗 returns 并监听未来的动作。 take(4) 和传递 data.id 然后变得无关紧要:

const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .mergeMap(data =>
      Rx.Observable.from([
        Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
        Rx.Observable.timer(1000),
        Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
      ])
      .concatAll()
      .toArray()
      .map(() => ({ type: COMPLETED, id: data.id }))
    );

Ben Lesh 在他最近的 AngularConnect 演讲中描述了这种流分离,在错误的上下文中,但它仍然适用:https://youtu.be/3LKMwkuK0ZE?t=20m(别担心,这不是Angular具体!)

接下来,我想分享一些未经请求的重构建议,这些建议可能会让您的生活更轻松,但当然这是自以为是,所以请随意忽略:

我会重构以更准确地直观地反映事件的顺序,并降低复杂性:

const saveEpic = (action$, store) =>
  action$.ofType(SAVE)
    .mergeMap(data =>
      Rx.Observable.from(saveClock(data.id, data.clock))
        .delay(1000)
        .mergeMap(() => saveConfig(data.id, data.config))
        .map(() => ({ type: COMPLETED, id: data.id }))
    );

这里我们使用 saveClock 返回的 Promise,将其输出延迟 1000 毫秒,mergeMapping 将结果调用 saveConfig(),这也是 returns 一个 Promise被消耗掉。然后最终将结果映射到我们的 COMPLETE 操作。

最后,请记住,如果您的史诗 确实 存活并且长寿,则此史诗 as-is 中没有任何内容可以阻止它接收多个保存请求而其他的仍然 in-flight 或尚未耗尽请求之间所需的 1000 毫秒延迟。也就是说,如果确实需要 any 请求之间的 1000 毫秒 space,您的史诗本身并不能完全阻止您的 UI 代码破坏它。在这种情况下,您可能需要考虑添加更复杂的缓冲 backpressure 机制,例如将 .zip() 运算符与 BehaviorSubject.

一起使用

http://jsbin.com/waqipol/edit?js,output

const saveEpic = (action$, store) => {
  // used to control how many we want to take,
  // the rest will be buffered by .zip()
  const requestCount$ = new Rx.BehaviorSubject(1)
    .mergeMap(count => new Array(count));

  return action$.ofType(SAVE)
    .zip(requestCount$, action => action)
    .mergeMap(data =>
      Rx.Observable.from(saveClock(data.id, data.clock))
        .delay(1000)
        .mergeMap(() => saveConfig(data.id, data.config))
        .map(() => ({ type: COMPLETED, id: data.id }))
        // we're ready to take the next one, when available
        .do(() => requestCount$.next(1))
    );
};

这样一来,当我们仍在处理现有请求时,保存请求就会被缓冲,我们一次只处理其中一个请求。请记住,尽管这是一个无界缓冲区——这意味着未决操作队列的增长速度可能无限快于缓冲区的刷新速度。这是不可避免的,除非您采用有损背压策略,例如丢弃重叠的请求等。

如果您有其他 epics 对每秒发送请求不超过一次有重叠的要求,您将需要创建某种单一主管来保证所有 epics .

这一切看起来都非常复杂,但也许具有讽刺意味的是,与传统的命令式代码相比,在 RxJS 中 更容易做到这一点。最难的部分实际上是了解模式。