Streaming files from AWS S3 with NodeJS

I am trying to stream data from a large csv file into readline. I tried piping the readStream from s3 into the readline input, however I ran into an error: S3 only allows a connection to stay open for a certain amount of time.

I am creating the stream from s3 like so:

import {Readable} from 'stream';
import * as AWS from 'aws-sdk';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<Readable> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            s3.headObject(params, (error, data) => {
                if (error) {
                    return reject(error);
                }

                const stream = s3.getObject(params).createReadStream();

                resolve(stream);
            })
        } catch (error) {
            reject(error);
        }
    })
}

Then I pipe it into readline:

import * as readline from 'readline';
import createAWSStream from './createAWSStream';

export const readCSVFile = async function(): Promise<void> {
  const rStream = await createAWSStream();

  const lineReader = readline.createInterface({
    input: rStream
  });

  for await (const line of lineReader) {
    // process line
  }
}

I found that the timeout for s3 connections was set to 120000 ms (2 minutes). I tried simply raising the timeout, but then I ran into more timeout issues from the HTTPS connection.
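For reference, raising the timeout looked roughly like this. This is a minimal sketch against the v2 aws-sdk, where httpOptions.timeout is the socket inactivity timeout that defaults to 120000 ms; the values here are purely illustrative:

import * as AWS from 'aws-sdk';

const s3 = new AWS.S3({
    httpOptions: {
        connectTimeout: 5000, // ms allowed to establish the connection
        timeout: 600000 // ms of socket inactivity before the request is aborted (default 120000)
    }
});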

How can I stream data from AWS S3 the right way, without cranking a bunch of timeouts up to some very large time frame?

I was able to solve this by using the AWS S3 Range property and creating a custom readable stream with the NodeJS Stream API.

By using this "smart stream" I was able to grab data in chunks, with a separate request to the S3 instance for each chunk. By grabbing the data in chunks, I avoided any timeout errors and created a more efficient stream. The NodeJS Readable super class handles the buffer so as to not overload the input to readline. It also automatically handles the pausing and resuming of the stream.

This class makes it very easy to stream large files from AWS S3:

import {Readable, ReadableOptions} from "stream";
import type {S3} from "aws-sdk";

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 2048 * 1024; // Number of bytes to grab per range request (I have jacked this up for HD video files)
    _maxContentLength: number; // Total number of bytes in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into the s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we won't use them, however I left the parameter in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the amount of bytes in the file
            // We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file
            // We adjust the range to grab the remaining bytes of data
            const adjustedRange =
                range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our s3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes
                    // We destroy the stream, NodeJS ReadableStream will emit the 'error' event
                    this.destroy(error);
                } else {
                    // We push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}

To apply it in the createAWSStream function, I simply replaced the line where the readStream was created:

const stream = s3.getObject(params).createReadStream();

with a line creating an instance of my SmartStream class instead, passing in the s3 params object, the s3 instance, and the content length of the data:

const stream = new SmartStream(params, s3, data.ContentLength);
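Putting it all together, the updated createAWSStream looks roughly like this (a sketch assuming the same s3Env config as before; the './SmartStream' import path is an assumption):

import * as AWS from 'aws-sdk';
import {s3Env} from '../config';
import {SmartStream} from './SmartStream';

export default async function createAWSStream(): Promise<SmartStream> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            // headObject returns the ContentLength that SmartStream needs
            // in order to know when to stop issuing range requests
            s3.headObject(params, (error, data) => {
                if (error) {
                    return reject(error);
                }

                resolve(new SmartStream(params, s3, data.ContentLength));
            });
        } catch (error) {
            reject(error);
        }
    });
}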