如何将从 gunzip 流修改的数据传输到 gzip 流?
How to pipe data modified from a gunzip stream into a gzip stream?
我需要通过 http 请求触发一个过程,我从 S3 下载一些数据,对其进行压缩,修改流,将其压缩并发送到 S3 中的另一个存储桶。
到目前为止我可以:
- 下载
- Gunzip
- 修改(过滤)数据
- return数据
或:
- 下载
- Gunzip
- Gzip
- 上传未修改的数据并检索对象的url
我的第一次尝试是使用 gunzip 流中的 on('data') 事件来修改数据;然后当 'end' 事件被抛出时,我可以 return 将它发送给发出请求的浏览器。
var accumulator = [];
gunzip.on('data', chunk=>{
var lines = chunk.toString('utf-8').split(\n);
lines.forEach(line=>{
if(shouldBeFiltered(line)){
accumulator.push(line);
}
})
})
gunzip.on('end', ()=>{
res.send(accumulator);
})
getS3.pipe(gunzip)
如果我尝试将 gunzip 传输到 gzip,而不是 returning 结果 (res.send),过滤器将被忽略。这是有道理的,因为我有一个 accumulator 数组,当结束事件被抛出时我 return (在前面的例子中)。
然后经过一番挖掘,我发现了一个建议应该将数据 pushed 到的参考资料,我尝试了以下方法,但没有用:
gunzip.on('data', chunk=>{
var lines = chunk.toString('utf-8').split(\n);
lines.forEach(line=>{
if(shouldBeFiltered(line)){
gunzip.push(line);
}
})
})
// the end event no longer mattered
// gunzip.on('end', ()=>{
// res.send(accumulator);
// })
getS3.pipe(gunzip).pipe(gzip).pipe(putS3(putS3param.Key, putS3param.Bucket));
然后我尝试创建一个转换流(这在我尝试这个概念时非常简单),但是我遇到了一个内部错误:
const stream = require('stream');
const Transform = stream.Transform;
function filter(pipeline) {
var the_filter = new Transform({
transform(chunk, encoding, next) {
console.log();
chunk += Buffer('Modified', 'utf-8');
this.push(chunk);
next();
}
});
pipeline.pipe(the_filter);
}
除了创建一个文件并将其压缩并上传之外,我没有其他想法。
感谢您的帮助!
经过多方挖掘,我终于在这个 page
中找到了答案
似乎将 Transform 设置为 objectMode 缺少了什么,除此之外,我看不到任何相关内容。
var stream = require('stream')
var liner = new stream.Transform( { objectMode: true } )
liner._transform = function (chunk, encoding, done) {
var data = chunk.toString()
if (this._lastLineData) data = this._lastLineData + data
var lines = data.split('\n')
this._lastLineData = lines.splice(lines.length-1,1)[0]
lines.forEach(this.push.bind(this))
done()
}
liner._flush = function (done) {
if (this._lastLineData) this.push(this._lastLineData)
this._lastLineData = null
done()
}
module.exports = liner
我需要通过 http 请求触发一个过程,我从 S3 下载一些数据,对其进行压缩,修改流,将其压缩并发送到 S3 中的另一个存储桶。
到目前为止我可以:
- 下载
- Gunzip
- 修改(过滤)数据
- return数据
或:
- 下载
- Gunzip
- Gzip
- 上传未修改的数据并检索对象的url
我的第一次尝试是使用 gunzip 流中的 on('data') 事件来修改数据;然后当 'end' 事件被抛出时,我可以 return 将它发送给发出请求的浏览器。
var accumulator = [];
gunzip.on('data', chunk=>{
var lines = chunk.toString('utf-8').split(\n);
lines.forEach(line=>{
if(shouldBeFiltered(line)){
accumulator.push(line);
}
})
})
gunzip.on('end', ()=>{
res.send(accumulator);
})
getS3.pipe(gunzip)
如果我尝试将 gunzip 传输到 gzip,而不是 returning 结果 (res.send),过滤器将被忽略。这是有道理的,因为我有一个 accumulator 数组,当结束事件被抛出时我 return (在前面的例子中)。
然后经过一番挖掘,我发现了一个建议应该将数据 pushed 到的参考资料,我尝试了以下方法,但没有用:
gunzip.on('data', chunk=>{
var lines = chunk.toString('utf-8').split(\n);
lines.forEach(line=>{
if(shouldBeFiltered(line)){
gunzip.push(line);
}
})
})
// the end event no longer mattered
// gunzip.on('end', ()=>{
// res.send(accumulator);
// })
getS3.pipe(gunzip).pipe(gzip).pipe(putS3(putS3param.Key, putS3param.Bucket));
然后我尝试创建一个转换流(这在我尝试这个概念时非常简单),但是我遇到了一个内部错误:
const stream = require('stream');
const Transform = stream.Transform;
function filter(pipeline) {
var the_filter = new Transform({
transform(chunk, encoding, next) {
console.log();
chunk += Buffer('Modified', 'utf-8');
this.push(chunk);
next();
}
});
pipeline.pipe(the_filter);
}
除了创建一个文件并将其压缩并上传之外,我没有其他想法。
感谢您的帮助!
经过多方挖掘,我终于在这个 page
中找到了答案似乎将 Transform 设置为 objectMode 缺少了什么,除此之外,我看不到任何相关内容。
var stream = require('stream')
var liner = new stream.Transform( { objectMode: true } )
liner._transform = function (chunk, encoding, done) {
var data = chunk.toString()
if (this._lastLineData) data = this._lastLineData + data
var lines = data.split('\n')
this._lastLineData = lines.splice(lines.length-1,1)[0]
lines.forEach(this.push.bind(this))
done()
}
liner._flush = function (done) {
if (this._lastLineData) this.push(this._lastLineData)
this._lastLineData = null
done()
}
module.exports = liner