为什么在执行连接后不调用 complete?
Why complete is not invoked after performing a concatenation?
我需要多次查询一个设备。每个查询都需要是异步的,并且设备不支持一次同时查询。
而且,一旦被查询过,就不能马上再次查询到。至少需要暂停1秒才能正常工作。
我的两个查询,由 saveClock()
和 saveConfig()
执行,return 一个 Promise,并且都按预期通过 returning undefined 解决。
在下面的代码中,为什么删除 take()
会阻止调用 toArray()
?
这是怎么回事,是否有更好的方法来实现相同的行为?
export const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.map(action => {
// access store and create object data
// ...
return data;
})
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)),
Rx.Observable.of(data.id)
])
)
.concatAll()
.take(4)
.toArray()
// [undefined, 0, undefined, "id"]
.map(x => { type: COMPLETED, id: x[3] });
我看到了几件事:
您的最终 .map()
缺少括号,在当前形式中这是语法错误,但细微的更改可能会使其意外成为 labeled statement 而不是返回 object。因为在当前形式下它是一个语法错误,我想这只是 post 中的一个错误,而不是您的代码中的错误(甚至 运行),但请仔细检查!
// before
.map(x => { type: COMPLETED, id: x[3] });
// after
.map(x => ({ type: COMPLETED, id: x[3] }));
修复后,该示例 运行 带有一个简单的 redux-observable 测试用例:http://jsbin.com/hunale/edit?js,output 所以如果没有什么值得注意的,我做的与你不同,问题 似乎 未提供代码。请随意添加更多见解甚至更好,为我们在 JSBin/git 存储库中重现它。
你没有提到但非常非常值得注意的一件事是,在 redux-observable 中,你的 epics 通常是 long-lived "process managers"。这个史诗实际上只会处理其中一个保存,然后是 complete(),这可能不是您真正想要的?用户能否在每次应用程序启动时只保存一次?似乎不太可能。
相反,您需要通过将此逻辑封装在 mergeMap
中,让 top-level 流式传输您的史诗 returns 并监听未来的动作。 take(4)
和传递 data.id
然后变得无关紧要:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
])
.concatAll()
.toArray()
.map(() => ({ type: COMPLETED, id: data.id }))
);
Ben Lesh 在他最近的 AngularConnect 演讲中描述了这种流分离,在错误的上下文中,但它仍然适用:https://youtu.be/3LKMwkuK0ZE?t=20m(别担心,这不是Angular具体!)
接下来,我想分享一些未经请求的重构建议,这些建议可能会让您的生活更轻松,但当然这是自以为是,所以请随意忽略:
我会重构以更准确地直观地反映事件的顺序,并降低复杂性:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
);
这里我们使用 saveClock
返回的 Promise,将其输出延迟 1000 毫秒,mergeMapping 将结果调用 saveConfig()
,这也是 returns 一个 Promise被消耗掉。然后最终将结果映射到我们的 COMPLETE
操作。
最后,请记住,如果您的史诗 确实 存活并且长寿,则此史诗 as-is 中没有任何内容可以阻止它接收多个保存请求而其他的仍然 in-flight 或尚未耗尽请求之间所需的 1000 毫秒延迟。也就是说,如果确实需要 any 请求之间的 1000 毫秒 space,您的史诗本身并不能完全阻止您的 UI 代码破坏它。在这种情况下,您可能需要考虑添加更复杂的缓冲 backpressure 机制,例如将 .zip()
运算符与 BehaviorSubject
.
一起使用
http://jsbin.com/waqipol/edit?js,output
const saveEpic = (action$, store) => {
// used to control how many we want to take,
// the rest will be buffered by .zip()
const requestCount$ = new Rx.BehaviorSubject(1)
.mergeMap(count => new Array(count));
return action$.ofType(SAVE)
.zip(requestCount$, action => action)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
// we're ready to take the next one, when available
.do(() => requestCount$.next(1))
);
};
这样一来,当我们仍在处理现有请求时,保存请求就会被缓冲,我们一次只处理其中一个请求。请记住,尽管这是一个无界缓冲区——这意味着未决操作队列的增长速度可能无限快于缓冲区的刷新速度。这是不可避免的,除非您采用有损背压策略,例如丢弃重叠的请求等。
如果您有其他 epics 对每秒发送请求不超过一次有重叠的要求,您将需要创建某种单一主管来保证所有 epics .
这一切看起来都非常复杂,但也许具有讽刺意味的是,与传统的命令式代码相比,在 RxJS 中 更容易做到这一点。最难的部分实际上是了解模式。
我需要多次查询一个设备。每个查询都需要是异步的,并且设备不支持一次同时查询。 而且,一旦被查询过,就不能马上再次查询到。至少需要暂停1秒才能正常工作。
我的两个查询,由 saveClock()
和 saveConfig()
执行,return 一个 Promise,并且都按预期通过 returning undefined 解决。
在下面的代码中,为什么删除 take()
会阻止调用 toArray()
?
这是怎么回事,是否有更好的方法来实现相同的行为?
export const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.map(action => {
// access store and create object data
// ...
return data;
})
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)),
Rx.Observable.of(data.id)
])
)
.concatAll()
.take(4)
.toArray()
// [undefined, 0, undefined, "id"]
.map(x => { type: COMPLETED, id: x[3] });
我看到了几件事:
您的最终 .map()
缺少括号,在当前形式中这是语法错误,但细微的更改可能会使其意外成为 labeled statement 而不是返回 object。因为在当前形式下它是一个语法错误,我想这只是 post 中的一个错误,而不是您的代码中的错误(甚至 运行),但请仔细检查!
// before
.map(x => { type: COMPLETED, id: x[3] });
// after
.map(x => ({ type: COMPLETED, id: x[3] }));
修复后,该示例 运行 带有一个简单的 redux-observable 测试用例:http://jsbin.com/hunale/edit?js,output 所以如果没有什么值得注意的,我做的与你不同,问题 似乎 未提供代码。请随意添加更多见解甚至更好,为我们在 JSBin/git 存储库中重现它。
你没有提到但非常非常值得注意的一件事是,在 redux-observable 中,你的 epics 通常是 long-lived "process managers"。这个史诗实际上只会处理其中一个保存,然后是 complete(),这可能不是您真正想要的?用户能否在每次应用程序启动时只保存一次?似乎不太可能。
相反,您需要通过将此逻辑封装在 mergeMap
中,让 top-level 流式传输您的史诗 returns 并监听未来的动作。 take(4)
和传递 data.id
然后变得无关紧要:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
])
.concatAll()
.toArray()
.map(() => ({ type: COMPLETED, id: data.id }))
);
Ben Lesh 在他最近的 AngularConnect 演讲中描述了这种流分离,在错误的上下文中,但它仍然适用:https://youtu.be/3LKMwkuK0ZE?t=20m(别担心,这不是Angular具体!)
接下来,我想分享一些未经请求的重构建议,这些建议可能会让您的生活更轻松,但当然这是自以为是,所以请随意忽略:
我会重构以更准确地直观地反映事件的顺序,并降低复杂性:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
);
这里我们使用 saveClock
返回的 Promise,将其输出延迟 1000 毫秒,mergeMapping 将结果调用 saveConfig()
,这也是 returns 一个 Promise被消耗掉。然后最终将结果映射到我们的 COMPLETE
操作。
最后,请记住,如果您的史诗 确实 存活并且长寿,则此史诗 as-is 中没有任何内容可以阻止它接收多个保存请求而其他的仍然 in-flight 或尚未耗尽请求之间所需的 1000 毫秒延迟。也就是说,如果确实需要 any 请求之间的 1000 毫秒 space,您的史诗本身并不能完全阻止您的 UI 代码破坏它。在这种情况下,您可能需要考虑添加更复杂的缓冲 backpressure 机制,例如将 .zip()
运算符与 BehaviorSubject
.
http://jsbin.com/waqipol/edit?js,output
const saveEpic = (action$, store) => {
// used to control how many we want to take,
// the rest will be buffered by .zip()
const requestCount$ = new Rx.BehaviorSubject(1)
.mergeMap(count => new Array(count));
return action$.ofType(SAVE)
.zip(requestCount$, action => action)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
// we're ready to take the next one, when available
.do(() => requestCount$.next(1))
);
};
这样一来,当我们仍在处理现有请求时,保存请求就会被缓冲,我们一次只处理其中一个请求。请记住,尽管这是一个无界缓冲区——这意味着未决操作队列的增长速度可能无限快于缓冲区的刷新速度。这是不可避免的,除非您采用有损背压策略,例如丢弃重叠的请求等。
如果您有其他 epics 对每秒发送请求不超过一次有重叠的要求,您将需要创建某种单一主管来保证所有 epics .
这一切看起来都非常复杂,但也许具有讽刺意味的是,与传统的命令式代码相比,在 RxJS 中 更容易做到这一点。最难的部分实际上是了解模式。