在消费流之前做一些事情,使用 highland.js

Do something before consuming stream, using highland.js

我正在尝试编写一个可写流,它接收对象流并将它们输入到 mongodb 数据库中。在使用对象流之前,我首先需要等待数据库连接建立,但我似乎做错了什么,因为程序永远不会到达插入部分。

// ./mongowriter.js

let mongo = mongodb.MongoClient,
    connectToDb = _.wrapCallback(mongo.connect);

export default url => _.pipeline(s => {
  return connectToDb(url).flatMap(db => {
    console.log('Connection established!');
    return s.flatMap(x => /* insert x into db */);
  });
});

....

// Usage in other file
import mongowriter from './mongowriter.js';

let objStream = _([/* json objects */]);

objStream.pipe(mongoWriter);

程序直接退出,"Connection established!" 从未写入控制台。

我错过了什么?有什么我应该遵循的习语吗?

通过阅读源代码和一些常规实验,我弄清楚了如何执行单个异步操作,然后通过流继续处理。基本上,您 使用 flatMap 将来自异步任务的事件替换为您实际要处理的流。

另一个我没有预料到并且让我失望的怪癖是 _.pipeline 将无法工作,除非原始流在回调中被完全消耗掉。这就是为什么它不会简单地放入 _.map 和日志内容(这是我尝试调试它的方式)。相反,需要确保在末尾有一个 eachdone。下面是一个最小的例子:

export default _ => _.pipeline( stream => {
  return _(promiseReturningFunction())
    .tap(_ => process.stdout.write('.'))
    .flatMap(_ => stream)
    .each(_ => process.stdout.write('-'));
});

// Will produce something like the following when called with a non-empty stream.
// Note the lone '.' in the beginning.
// => .-------------------

基本上,一个“.”异步函数完成时输出,流中的每个对象都有一个“-”。

希望这可以节省一些时间。我花了很长时间才弄明白这一点。 ^^