数据未被转换 Node.js 转换流
Data not being transformed Node.js Transform streams
我正在尝试创建一个转换流,它从 socket.io
获取数据,将其转换为 JSON,然后将其发送到标准输出。我完全不明白为什么数据似乎没有任何转换就直接通过了。我正在使用 through2
库。这是我的代码:
getStreamNames().then(streamNames => {
const socket = io(SOCKETIO_URL);
socket.on('connect', () => {
socket.emit('Subscribe', {subs: streamNames});
});
const stream = through2.obj(function (chunk, enc, callback) {
callback(null, parseString(chunk))
}).pipe(through2.obj(function (chunk, enc, callback) {
callback(null, JSON.stringify(chunk));
})).pipe(process.stdout);
socket.on('m', data => stream.write(data));
},
);
getStreamNames
returns 解析为流名称数组的承诺(我正在调用外部 socket.io
API)并且 parseString
需要从 API 返回的字符串并将其转换为 JSON 以便于管理。
我正在寻找的是我的控制台,在我使用 parseString
解析它之后打印出字符串化的 JSON,然后使用 JSON.stringify
使其成为标准输出。实际发生的是数据直接通过流并且没有进行任何转换。
供参考,来自 API 的数据采用奇怪的格式,类似于
field1~field2~0x23~fieldn
所以这就是我需要 parseString
方法的原因。
我一定是漏了什么。有什么想法吗?
编辑:
解析字符串:
function(value) {
var valuesArray = value.split("~");
var valuesArrayLenght = valuesArray.length;
var mask = valuesArray[valuesArrayLenght - 1];
var maskInt = parseInt(mask, 16);
var unpackedCurrent = {};
var currentField = 0;
for (var property in this.FIELDS) {
if (this.FIELDS[property] === 0) {
unpackedCurrent[property] = valuesArray[currentField];
currentField++;
}
else if (maskInt & this.FIELDS[property]) {
if (property === 'LASTMARKET') {
unpackedCurrent[property] = valuesArray[currentField];
}
else {
unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
}
currentField++;
}
}
return unpackedCurrent;
};
谢谢
问题是您正在编写的流实际上是 process.stdout
,因为 .pipe
returns 最后一个 stream.Writable
,所以您可以继续链接,在你的情况,process.stdout
.
const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true
所以你所做的只是:process.stdout.write(data)
没有通过管道。
您需要做的是将您的第一个 through2
流分配给 stream
变量,然后 .pipe
在该流上。
const stream = through2.obj((chunk, enc, callback) => {
callback(null, parseString(chunk))
});
stream
.pipe(through2.obj((chunk, enc, callback) => {
callback(null, JSON.stringify(chunk));
}))
.pipe(process.stdout);
socket.on('m', data => stream.write(data));
我正在尝试创建一个转换流,它从 socket.io
获取数据,将其转换为 JSON,然后将其发送到标准输出。我完全不明白为什么数据似乎没有任何转换就直接通过了。我正在使用 through2
库。这是我的代码:
getStreamNames().then(streamNames => {
const socket = io(SOCKETIO_URL);
socket.on('connect', () => {
socket.emit('Subscribe', {subs: streamNames});
});
const stream = through2.obj(function (chunk, enc, callback) {
callback(null, parseString(chunk))
}).pipe(through2.obj(function (chunk, enc, callback) {
callback(null, JSON.stringify(chunk));
})).pipe(process.stdout);
socket.on('m', data => stream.write(data));
},
);
getStreamNames
returns 解析为流名称数组的承诺(我正在调用外部 socket.io
API)并且 parseString
需要从 API 返回的字符串并将其转换为 JSON 以便于管理。
我正在寻找的是我的控制台,在我使用 parseString
解析它之后打印出字符串化的 JSON,然后使用 JSON.stringify
使其成为标准输出。实际发生的是数据直接通过流并且没有进行任何转换。
供参考,来自 API 的数据采用奇怪的格式,类似于
field1~field2~0x23~fieldn
所以这就是我需要 parseString
方法的原因。
我一定是漏了什么。有什么想法吗?
编辑:
解析字符串:
function(value) {
var valuesArray = value.split("~");
var valuesArrayLenght = valuesArray.length;
var mask = valuesArray[valuesArrayLenght - 1];
var maskInt = parseInt(mask, 16);
var unpackedCurrent = {};
var currentField = 0;
for (var property in this.FIELDS) {
if (this.FIELDS[property] === 0) {
unpackedCurrent[property] = valuesArray[currentField];
currentField++;
}
else if (maskInt & this.FIELDS[property]) {
if (property === 'LASTMARKET') {
unpackedCurrent[property] = valuesArray[currentField];
}
else {
unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
}
currentField++;
}
}
return unpackedCurrent;
};
谢谢
问题是您正在编写的流实际上是 process.stdout
,因为 .pipe
returns 最后一个 stream.Writable
,所以您可以继续链接,在你的情况,process.stdout
.
const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true
所以你所做的只是:process.stdout.write(data)
没有通过管道。
您需要做的是将您的第一个 through2
流分配给 stream
变量,然后 .pipe
在该流上。
const stream = through2.obj((chunk, enc, callback) => {
callback(null, parseString(chunk))
});
stream
.pipe(through2.obj((chunk, enc, callback) => {
callback(null, JSON.stringify(chunk));
}))
.pipe(process.stdout);
socket.on('m', data => stream.write(data));