Kefir/Bacon.js 中的流处理
Stream processing in Kefir/Bacon.js
我一直在从事一个涉及使用 Kefir(或 Bacon.js,选择你最喜欢的)实时数据的个人项目,并且已经到了需要将数据记录到数据库中以附加的地步id,然后将带有id的对象传递到链下。实际上将数据插入数据库 (NeDB) 不是问题,问题在于它使用回调并在将记录插入数据库时继续执行以及如何解决此行为。
过度简化的示例:
假设我们有多个设备将解析后的数据转储到 bus/pool:
function Position(data) {
this.id = null;
this.longitude = data.longitude;
this.latitude = data.latitude;
}
self.positionDataPool.map(function(position)) { // is this even what really needs to be done?
// unsure what to do here {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
if(e) { ... }
, else {
position.id = newRecord._id;
return position;
}
}
//}
})
.filter(function(position) {
// the position without an id is passed here
...
});
我怀疑这是对地图功能的不正确或不恰当的使用,但在尝试了几种方法后我没有想法。任何想法、建议或帮助将不胜感激。
我的解决方案
在进行了大量阅读和实验(在我已经做过的事情之上)并回到我每天使用流处理的日子之后,我想出了以下解决方案。虽然这可能不是最有效的,但此解决方案通过将多个事件源插入数据池(未显示)来从多个源获取输入。创建了一个全新的流来对 object/data 执行单个操作。虽然可扩展性不是这里的 objective,但这允许多个源监视流中的数据,而不是直接将其转储到过滤器中。最后,来自处理流的数据被过滤以仅显示我们想要的结果。
self.savedPositionDataStream = Kefir.stream(function(emitter) {
self.positionDataPool.onValue(function(val) {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
val.id = newRecord._id;
emitter.emit(val);
}
});
});
self.filteredPositionData = savedPositionDataStream.filter(...);
至少在 Bacon.js 中,您可以使用 Bacon.fromNodeCallback
将插入调用结果包装到流中。喜欢
Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)
您当然可以使用 Bacon.fromBinder
或类似的 Kefir.stream
来执行此操作,但是 fromNodeCallback
帮助程序使这更容易,因为它会自动处理 success/error 值并将它们转换为适当地流式传输事件。
然后 flatMap
而不是 map
,进行插入并以流形式提供结果:
let insertionResultE = self.positionDataPool.flatMap(val =>
Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
)
insertionResultE.log("insertion result")
类似的方法也适用于开菲尔。关键是你不能在 map
中进行异步和可能失败的计算,但你可以在 flapMap
.
中进行
这里唯一的问题是您需要为 insertionResultE
添加至少一个订阅者才能激活它。在上面的例子中,log
做到了。
我一直在从事一个涉及使用 Kefir(或 Bacon.js,选择你最喜欢的)实时数据的个人项目,并且已经到了需要将数据记录到数据库中以附加的地步id,然后将带有id的对象传递到链下。实际上将数据插入数据库 (NeDB) 不是问题,问题在于它使用回调并在将记录插入数据库时继续执行以及如何解决此行为。
过度简化的示例:
假设我们有多个设备将解析后的数据转储到 bus/pool:
function Position(data) {
this.id = null;
this.longitude = data.longitude;
this.latitude = data.latitude;
}
self.positionDataPool.map(function(position)) { // is this even what really needs to be done?
// unsure what to do here {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
if(e) { ... }
, else {
position.id = newRecord._id;
return position;
}
}
//}
})
.filter(function(position) {
// the position without an id is passed here
...
});
我怀疑这是对地图功能的不正确或不恰当的使用,但在尝试了几种方法后我没有想法。任何想法、建议或帮助将不胜感激。
我的解决方案
在进行了大量阅读和实验(在我已经做过的事情之上)并回到我每天使用流处理的日子之后,我想出了以下解决方案。虽然这可能不是最有效的,但此解决方案通过将多个事件源插入数据池(未显示)来从多个源获取输入。创建了一个全新的流来对 object/data 执行单个操作。虽然可扩展性不是这里的 objective,但这允许多个源监视流中的数据,而不是直接将其转储到过滤器中。最后,来自处理流的数据被过滤以仅显示我们想要的结果。
self.savedPositionDataStream = Kefir.stream(function(emitter) {
self.positionDataPool.onValue(function(val) {
self.db.insert {
longitude: position.longitude
, latitude: position.latitude
}, function(e, newRecord) {
val.id = newRecord._id;
emitter.emit(val);
}
});
});
self.filteredPositionData = savedPositionDataStream.filter(...);
至少在 Bacon.js 中,您可以使用 Bacon.fromNodeCallback
将插入调用结果包装到流中。喜欢
Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)
您当然可以使用 Bacon.fromBinder
或类似的 Kefir.stream
来执行此操作,但是 fromNodeCallback
帮助程序使这更容易,因为它会自动处理 success/error 值并将它们转换为适当地流式传输事件。
然后 flatMap
而不是 map
,进行插入并以流形式提供结果:
let insertionResultE = self.positionDataPool.flatMap(val =>
Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
)
insertionResultE.log("insertion result")
类似的方法也适用于开菲尔。关键是你不能在 map
中进行异步和可能失败的计算,但你可以在 flapMap
.
这里唯一的问题是您需要为 insertionResultE
添加至少一个订阅者才能激活它。在上面的例子中,log
做到了。