Node.js:在可写流中的背压上写入了错误的额外字节
Node.js: Wrong extra bytes were wrote on Back pressure in writable stream
我用 Socket.io 进行了简单的二进制传输,以将文件从客户端传输到服务器。我认为它有效,但我意识到文件的大小不同。在 writableStream.write 失败时,我附加了 drain 事件处理程序以等待它可以被重写并继续写入,但是每次发生 drain 事件时,文件的大小都会增加 drain 事件触发的次数,每个 10240 字节大小我为每个块传输设置的。
在这里写代码之前,我需要解释一下代码流程:
- 客户端请求上传文件
- 服务器创建空文件(创建可写流)并授权传输
- 客户端传输数据(块)直到结束
- 服务器用可写流写入块
- 客户端在发送全部后结束传输
- 服务器关闭可写流。
- 完成!
这是服务器端代码:
var writeStream = null;
var fileSize = 0;
var wrote = 0;
socket.on('clientRequestFileTransfer', (fileInfo) => {
console.log(`Client request file transfer: ${fileInfo.name}(${fileInfo.size})`);
fileSize = fileInfo.size;
wrote = 0;
writeStream = fs.createWriteStream(__dirname + '/' + fileInfo.name);
writeStream.on('close', () => {
console.log('Write stream ended.');
});
console.log('File created.');
socket.emit('serverGrantFileTransfer');
});
socket.on('clientSentChunk', (chunk) => {
function write() {
let writeDone = writeStream.write(chunk);
if(!writeDone) {
console.log('Back pressure!');
return writeStream.once('drain', write);
}
else {
wrote += chunk.length;
console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
socket.emit('serverRequestContinue');
}
}
write();
});
socket.on('clientFinishTransmission', () => {
writeStream.end();
console.log('Transmission complete!');
});
它的客户端(添加了读取二进制文件的代码):
var fileEl = document.getElementById('file');
fileEl.onchange = function() {
var file = fileEl.files[0];
if(!file) return;
var socket = io('http://localhost:3000');
socket.on('connect', function() {
var fileReader = new FileReader();
fileReader.onloadend = function() {
var bin = fileReader.result;
var chunkSize = 10240;
var sent = 0;
// make server knows the name and size of the file
socket.once('serverGrantFileTransfer', () => {
function beginTransfer() {
if(sent >= bin.byteLength) {
console.log('Transmission complete!');
socket.emit('clientFinishTransmission');
return;
}
var chunk = bin.slice(sent, sent + chunkSize);
socket.once('serverRequestContinue', beginTransfer);
socket.emit('clientSentChunk', chunk);
sent += chunk.byteLength;
console.log('Sent: ' + sent);
}
beginTransfer();
});
socket.emit('clientRequestFileTransfer', {
name: file.name,
size: file.size
});
};
fileReader.readAsArrayBuffer(file);
});
};
我用 4,162,611 字节大小的文件测试了这段代码,它有 1 次写入失败(1 次背压)。上传后,我检查了创建文件的大小,它是4,172,851字节,比原来的大10240字节,是chunk(10240)的大小。
有时写入失败 2 次,大小比原来的大 20480 字节,是我发送的块大小的两倍。
我仔细检查了我的 Backpressure 代码,但对我来说似乎没什么问题。我正在使用 Node v6.2.2 并使用 Socket.io v1.6.0,从 Chrome 浏览器测试。我错过了什么吗?还是我误解了背压?
任何建议将不胜感激。
更新
看起来当背压发生时,它写了两次相同的数据(正如我在评论中所说)。所以我修改了这样的代码:
socket.on('clientSentChunk', (chunk) => {
function write() {
var writeDone = writeStream.write(chunk);
wrote += chunk.length;
if(!writeDone) {
console.log('**************** Back pressure ****************');
// writeStream.once('drain', write);
// no rewrite, just continue transmission
writeStream.once('drain', () => socket.emit('serverRequestContinue'));
}
else {
console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
socket.emit('serverRequestContinue');
}
}
write();
});
成功了。我很困惑,因为当可写流写入失败时,它不会将数据写入流中,但实际上不会。有人知道吗?
我想问题是你如何在 var bin = ...;
中创建 bin 对象。你能把代码放在这里吗?
更新
后台代码如下:
var express = require('express');
var app = express();
var server = app.listen(80);
var io = require('socket.io');
var fs = require('fs');
io = io.listen(server);
app.use(express.static(__dirname + '/public'));
app.use(function(req, res, next) {
//res.send({a:1})
res.sendFile(__dirname + '/socket_test_index.html');
});
io.on('connection', function(client) {
console.log('Client connected...', client);
client.on('join', function(data) {
//console.log(data);
});
setInterval(()=>{
client.emit('news', 'news from server');
}, 10000)
});
io.of('/upload', function(client){
console.log('upload')
logic(client);
})
function logic(socket) {
var writeStream = null;
var fileSize = 0;
var wrote = 0;
socket.on('clientRequestFileTransfer', (fileInfo) => {
console.log(`Client request file transfer: ${fileInfo.name}(${fileInfo.size})`);
fileSize = fileInfo.size;
wrote = 0;
writeStream = fs.createWriteStream(__dirname + '/' + fileInfo.name);
writeStream.on('close', () => {
console.log('Write stream ended.');
});
console.log('File created.');
socket.emit('serverGrantFileTransfer');
});
socket.on('clientSentChunk', (chunk) => {
function write() {
var writeDone = writeStream.write(chunk);
if(!writeDone) {
console.log('Back pressure!');
return writeStream.once('drain', write);
}
else {
wrote += chunk.length;
console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
socket.emit('serverRequestContinue');
}
}
write();
});
socket.on('clientFinishTransmission', () => {
writeStream.end();
console.log('Transmission complete!');
});
socket.emit('serverGrantFileTransfer', {});
}
这里是 html 代码:
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('http://localhost');
socket.on('news', function (data) {
console.log(data);
});
socket.on('connect', function(data) {
socket.emit('join', 'Hello World from client');
});
</script>
<input type="file" id="file" />
<script>
var fileEl = document.getElementById('file');
fileEl.onchange = function() {
var file = fileEl.files[0];
if(!file) return;
var socket = io('http://localhost/upload');
socket.on('connect', function() {
var fileReader = new FileReader();
fileReader.onloadend = function() {
var bin = fileReader.result;
var chunkSize = 10240;
var sent = 0;
// make server knows the name and size of the file
socket.once('serverGrantFileTransfer', () => {
function beginTransfer() {
if(sent >= bin.byteLength) {
console.log('Transmission complete!');
socket.emit('clientFinishTransmission');
return;
}
var chunk = bin.slice(sent, sent + chunkSize);
socket.once('serverRequestContinue', beginTransfer);
socket.emit('clientSentChunk', chunk);
sent += chunk.byteLength;
console.log('Sent: ' + sent);
}
beginTransfer();
});
socket.emit('clientRequestFileTransfer', {
name: file.name,
size: file.size
});
};
fileReader.readAsArrayBuffer(file);
});
};
</script>
将它们复制到 express 项目的根目录。我用两张图片和一个 .pdf 文件进行了测试。他们都被传输完全相同的字节
我用 Socket.io 进行了简单的二进制传输,以将文件从客户端传输到服务器。我认为它有效,但我意识到文件的大小不同。在 writableStream.write 失败时,我附加了 drain 事件处理程序以等待它可以被重写并继续写入,但是每次发生 drain 事件时,文件的大小都会增加 drain 事件触发的次数,每个 10240 字节大小我为每个块传输设置的。
在这里写代码之前,我需要解释一下代码流程:
- 客户端请求上传文件
- 服务器创建空文件(创建可写流)并授权传输
- 客户端传输数据(块)直到结束
- 服务器用可写流写入块
- 客户端在发送全部后结束传输
- 服务器关闭可写流。
- 完成!
这是服务器端代码:
var writeStream = null;
var fileSize = 0;
var wrote = 0;
socket.on('clientRequestFileTransfer', (fileInfo) => {
console.log(`Client request file transfer: ${fileInfo.name}(${fileInfo.size})`);
fileSize = fileInfo.size;
wrote = 0;
writeStream = fs.createWriteStream(__dirname + '/' + fileInfo.name);
writeStream.on('close', () => {
console.log('Write stream ended.');
});
console.log('File created.');
socket.emit('serverGrantFileTransfer');
});
socket.on('clientSentChunk', (chunk) => {
function write() {
let writeDone = writeStream.write(chunk);
if(!writeDone) {
console.log('Back pressure!');
return writeStream.once('drain', write);
}
else {
wrote += chunk.length;
console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
socket.emit('serverRequestContinue');
}
}
write();
});
socket.on('clientFinishTransmission', () => {
writeStream.end();
console.log('Transmission complete!');
});
它的客户端(添加了读取二进制文件的代码):
var fileEl = document.getElementById('file');
fileEl.onchange = function() {
var file = fileEl.files[0];
if(!file) return;
var socket = io('http://localhost:3000');
socket.on('connect', function() {
var fileReader = new FileReader();
fileReader.onloadend = function() {
var bin = fileReader.result;
var chunkSize = 10240;
var sent = 0;
// make server knows the name and size of the file
socket.once('serverGrantFileTransfer', () => {
function beginTransfer() {
if(sent >= bin.byteLength) {
console.log('Transmission complete!');
socket.emit('clientFinishTransmission');
return;
}
var chunk = bin.slice(sent, sent + chunkSize);
socket.once('serverRequestContinue', beginTransfer);
socket.emit('clientSentChunk', chunk);
sent += chunk.byteLength;
console.log('Sent: ' + sent);
}
beginTransfer();
});
socket.emit('clientRequestFileTransfer', {
name: file.name,
size: file.size
});
};
fileReader.readAsArrayBuffer(file);
});
};
我用 4,162,611 字节大小的文件测试了这段代码,它有 1 次写入失败(1 次背压)。上传后,我检查了创建文件的大小,它是4,172,851字节,比原来的大10240字节,是chunk(10240)的大小。
有时写入失败 2 次,大小比原来的大 20480 字节,是我发送的块大小的两倍。
我仔细检查了我的 Backpressure 代码,但对我来说似乎没什么问题。我正在使用 Node v6.2.2 并使用 Socket.io v1.6.0,从 Chrome 浏览器测试。我错过了什么吗?还是我误解了背压? 任何建议将不胜感激。
更新
看起来当背压发生时,它写了两次相同的数据(正如我在评论中所说)。所以我修改了这样的代码:
socket.on('clientSentChunk', (chunk) => {
function write() {
var writeDone = writeStream.write(chunk);
wrote += chunk.length;
if(!writeDone) {
console.log('**************** Back pressure ****************');
// writeStream.once('drain', write);
// no rewrite, just continue transmission
writeStream.once('drain', () => socket.emit('serverRequestContinue'));
}
else {
console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
socket.emit('serverRequestContinue');
}
}
write();
});
成功了。我很困惑,因为当可写流写入失败时,它不会将数据写入流中,但实际上不会。有人知道吗?
我想问题是你如何在 var bin = ...;
中创建 bin 对象。你能把代码放在这里吗?
更新
后台代码如下:
var express = require('express');
var app = express();
var server = app.listen(80);
var io = require('socket.io');
var fs = require('fs');
io = io.listen(server);
app.use(express.static(__dirname + '/public'));
app.use(function(req, res, next) {
//res.send({a:1})
res.sendFile(__dirname + '/socket_test_index.html');
});
io.on('connection', function(client) {
console.log('Client connected...', client);
client.on('join', function(data) {
//console.log(data);
});
setInterval(()=>{
client.emit('news', 'news from server');
}, 10000)
});
io.of('/upload', function(client){
console.log('upload')
logic(client);
})
function logic(socket) {
var writeStream = null;
var fileSize = 0;
var wrote = 0;
socket.on('clientRequestFileTransfer', (fileInfo) => {
console.log(`Client request file transfer: ${fileInfo.name}(${fileInfo.size})`);
fileSize = fileInfo.size;
wrote = 0;
writeStream = fs.createWriteStream(__dirname + '/' + fileInfo.name);
writeStream.on('close', () => {
console.log('Write stream ended.');
});
console.log('File created.');
socket.emit('serverGrantFileTransfer');
});
socket.on('clientSentChunk', (chunk) => {
function write() {
var writeDone = writeStream.write(chunk);
if(!writeDone) {
console.log('Back pressure!');
return writeStream.once('drain', write);
}
else {
wrote += chunk.length;
console.log(`Wrote chunks: ${chunk.length} / ${wrote} / ${fileSize}`);
socket.emit('serverRequestContinue');
}
}
write();
});
socket.on('clientFinishTransmission', () => {
writeStream.end();
console.log('Transmission complete!');
});
socket.emit('serverGrantFileTransfer', {});
}
这里是 html 代码:
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('http://localhost');
socket.on('news', function (data) {
console.log(data);
});
socket.on('connect', function(data) {
socket.emit('join', 'Hello World from client');
});
</script>
<input type="file" id="file" />
<script>
var fileEl = document.getElementById('file');
fileEl.onchange = function() {
var file = fileEl.files[0];
if(!file) return;
var socket = io('http://localhost/upload');
socket.on('connect', function() {
var fileReader = new FileReader();
fileReader.onloadend = function() {
var bin = fileReader.result;
var chunkSize = 10240;
var sent = 0;
// make server knows the name and size of the file
socket.once('serverGrantFileTransfer', () => {
function beginTransfer() {
if(sent >= bin.byteLength) {
console.log('Transmission complete!');
socket.emit('clientFinishTransmission');
return;
}
var chunk = bin.slice(sent, sent + chunkSize);
socket.once('serverRequestContinue', beginTransfer);
socket.emit('clientSentChunk', chunk);
sent += chunk.byteLength;
console.log('Sent: ' + sent);
}
beginTransfer();
});
socket.emit('clientRequestFileTransfer', {
name: file.name,
size: file.size
});
};
fileReader.readAsArrayBuffer(file);
});
};
</script>
将它们复制到 express 项目的根目录。我用两张图片和一个 .pdf 文件进行了测试。他们都被传输完全相同的字节