在消费流之前做一些事情,使用 highland.js
Do something before consuming stream, using highland.js
我正在尝试编写一个可写流,它接收对象流并将它们输入到 mongodb 数据库中。在使用对象流之前,我首先需要等待数据库连接建立,但我似乎做错了什么,因为程序永远不会到达插入部分。
// ./mongowriter.js
let mongo = mongodb.MongoClient,
connectToDb = _.wrapCallback(mongo.connect);
export default url => _.pipeline(s => {
return connectToDb(url).flatMap(db => {
console.log('Connection established!');
return s.flatMap(x => /* insert x into db */);
});
});
....
// Usage in other file
import mongowriter from './mongowriter.js';
let objStream = _([/* json objects */]);
objStream.pipe(mongoWriter);
程序直接退出,"Connection established!" 从未写入控制台。
我错过了什么?有什么我应该遵循的习语吗?
通过阅读源代码和一些常规实验,我弄清楚了如何执行单个异步操作,然后通过流继续处理。基本上,您 使用 flatMap 将来自异步任务的事件替换为您实际要处理的流。
另一个我没有预料到并且让我失望的怪癖是 _.pipeline
将无法工作,除非原始流在回调中被完全消耗掉。这就是为什么它不会简单地放入 _.map 和日志内容(这是我尝试调试它的方式)。相反,需要确保在末尾有一个 each
或 done
。下面是一个最小的例子:
export default _ => _.pipeline( stream => {
return _(promiseReturningFunction())
.tap(_ => process.stdout.write('.'))
.flatMap(_ => stream)
.each(_ => process.stdout.write('-'));
});
// Will produce something like the following when called with a non-empty stream.
// Note the lone '.' in the beginning.
// => .-------------------
基本上,一个“.”异步函数完成时输出,流中的每个对象都有一个“-”。
希望这可以节省一些时间。我花了很长时间才弄明白这一点。 ^^
我正在尝试编写一个可写流,它接收对象流并将它们输入到 mongodb 数据库中。在使用对象流之前,我首先需要等待数据库连接建立,但我似乎做错了什么,因为程序永远不会到达插入部分。
// ./mongowriter.js
let mongo = mongodb.MongoClient,
connectToDb = _.wrapCallback(mongo.connect);
export default url => _.pipeline(s => {
return connectToDb(url).flatMap(db => {
console.log('Connection established!');
return s.flatMap(x => /* insert x into db */);
});
});
....
// Usage in other file
import mongowriter from './mongowriter.js';
let objStream = _([/* json objects */]);
objStream.pipe(mongoWriter);
程序直接退出,"Connection established!" 从未写入控制台。
我错过了什么?有什么我应该遵循的习语吗?
通过阅读源代码和一些常规实验,我弄清楚了如何执行单个异步操作,然后通过流继续处理。基本上,您 使用 flatMap 将来自异步任务的事件替换为您实际要处理的流。
另一个我没有预料到并且让我失望的怪癖是 _.pipeline
将无法工作,除非原始流在回调中被完全消耗掉。这就是为什么它不会简单地放入 _.map 和日志内容(这是我尝试调试它的方式)。相反,需要确保在末尾有一个 each
或 done
。下面是一个最小的例子:
export default _ => _.pipeline( stream => {
return _(promiseReturningFunction())
.tap(_ => process.stdout.write('.'))
.flatMap(_ => stream)
.each(_ => process.stdout.write('-'));
});
// Will produce something like the following when called with a non-empty stream.
// Note the lone '.' in the beginning.
// => .-------------------
基本上,一个“.”异步函数完成时输出,流中的每个对象都有一个“-”。
希望这可以节省一些时间。我花了很长时间才弄明白这一点。 ^^