如何调用 S3.putObject(或 S3.upload)并使用 gzip 流作为正文
How to call S3.putObject (or S3.upload) and use a gzip stream as body
由于我们的日志记录机制无法创建大的 gz 文件,我正在尝试使用 lambda 来完成。当我将所有这些从 S3 加载到内存中并随后创建 gzip 文件时,它就起作用了。但这需要太多内存。这就是我尝试以下操作的原因:将 gzip 流启动到内存中,当我从 S3 接收到文件内容时,我将其写入 gzip 流。没有运气。除了其他想法,我尝试了下面的代码。
我从这里 https://github.com/aws/aws-sdk-js/issues/2961 读到 aws-sdk 需要知道流的长度。这就是我使用 streamToBuffer 函数的原因,该函数也在给定 link.
中进行了描述
const aws = require('aws-sdk')
const zlib = require('zlib')
const stream = require('stream')
async function streamToBuffer(readableStream) {
const chunks = []
return new Promise((resolve, reject) => {
readableStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
readableStream.on('error', (err) => reject(err))
readableStream.on('end', () => resolve(Buffer.concat(chunks)))
})
}
const gzip = zlib.createGzip()
gzip.setEncoding('utf8')
for (let ii = 0; ii < files.length; ii++) {
const params = {
Bucket: srcBucket,
Key: `${files[ii]}`,
};
console.log('Get:', params.Key, 'from:', params.Bucket);
var resp = await s3.getObject(params).promise().catch(err=>{
console.log(err, err.stack)
return 'Failed to list objects'
})
gzip.write(resp.Body);
}
gzip.flush()
gzip.end()
var destPath = files[0].replace(srcPrefix, destPrefix).replace('.txt','.gz')
var msg = 'merging ' + srcBucket + ':' + currentPrefix + '* to ' + srcBucket + ':' + destPath
console.log('Attempting: ' + msg);
const data = await s3.putObject({
Bucket: srcBucket,
Key: destPath,
Body: await streamToBuffer(gzip)
}).promise().catch(err => {
console.log('Error: ' + msg)
console.log(err, err.stack)
return -1
})
if (data === -1) {
return 'Error while putting new object to S3'
}
console.log('Success: ' + msg);
console.log(data);
代码将文件放入 S3。 但是,它不是一个正确的 gzip 文件。我无法打开它。我知道代码不是很好。但我认为它应该有效。感谢您的帮助。
编辑: 我忘了说日志文件是包含 json 条目的文本文件。还没有gzipped。
更新: 我尝试使用 s3.upload 而不是 s3.putObject 直接使用 gzip 流,因为上传应该支持它。它导致了这个错误:
"The \"list[0]\" argument must be an instance of Buffer or Uint8Array. Received type string ('\u001f\ufffd\b\u0000\u0000\u0...)"
按照此处所述将 gzip.setEncoding('utf8')
设置为 gzip.setEncoding(null)
https://github.com/aws/aws-sdk-js/issues/2081 没有帮助。
我终于成功了!
我不必为 gzip 设置编码,但在 write 期间。这是我创建正确 gzip 文件的代码:
const aws = require('aws-sdk')
const zlib = require('zlib')
const stream = require('stream')
async function streamToBuffer(readableStream) {
const chunks = []
return new Promise((resolve, reject) => {
readableStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
readableStream.on('error', (err) => reject(err))
readableStream.on('end', () => resolve(Buffer.concat(chunks)))
})
}
const gzip = zlib.createGzip()
for (let ii = 0; ii < files.length; ii++) {
const params = {
Bucket: srcBucket,
Key: `${files[ii]}`,
};
console.log('Get:', params.Key, 'from:', params.Bucket);
var resp = await s3.getObject(params).promise().catch(err=>{
console.log(err, err.stack)
return 'Failed to list objects'
})
// Add the encoding to create correct gzip files!
gzip.write(resp.Body, 'utf-8');
}
gzip.flush()
gzip.end()
var destPath = files[0].replace(srcPrefix, destPrefix).replace('.txt','.gz')
var msg = 'merging ' + srcBucket + ':' + currentPrefix + '* to ' + srcBucket + ':' + destPath
console.log('Attempting: ' + msg);
const data = await s3.putObject({
Bucket: srcBucket,
Key: destPath,
ContentType: "application/json",
ContentEncoding: "gzip",
Body: await streamToBuffer(gzip)
}).promise().catch(err => {
console.log('Error: ' + msg)
console.log(err, err.stack)
return -1
})
if (data === -1) {
return 'Error while putting new object to S3'
}
console.log('Success: ' + msg);
console.log(data);
代码仍然不够好,感谢您的建议。
您一定要使用 Node.js 吗?对于 gzipping,您还可以使用 Python Lambda 函数。使用 gzip
和 zipfile
库,这可能非常简单:
gzipped_content = gzip.compress(f_in.read())
destinationbucket.upload_fileobj(io.BytesIO(gzipped_content),
final_file_path,
ExtraArgs={"ContentType": "text/plain"}
)
这里有 lambda 函数的完整教程:https://medium.com/p/f7bccf0099c9
由于我们的日志记录机制无法创建大的 gz 文件,我正在尝试使用 lambda 来完成。当我将所有这些从 S3 加载到内存中并随后创建 gzip 文件时,它就起作用了。但这需要太多内存。这就是我尝试以下操作的原因:将 gzip 流启动到内存中,当我从 S3 接收到文件内容时,我将其写入 gzip 流。没有运气。除了其他想法,我尝试了下面的代码。
我从这里 https://github.com/aws/aws-sdk-js/issues/2961 读到 aws-sdk 需要知道流的长度。这就是我使用 streamToBuffer 函数的原因,该函数也在给定 link.
中进行了描述const aws = require('aws-sdk')
const zlib = require('zlib')
const stream = require('stream')
async function streamToBuffer(readableStream) {
const chunks = []
return new Promise((resolve, reject) => {
readableStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
readableStream.on('error', (err) => reject(err))
readableStream.on('end', () => resolve(Buffer.concat(chunks)))
})
}
const gzip = zlib.createGzip()
gzip.setEncoding('utf8')
for (let ii = 0; ii < files.length; ii++) {
const params = {
Bucket: srcBucket,
Key: `${files[ii]}`,
};
console.log('Get:', params.Key, 'from:', params.Bucket);
var resp = await s3.getObject(params).promise().catch(err=>{
console.log(err, err.stack)
return 'Failed to list objects'
})
gzip.write(resp.Body);
}
gzip.flush()
gzip.end()
var destPath = files[0].replace(srcPrefix, destPrefix).replace('.txt','.gz')
var msg = 'merging ' + srcBucket + ':' + currentPrefix + '* to ' + srcBucket + ':' + destPath
console.log('Attempting: ' + msg);
const data = await s3.putObject({
Bucket: srcBucket,
Key: destPath,
Body: await streamToBuffer(gzip)
}).promise().catch(err => {
console.log('Error: ' + msg)
console.log(err, err.stack)
return -1
})
if (data === -1) {
return 'Error while putting new object to S3'
}
console.log('Success: ' + msg);
console.log(data);
代码将文件放入 S3。 但是,它不是一个正确的 gzip 文件。我无法打开它。我知道代码不是很好。但我认为它应该有效。感谢您的帮助。
编辑: 我忘了说日志文件是包含 json 条目的文本文件。还没有gzipped。
更新: 我尝试使用 s3.upload 而不是 s3.putObject 直接使用 gzip 流,因为上传应该支持它。它导致了这个错误:
"The \"list[0]\" argument must be an instance of Buffer or Uint8Array. Received type string ('\u001f\ufffd\b\u0000\u0000\u0...)"
按照此处所述将 gzip.setEncoding('utf8')
设置为 gzip.setEncoding(null)
https://github.com/aws/aws-sdk-js/issues/2081 没有帮助。
我终于成功了! 我不必为 gzip 设置编码,但在 write 期间。这是我创建正确 gzip 文件的代码:
const aws = require('aws-sdk')
const zlib = require('zlib')
const stream = require('stream')
async function streamToBuffer(readableStream) {
const chunks = []
return new Promise((resolve, reject) => {
readableStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
readableStream.on('error', (err) => reject(err))
readableStream.on('end', () => resolve(Buffer.concat(chunks)))
})
}
const gzip = zlib.createGzip()
for (let ii = 0; ii < files.length; ii++) {
const params = {
Bucket: srcBucket,
Key: `${files[ii]}`,
};
console.log('Get:', params.Key, 'from:', params.Bucket);
var resp = await s3.getObject(params).promise().catch(err=>{
console.log(err, err.stack)
return 'Failed to list objects'
})
// Add the encoding to create correct gzip files!
gzip.write(resp.Body, 'utf-8');
}
gzip.flush()
gzip.end()
var destPath = files[0].replace(srcPrefix, destPrefix).replace('.txt','.gz')
var msg = 'merging ' + srcBucket + ':' + currentPrefix + '* to ' + srcBucket + ':' + destPath
console.log('Attempting: ' + msg);
const data = await s3.putObject({
Bucket: srcBucket,
Key: destPath,
ContentType: "application/json",
ContentEncoding: "gzip",
Body: await streamToBuffer(gzip)
}).promise().catch(err => {
console.log('Error: ' + msg)
console.log(err, err.stack)
return -1
})
if (data === -1) {
return 'Error while putting new object to S3'
}
console.log('Success: ' + msg);
console.log(data);
代码仍然不够好,感谢您的建议。
您一定要使用 Node.js 吗?对于 gzipping,您还可以使用 Python Lambda 函数。使用 gzip
和 zipfile
库,这可能非常简单:
gzipped_content = gzip.compress(f_in.read())
destinationbucket.upload_fileobj(io.BytesIO(gzipped_content),
final_file_path,
ExtraArgs={"ContentType": "text/plain"}
)
这里有 lambda 函数的完整教程:https://medium.com/p/f7bccf0099c9