在高地流中执行 node-mysql 查询

Perform node-mysql queries in a highland stream

我必须从输入文件中提取行,转换它们并将它们放入输出文件中。

由于输入文件很大,我使用 HighlandJS 对其进行了流式处理。

转换步骤包括 MySQL 数据库中的异步查询(通过节点-mysql)和 我找不到如何管理流中的异步查询。我的不同尝试给出了错误或没有。

我最后的尝试是:

h(inputStream)
.split()
.through(JSONStream.parse())
.map(function (data) {
    h.wrapCallback(pool.query(data, function (err, rows) {
        return rows;
    }));
})
.pipe(outputStream);

关于如何做到这一点的任何提示?

谢谢。

简短回答:您的 map 转换必须 return 某些东西。现在 return 没什么。

长答案:

好的,为了这个答案的目的,我将稍微简化您的逻辑。假设我们想要这个。

input -> map to rows -> output

问题是映射是异步的,正如您指出的那样,map 函数必须 return 某些东西。在这种情况下,您只需 return 输入中每个元素的流。所以它看起来像这样。

// input -> map to a stream of streams of rows -> output
h(input).map(h.wrapCallback(pool.query)).pipe(output);

最后一个问题实际上是获取行而不是流的流。您可以使用 flatMap 转换来执行此操作,该转换会将 "flatten" 流的流变为 "normal" 流。

// input -> map to a stream of rows -> output
h(input).flatMap(h.wrapCallback(pool.query)).pipe(output);