Kefir/Bacon.js 中的流处理

Stream processing in Kefir/Bacon.js

我一直在从事一个涉及使用 Kefir(或 Bacon.js,选择你最喜欢的)实时数据的个人项目,并且已经到了需要将数据记录到数据库中以附加的地步id,然后将带有id的对象传递到链下。实际上将数据插入数据库 (NeDB) 不是问题,问题在于它使用回调并在将记录插入数据库时​​继续执行以及如何解决此行为。

过度简化的示例:

假设我们有多个设备将解析后的数据转储到 bus/pool:

function Position(data) {
    this.id = null;
    this.longitude = data.longitude;
    this.latitude = data.latitude;
}

self.positionDataPool.map(function(position)) {  // is this even what really needs to be done?
    // unsure what to do here {
        self.db.insert {
            longitude: position.longitude
            , latitude: position.latitude
        }, function(e, newRecord) {
            if(e) { ... }
            , else {
                position.id = newRecord._id;
                return position;
            }
        }
    //}
})
.filter(function(position) {
    // the position without an id is passed here
    ...
});

我怀疑这是对地图功能的不正确或不恰当的使用,但在尝试了几种方法后我没有想法。任何想法、建议或帮助将不胜感激。

我的解决方案

在进行了大量阅读和实验(在我已经做过的事情之上)并回到我每天使用流处理的日子之后,我想出了以下解决方案。虽然这可能不是最有效的,但此解决方案通过将多个事件源插入数据池(未显示)来从多个源获取输入。创建了一个全新的流来对 object/data 执行单个操作。虽然可扩展性不是这里的 objective,但这允许多个源监视流中的数据,而不是直接将其转储到过滤器中。最后,来自处理流的数据被过滤以仅显示我们想要的结果。

self.savedPositionDataStream = Kefir.stream(function(emitter) {
    self.positionDataPool.onValue(function(val) {
        self.db.insert {
            longitude: position.longitude
            , latitude: position.latitude
        }, function(e, newRecord) {
            val.id = newRecord._id;
            emitter.emit(val);
        }
    });
});

self.filteredPositionData = savedPositionDataStream.filter(...);

至少在 Bacon.js 中,您可以使用 Bacon.fromNodeCallback 将插入调用结果包装到流中。喜欢

Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)

您当然可以使用 Bacon.fromBinder 或类似的 Kefir.stream 来执行此操作,但是 fromNodeCallback 帮助程序使这更容易,因为它会自动处理 success/error 值并将它们转换为适当地流式传输事件。

然后 flatMap 而不是 map,进行插入并以流形式提供结果:

let insertionResultE = self.positionDataPool.flatMap(val => 
  Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
)
insertionResultE.log("insertion result")

类似的方法也适用于开菲尔。关键是你不能在 map 中进行异步和可能失败的计算,但你可以在 flapMap.

中进行

这里唯一的问题是您需要为 insertionResultE 添加至少一个订阅者才能激活它。在上面的例子中,log 做到了。