在高地流中执行 node-mysql 查询
Perform node-mysql queries in a highland stream
我必须从输入文件中提取行,转换它们并将它们放入输出文件中。
由于输入文件很大,我使用 HighlandJS 对其进行了流式处理。
转换步骤包括 MySQL 数据库中的异步查询(通过节点-mysql)和
我找不到如何管理流中的异步查询。我的不同尝试给出了错误或没有。
我最后的尝试是:
h(inputStream)
.split()
.through(JSONStream.parse())
.map(function (data) {
h.wrapCallback(pool.query(data, function (err, rows) {
return rows;
}));
})
.pipe(outputStream);
关于如何做到这一点的任何提示?
谢谢。
简短回答:您的 map
转换必须 return 某些东西。现在 return 没什么。
长答案:
好的,为了这个答案的目的,我将稍微简化您的逻辑。假设我们想要这个。
input -> map to rows -> output
问题是映射是异步的,正如您指出的那样,map
函数必须 return 某些东西。在这种情况下,您只需 return 输入中每个元素的流。所以它看起来像这样。
// input -> map to a stream of streams of rows -> output
h(input).map(h.wrapCallback(pool.query)).pipe(output);
最后一个问题实际上是获取行而不是流的流。您可以使用 flatMap
转换来执行此操作,该转换会将 "flatten" 流的流变为 "normal" 流。
// input -> map to a stream of rows -> output
h(input).flatMap(h.wrapCallback(pool.query)).pipe(output);
我必须从输入文件中提取行,转换它们并将它们放入输出文件中。
由于输入文件很大,我使用 HighlandJS 对其进行了流式处理。
转换步骤包括 MySQL 数据库中的异步查询(通过节点-mysql)和 我找不到如何管理流中的异步查询。我的不同尝试给出了错误或没有。
我最后的尝试是:
h(inputStream)
.split()
.through(JSONStream.parse())
.map(function (data) {
h.wrapCallback(pool.query(data, function (err, rows) {
return rows;
}));
})
.pipe(outputStream);
关于如何做到这一点的任何提示?
谢谢。
简短回答:您的 map
转换必须 return 某些东西。现在 return 没什么。
长答案:
好的,为了这个答案的目的,我将稍微简化您的逻辑。假设我们想要这个。
input -> map to rows -> output
问题是映射是异步的,正如您指出的那样,map
函数必须 return 某些东西。在这种情况下,您只需 return 输入中每个元素的流。所以它看起来像这样。
// input -> map to a stream of streams of rows -> output
h(input).map(h.wrapCallback(pool.query)).pipe(output);
最后一个问题实际上是获取行而不是流的流。您可以使用 flatMap
转换来执行此操作,该转换会将 "flatten" 流的流变为 "normal" 流。
// input -> map to a stream of rows -> output
h(input).flatMap(h.wrapCallback(pool.query)).pipe(output);