Nodejs 直通流

Nodejs PassThrough Stream

我想通过 net.Socket (TCP) 流传输 fs.Readstream。为此,我使用了 .pipe。 fs.Readstream 完成后,我不想结束 net.Socket 流。这就是为什么我使用

readStream.pipe(socket, { end: false })

不幸的是,我在另一边听不到 'close'、'finish' 或 'end'。这使我无法在另一侧关闭我的 fs.Writestream。但是,net.Socket 连接仍然存在,我也需要它,因为我想接收一个 ID 作为响应。 由于我没有得到相反的 'close' 或 'finish',不幸的是我无法结束 fs.Writestream 因此无法发送具有相应 ID[=12= 的响应]

有没有办法通过 net.socket 手动发送 'close' 或 'finish' 事件而不关闭它? 使用该命令,只有我自己的事件会做出反应。 谁能告诉我我做错了什么?

    var socket : net.Socket; //TCP connect
    var readStream = fs.createWriteStream('test.txt');

    socket.on('connect', () => {
        readStream.pipe(socket, {
            end: false
        })
        readStream.on('close', () => {
            socket.emit('close');
            socket.emit('finish');
        })

        //waiting for answer
        //waiting for answer
        //waiting for answer

        socket.on('data', (c) => {
            console.log('got my answer: ' + c.toString());
        })
    })
}

好吧,除了向另一方提供某种方式让另一方知道流已以编程方式结束之外,您对单个流没有太多可以做的。

当套接字发送一个 end 事件时,它实际上会刷新缓冲区,然后关闭 TCP 连接,然后在另一端在最后一个字节传递后转换为 finish。为了re-use连接你可以考虑这两个选项:

一:使用 HTTP keep-alive

正如您所想,您不是第一个遇到此问题的人。这实际上是很常见的事情,您已经了解了一些协议,例如 HTTP。这将引入少量开销,但仅在开始和结束流时 - 在您的情况下,这可能比其他选项更容易接受。

您可以简单地使用 HTTP 连接并通过 http 请求发送数据,而不是使用基本的 TCP 流,HTTP POST 请求就可以了,您的代码看起来没有任何不同,除了放弃{end: false}。套接字需要发送 headers,因此它的构造如下:

const socket : HTTP.ClientRequest = http.request({method: 'POST', url: '//wherever.org/somewhere/there:9087', headers: {
   'connection': 'keep-alive',
   'transfer-encoding': 'chunked'
}}, (res) => {
   // here you can call the code to push more streams since the 
});

readStream.pipe(socket); // so our socket (vel connection) will end, but the underlying channel will stay open.

您实际上不需要等待套接字连接,并像上面的示例一样直接通过管道传输流,但是如果您的连接失败,请检查它的行为方式。您等待 connect 事件也将起作用,因为 HTTP 请求 class 实现了所有 TCP 连接事件和方法(尽管它在签名上可能有一些细微差别)。

更多阅读:

哦还有一点警告 - TCP keep-alive 是不同的东西,所以不要在那里混淆。

二:使用一个"magic"结束包

在这种情况下,您要做的是发送一个简单的结束数据包,例如:套接字末尾的 \x00(一个 nul 字符)。这有一个主要缺点,因为您需要对流做一些事情以确保 nul 字符不会出现在那里 - 这将引入数据处理的开销(因此更多 CPU用法)。

为了这样做,您需要在将数据发送到套接字之前通过转换流推送数据 - 下面是一个示例,但它仅适用于字符串,因此请根据您的需要进行调整.

const zeroEncoder = new Transform({
  encoding: 'utf-8',
  transform(chunk, enc, cb) { cb(chunk.toString().replace('\x00', '\x00')); },
  flush: (cb) => cb('\x00')
});

// ... whereever you do the writing:

readStream
  .pipe(zeroEncoder)
  .on('unpipe', () => console.log('this will be your end marker to send in another stream'))
  .pipe(socket, {end: false})

那么另一边:

tcpStream.on('data', (chunk) => {
  if (chunk.toString().endsWith('\x00')) {
    output.end(decodeZeros(chunk));
    // and rotate output
  } else {
    output.write(decodeZeros(chunk));
  }
});

如您所见,这要复杂得多,这也只是一个示例 - 您可以通过使用 JSON、7 位传输编码或其他一些方式稍微简化它,但它会所有情况都需要一些技巧,最重要的是通读整个流并为其提供更多内存——所以我真的不推荐这种方法。如果你这样做了:

  • 确保 encode/decode 数据正确
  • 考虑是否可以找到不会出现在数据中的字节
  • 以上可能适用于字符串,但至少对缓冲区来说会很糟糕
  • 最后没有错误控制或流量控制 - 所以至少需要 pause/resume 逻辑。

希望对您有所帮助。