如何将多个 ReadableStreams 传输到单个 WriteStream?
How to pipe multiple ReadableStreams to a single WriteStream?
我正在处理防火墙限制,我一次只能 POST 10MB。为了处理更大的上传,我想使用类似 http://www.resumablejs.com 的东西,将多个块写入磁盘,然后在最后重新组合它们。
我现在正在编写测试,但我的实现中有些地方是错误的。
首先,我这样拆分文件:
const splitFile = async () => {
const chunkSize = 1024 * 1024;
const photo = fs.createReadStream(path.resolve(FIXTURES, 'hello-tron.jpg'));
// Write to 2 files
photo.on('readable', function() {
const file1 = path.resolve(TEMP, 'chunk.jpg.1');
const file2 = path.resolve(TEMP, 'chunk.jpg.2');
let data;
while (data = this.read(chunkSize)) {
if (!fs.existsSync(file1)) {
const output1 = fs.createWriteStream(file1);
output1.write(data);
output1.close();
return;
}
const output2 = fs.createWriteStream(file2);
output2.write(data);
if (data === null) {
output2.close();
}
}
});
return new Promise(resolve => {
photo.on('end', resolve);
});
};
然后我像这样重新组装它:
const recombine = async () => {
const output = fs.createWriteStream(path.resolve(TEMP, 'recombined.jpg'));
const file1 = path.resolve(TEMP, 'chunk.jpg.1');
const file2 = path.resolve(TEMP, 'chunk.jpg.2');
return new Promise(resolve => {
const stream1 = fs.createReadStream(file1);
const stream2 = fs.createReadStream(file2);
const recombinator = new Recombinator({
readables: [stream1, stream2]
});
stream1.on('readable', () => {
stream2.on('readable', () => {
recombinator.pipe(output);
});
});
stream1.on('end', () => {
stream2.on('end', () => {
resolve();
});
});
})
};
这是Recombinator
class:
/* Takes multiple readable streams and returns a single
* readable stream that can be piped to a writable stream
*/
const {Readable} = require('stream');
class Recombinator extends Readable {
constructor(opts) {
super({...opts, readables: undefined});
const self = this;
self.readables = opts.readables || [];
}
_read(size) {
this.push(this._getChunk(size));
}
_getChunk(size) {
const reader = this.readables.find(r => !r.closed);
if (!reader) {
return null;
}
const data = reader.read(size);
if (!data) {
reader.closed = true;
return this._getChunk(size);
}
return data;
}
}
module.exports = Recombinator;
这是原图:
这是重新组装的图像:
部分问题是认为 readable
事件只触发一次,但每次有数据要读取时都会触发。嵌套事件处理程序可能也不是很好。
我使用的解决方案是像这样更改 Recombinator
构造函数:
constructor(opts) {
super({...opts, readables: undefined});
const self = this;
self.readables = opts.readables || [];
self._readableCount = 0;
self._endedCount = 0;
// Attach listeners to know when all readables are open and closed
self.readables.forEach(r => {
r.on('readable', () => {
if (r._markedReadable) {
return;
}
r._markedReadable = true;
self._readableCount++;
});
r.on('end', () => {
if (r._markedEnded) {
return;
}
r._markedEnded = true;
self._endedCount++;
});
})
}
并添加异步方法以等待所有阅读器打开,如下所示:
async ready(retry = 10) {
if (this._readableCount === this.readables.length) {
return Promise.resolve();
}
if (retry === 0) {
return Promise.reject(`Timeout waiting for ${this.readables.length} readables to open - got ${this._readableCount}`);
}
await delay(500);
return this.ready(retry - 1);
}
async done(retry = 10) {
if (this._endedCount === this.readables.length) {
return Promise.resolve();
}
if (retry === 0) {
return Promise.reject(`Timeout waiting for ${this.readables.length} readables to end - got ${this._endedCount}`);
}
await delay(500);
return this.done(retry - 1);
}
我正在处理防火墙限制,我一次只能 POST 10MB。为了处理更大的上传,我想使用类似 http://www.resumablejs.com 的东西,将多个块写入磁盘,然后在最后重新组合它们。
我现在正在编写测试,但我的实现中有些地方是错误的。
首先,我这样拆分文件:
const splitFile = async () => {
const chunkSize = 1024 * 1024;
const photo = fs.createReadStream(path.resolve(FIXTURES, 'hello-tron.jpg'));
// Write to 2 files
photo.on('readable', function() {
const file1 = path.resolve(TEMP, 'chunk.jpg.1');
const file2 = path.resolve(TEMP, 'chunk.jpg.2');
let data;
while (data = this.read(chunkSize)) {
if (!fs.existsSync(file1)) {
const output1 = fs.createWriteStream(file1);
output1.write(data);
output1.close();
return;
}
const output2 = fs.createWriteStream(file2);
output2.write(data);
if (data === null) {
output2.close();
}
}
});
return new Promise(resolve => {
photo.on('end', resolve);
});
};
然后我像这样重新组装它:
const recombine = async () => {
const output = fs.createWriteStream(path.resolve(TEMP, 'recombined.jpg'));
const file1 = path.resolve(TEMP, 'chunk.jpg.1');
const file2 = path.resolve(TEMP, 'chunk.jpg.2');
return new Promise(resolve => {
const stream1 = fs.createReadStream(file1);
const stream2 = fs.createReadStream(file2);
const recombinator = new Recombinator({
readables: [stream1, stream2]
});
stream1.on('readable', () => {
stream2.on('readable', () => {
recombinator.pipe(output);
});
});
stream1.on('end', () => {
stream2.on('end', () => {
resolve();
});
});
})
};
这是Recombinator
class:
/* Takes multiple readable streams and returns a single
* readable stream that can be piped to a writable stream
*/
const {Readable} = require('stream');
class Recombinator extends Readable {
constructor(opts) {
super({...opts, readables: undefined});
const self = this;
self.readables = opts.readables || [];
}
_read(size) {
this.push(this._getChunk(size));
}
_getChunk(size) {
const reader = this.readables.find(r => !r.closed);
if (!reader) {
return null;
}
const data = reader.read(size);
if (!data) {
reader.closed = true;
return this._getChunk(size);
}
return data;
}
}
module.exports = Recombinator;
这是原图:
这是重新组装的图像:
部分问题是认为 readable
事件只触发一次,但每次有数据要读取时都会触发。嵌套事件处理程序可能也不是很好。
我使用的解决方案是像这样更改 Recombinator
构造函数:
constructor(opts) {
super({...opts, readables: undefined});
const self = this;
self.readables = opts.readables || [];
self._readableCount = 0;
self._endedCount = 0;
// Attach listeners to know when all readables are open and closed
self.readables.forEach(r => {
r.on('readable', () => {
if (r._markedReadable) {
return;
}
r._markedReadable = true;
self._readableCount++;
});
r.on('end', () => {
if (r._markedEnded) {
return;
}
r._markedEnded = true;
self._endedCount++;
});
})
}
并添加异步方法以等待所有阅读器打开,如下所示:
async ready(retry = 10) {
if (this._readableCount === this.readables.length) {
return Promise.resolve();
}
if (retry === 0) {
return Promise.reject(`Timeout waiting for ${this.readables.length} readables to open - got ${this._readableCount}`);
}
await delay(500);
return this.ready(retry - 1);
}
async done(retry = 10) {
if (this._endedCount === this.readables.length) {
return Promise.resolve();
}
if (retry === 0) {
return Promise.reject(`Timeout waiting for ${this.readables.length} readables to end - got ${this._endedCount}`);
}
await delay(500);
return this.done(retry - 1);
}