Nodejs 直通流
Nodejs PassThrough Stream
我想通过 net.Socket (TCP) 流传输 fs.Readstream。为此,我使用了 .pipe。
fs.Readstream 完成后,我不想结束 net.Socket 流。这就是为什么我使用
readStream.pipe(socket, {
end: false
})
不幸的是,我在另一边听不到 'close'、'finish' 或 'end'。这使我无法在另一侧关闭我的 fs.Writestream。但是,net.Socket 连接仍然存在,我也需要它,因为我想接收一个 ID 作为响应。
由于我没有得到相反的 'close' 或 'finish',不幸的是我无法结束 fs.Writestream 因此无法发送具有相应 ID[=12= 的响应]
有没有办法通过 net.socket 手动发送 'close' 或 'finish' 事件而不关闭它?
使用该命令,只有我自己的事件会做出反应。
谁能告诉我我做错了什么?
var socket : net.Socket; //TCP connect
var readStream = fs.createWriteStream('test.txt');
socket.on('connect', () => {
readStream.pipe(socket, {
end: false
})
readStream.on('close', () => {
socket.emit('close');
socket.emit('finish');
})
//waiting for answer
//waiting for answer
//waiting for answer
socket.on('data', (c) => {
console.log('got my answer: ' + c.toString());
})
})
}
好吧,除了向另一方提供某种方式让另一方知道流已以编程方式结束之外,您对单个流没有太多可以做的。
当套接字发送一个 end
事件时,它实际上会刷新缓冲区,然后关闭 TCP 连接,然后在另一端在最后一个字节传递后转换为 finish
。为了re-use连接你可以考虑这两个选项:
一:使用 HTTP keep-alive
正如您所想,您不是第一个遇到此问题的人。这实际上是很常见的事情,您已经了解了一些协议,例如 HTTP。这将引入少量开销,但仅在开始和结束流时 - 在您的情况下,这可能比其他选项更容易接受。
您可以简单地使用 HTTP 连接并通过 http 请求发送数据,而不是使用基本的 TCP 流,HTTP POST
请求就可以了,您的代码看起来没有任何不同,除了放弃{end: false}
。套接字需要发送 headers,因此它的构造如下:
const socket : HTTP.ClientRequest = http.request({method: 'POST', url: '//wherever.org/somewhere/there:9087', headers: {
'connection': 'keep-alive',
'transfer-encoding': 'chunked'
}}, (res) => {
// here you can call the code to push more streams since the
});
readStream.pipe(socket); // so our socket (vel connection) will end, but the underlying channel will stay open.
您实际上不需要等待套接字连接,并像上面的示例一样直接通过管道传输流,但是如果您的连接失败,请检查它的行为方式。您等待 connect
事件也将起作用,因为 HTTP 请求 class 实现了所有 TCP 连接事件和方法(尽管它在签名上可能有一些细微差别)。
更多阅读:
- Wikipedia article of keep-alive - 很好地解释了它是如何工作的
- Node.js http.Agent options - 您可以控制拥有的连接数,更重要的是设置默认的保持活动状态。
哦还有一点警告 - TCP keep-alive 是不同的东西,所以不要在那里混淆。
二:使用一个"magic"结束包
在这种情况下,您要做的是发送一个简单的结束数据包,例如:套接字末尾的 \x00
(一个 nul
字符)。这有一个主要缺点,因为您需要对流做一些事情以确保 nul
字符不会出现在那里 - 这将引入数据处理的开销(因此更多 CPU用法)。
为了这样做,您需要在将数据发送到套接字之前通过转换流推送数据 - 下面是一个示例,但它仅适用于字符串,因此请根据您的需要进行调整.
const zeroEncoder = new Transform({
encoding: 'utf-8',
transform(chunk, enc, cb) { cb(chunk.toString().replace('\x00', '\x00')); },
flush: (cb) => cb('\x00')
});
// ... whereever you do the writing:
readStream
.pipe(zeroEncoder)
.on('unpipe', () => console.log('this will be your end marker to send in another stream'))
.pipe(socket, {end: false})
那么另一边:
tcpStream.on('data', (chunk) => {
if (chunk.toString().endsWith('\x00')) {
output.end(decodeZeros(chunk));
// and rotate output
} else {
output.write(decodeZeros(chunk));
}
});
如您所见,这要复杂得多,这也只是一个示例 - 您可以通过使用 JSON、7 位传输编码或其他一些方式稍微简化它,但它会所有情况都需要一些技巧,最重要的是通读整个流并为其提供更多内存——所以我真的不推荐这种方法。如果你这样做了:
- 确保 encode/decode 数据正确
- 考虑是否可以找到不会出现在数据中的字节
- 以上可能适用于字符串,但至少对缓冲区来说会很糟糕
- 最后没有错误控制或流量控制 - 所以至少需要
pause
/resume
逻辑。
希望对您有所帮助。
我想通过 net.Socket (TCP) 流传输 fs.Readstream。为此,我使用了 .pipe。 fs.Readstream 完成后,我不想结束 net.Socket 流。这就是为什么我使用
readStream.pipe(socket, {
end: false
})
不幸的是,我在另一边听不到 'close'、'finish' 或 'end'。这使我无法在另一侧关闭我的 fs.Writestream。但是,net.Socket 连接仍然存在,我也需要它,因为我想接收一个 ID 作为响应。 由于我没有得到相反的 'close' 或 'finish',不幸的是我无法结束 fs.Writestream 因此无法发送具有相应 ID[=12= 的响应]
有没有办法通过 net.socket 手动发送 'close' 或 'finish' 事件而不关闭它? 使用该命令,只有我自己的事件会做出反应。 谁能告诉我我做错了什么?
var socket : net.Socket; //TCP connect
var readStream = fs.createWriteStream('test.txt');
socket.on('connect', () => {
readStream.pipe(socket, {
end: false
})
readStream.on('close', () => {
socket.emit('close');
socket.emit('finish');
})
//waiting for answer
//waiting for answer
//waiting for answer
socket.on('data', (c) => {
console.log('got my answer: ' + c.toString());
})
})
}
好吧,除了向另一方提供某种方式让另一方知道流已以编程方式结束之外,您对单个流没有太多可以做的。
当套接字发送一个 end
事件时,它实际上会刷新缓冲区,然后关闭 TCP 连接,然后在另一端在最后一个字节传递后转换为 finish
。为了re-use连接你可以考虑这两个选项:
一:使用 HTTP keep-alive
正如您所想,您不是第一个遇到此问题的人。这实际上是很常见的事情,您已经了解了一些协议,例如 HTTP。这将引入少量开销,但仅在开始和结束流时 - 在您的情况下,这可能比其他选项更容易接受。
您可以简单地使用 HTTP 连接并通过 http 请求发送数据,而不是使用基本的 TCP 流,HTTP POST
请求就可以了,您的代码看起来没有任何不同,除了放弃{end: false}
。套接字需要发送 headers,因此它的构造如下:
const socket : HTTP.ClientRequest = http.request({method: 'POST', url: '//wherever.org/somewhere/there:9087', headers: {
'connection': 'keep-alive',
'transfer-encoding': 'chunked'
}}, (res) => {
// here you can call the code to push more streams since the
});
readStream.pipe(socket); // so our socket (vel connection) will end, but the underlying channel will stay open.
您实际上不需要等待套接字连接,并像上面的示例一样直接通过管道传输流,但是如果您的连接失败,请检查它的行为方式。您等待 connect
事件也将起作用,因为 HTTP 请求 class 实现了所有 TCP 连接事件和方法(尽管它在签名上可能有一些细微差别)。
更多阅读:
- Wikipedia article of keep-alive - 很好地解释了它是如何工作的
- Node.js http.Agent options - 您可以控制拥有的连接数,更重要的是设置默认的保持活动状态。
哦还有一点警告 - TCP keep-alive 是不同的东西,所以不要在那里混淆。
二:使用一个"magic"结束包
在这种情况下,您要做的是发送一个简单的结束数据包,例如:套接字末尾的 \x00
(一个 nul
字符)。这有一个主要缺点,因为您需要对流做一些事情以确保 nul
字符不会出现在那里 - 这将引入数据处理的开销(因此更多 CPU用法)。
为了这样做,您需要在将数据发送到套接字之前通过转换流推送数据 - 下面是一个示例,但它仅适用于字符串,因此请根据您的需要进行调整.
const zeroEncoder = new Transform({
encoding: 'utf-8',
transform(chunk, enc, cb) { cb(chunk.toString().replace('\x00', '\x00')); },
flush: (cb) => cb('\x00')
});
// ... whereever you do the writing:
readStream
.pipe(zeroEncoder)
.on('unpipe', () => console.log('this will be your end marker to send in another stream'))
.pipe(socket, {end: false})
那么另一边:
tcpStream.on('data', (chunk) => {
if (chunk.toString().endsWith('\x00')) {
output.end(decodeZeros(chunk));
// and rotate output
} else {
output.write(decodeZeros(chunk));
}
});
如您所见,这要复杂得多,这也只是一个示例 - 您可以通过使用 JSON、7 位传输编码或其他一些方式稍微简化它,但它会所有情况都需要一些技巧,最重要的是通读整个流并为其提供更多内存——所以我真的不推荐这种方法。如果你这样做了:
- 确保 encode/decode 数据正确
- 考虑是否可以找到不会出现在数据中的字节
- 以上可能适用于字符串,但至少对缓冲区来说会很糟糕
- 最后没有错误控制或流量控制 - 所以至少需要
pause
/resume
逻辑。
希望对您有所帮助。