如何调用 S3.putObject(或 S3.upload)并使用 gzip 流作为正文

How to call S3.putObject (or S3.upload) and use a gzip stream as body

由于我们的日志记录机制无法创建大的 gz 文件,我正在尝试使用 lambda 来完成。当我将所有这些从 S3 加载到内存中并随后创建 gzip 文件时,它就起作用了。但这需要太多内存。这就是我尝试以下操作的原因:将 gzip 流启动到内存中,当我从 S3 接收到文件内容时,我将其写入 gzip 流。没有运气。除了其他想法,我尝试了下面的代码。

我从这里 https://github.com/aws/aws-sdk-js/issues/2961 读到 aws-sdk 需要知道流的长度。这就是我使用 streamToBuffer 函数的原因,该函数也在给定 link.

中进行了描述
const aws = require('aws-sdk')
const zlib = require('zlib')
const stream = require('stream')

async function streamToBuffer(readableStream) {
  const chunks = []
  return new Promise((resolve, reject) => {
    readableStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
    readableStream.on('error', (err) => reject(err))
    readableStream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}

const gzip = zlib.createGzip()
gzip.setEncoding('utf8')

for (let ii = 0; ii < files.length; ii++) {
  const params = {
    Bucket: srcBucket,
    Key: `${files[ii]}`,
  };
  console.log('Get:', params.Key, 'from:', params.Bucket);
  var resp = await s3.getObject(params).promise().catch(err=>{
    console.log(err, err.stack)
    return 'Failed to list objects'
  })
  
  gzip.write(resp.Body);
}

gzip.flush()
gzip.end()

var destPath = files[0].replace(srcPrefix, destPrefix).replace('.txt','.gz')

var msg = 'merging ' + srcBucket + ':' + currentPrefix + '* to ' + srcBucket + ':' + destPath

console.log('Attempting: ' + msg);
const data = await s3.putObject({
  Bucket: srcBucket,
  Key: destPath,
  Body: await streamToBuffer(gzip)
}).promise().catch(err => {
  console.log('Error: ' + msg)
  console.log(err, err.stack)
  return -1
})

if (data === -1) {
  return 'Error while putting new object to S3'
}
  
console.log('Success: ' + msg);
console.log(data);

代码将文件放入 S3。 但是,它不是一个正确的 gzip 文件。我无法打开它。我知道代码不是很好。但我认为它应该有效。感谢您的帮助。

编辑: 我忘了说日志文件是包含 json 条目的文本文件。还没有gzipped。

更新: 我尝试使用 s3.upload 而不是 s3.putObject 直接使用 gzip 流,因为上传应该支持它。它导致了这个错误:

"The \"list[0]\" argument must be an instance of Buffer or Uint8Array. Received type string ('\u001f\ufffd\b\u0000\u0000\u0...)"

按照此处所述将 gzip.setEncoding('utf8') 设置为 gzip.setEncoding(null) https://github.com/aws/aws-sdk-js/issues/2081 没有帮助。

我终于成功了! 我不必为 gzip 设置编码,但在 write 期间。这是我创建正确 gzip 文件的代码:

const aws = require('aws-sdk')
const zlib = require('zlib')
const stream = require('stream')

async function streamToBuffer(readableStream) {
  const chunks = []
  return new Promise((resolve, reject) => {
    readableStream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
    readableStream.on('error', (err) => reject(err))
    readableStream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}

const gzip = zlib.createGzip()

for (let ii = 0; ii < files.length; ii++) {
  const params = {
    Bucket: srcBucket,
    Key: `${files[ii]}`,
  };
  console.log('Get:', params.Key, 'from:', params.Bucket);
  var resp = await s3.getObject(params).promise().catch(err=>{
    console.log(err, err.stack)
    return 'Failed to list objects'
  })
  
  // Add the encoding to create correct gzip files!
  gzip.write(resp.Body, 'utf-8');
}

gzip.flush()
gzip.end()

var destPath = files[0].replace(srcPrefix, destPrefix).replace('.txt','.gz')

var msg = 'merging ' + srcBucket + ':' + currentPrefix + '* to ' + srcBucket + ':' + destPath

console.log('Attempting: ' + msg);
const data = await s3.putObject({
  Bucket: srcBucket,
  Key: destPath,
  ContentType: "application/json",
  ContentEncoding: "gzip",
  Body: await streamToBuffer(gzip)
}).promise().catch(err => {
  console.log('Error: ' + msg)
  console.log(err, err.stack)
  return -1
})

if (data === -1) {
  return 'Error while putting new object to S3'
}
  
console.log('Success: ' + msg);
console.log(data);

代码仍然不够好,感谢您的建议。

您一定要使用 Node.js 吗?对于 gzipping,您还可以使用 Python Lambda 函数。使用 gzipzipfile 库,这可能非常简单:

gzipped_content = gzip.compress(f_in.read())
destinationbucket.upload_fileobj(io.BytesIO(gzipped_content),
                                                        final_file_path,
                                                        ExtraArgs={"ContentType": "text/plain"}
                                                )

这里有 lambda 函数的完整教程:https://medium.com/p/f7bccf0099c9