Streaming files from AWS S3 with NodeJS
I am trying to stream data from a large csv file into readline. I tried piping the readStream from s3 into the readline input, however I ran into an error where S3 only allows a connection to stay open for a certain amount of time.
I was creating the stream from s3 like so:
import {Readable} from 'stream';
import * as AWS from 'aws-sdk';
import {s3Env} from '../config';

export default async function createAWSStream(): Promise<Readable> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            s3.headObject(params, (error, data) => {
                if (error) {
                    // Reject rather than throw: a throw inside this callback
                    // would not be caught by the surrounding try/catch
                    reject(error);
                    return;
                }
                const stream = s3.getObject(params).createReadStream();
                resolve(stream);
            });
        } catch (error) {
            reject(error);
        }
    });
}
Then I was piping it into readline:
import * as readline from 'readline';
import createAWSStream from './createAWSStream';

export const readCSVFile = async function(): Promise<void> {
    const rStream = await createAWSStream();

    const lineReader = readline.createInterface({
        input: rStream
    });

    for await (const line of lineReader) {
        // process line
    }
}
I found that the s3 connection timeout was set to 120000 ms (2 minutes). I tried simply raising the timeout, but then I ran into more timeout issues coming from the HTTPS connection.
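For reference, raising the timeout looked roughly like this. This is a minimal sketch assuming the aws-sdk v2 httpOptions settings; the numbers are illustrative, and in my case this only postponed the same failure:

import * as AWS from 'aws-sdk';

// Attempted workaround (did not fix the problem): raise the socket
// inactivity timeout on the underlying HTTPS connection.
// The values below are illustrative, not a recommendation.
const s3 = new AWS.S3({
    httpOptions: {
        timeout: 600000,      // socket inactivity timeout in ms (default is 120000)
        connectTimeout: 5000  // time allowed to establish the connection, in ms
    }
});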
How can I stream data from AWS S3 the right way, without cranking a bunch of timeouts up to some very large time frame?
I was able to work around this by using the AWS S3 Range property and creating a custom readable stream with the NodeJS Stream API.
By using this "smart stream" I was able to grab data in chunks through separate requests to the S3 instance. By grabbing the data in chunks I avoided any timeout errors, and created a more efficient stream. The NodeJS Readable super class handles the buffer so as not to overload the input to readline. It also automatically handles the pausing and resuming of the stream.
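To make the chunking concrete: with the 2 MB default chunk size used below, the first request asks S3 for Range: bytes=0-2097152, the cursor then advances to 2097153, the next request asks for bytes=2097153-4194305, and so on; once the cursor passes the total content length, the stream pushes null and emits the 'end' event.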
This class makes streaming large files from AWS S3 very easy:
import {Readable, ReadableOptions} from "stream";
import type {S3} from "aws-sdk";

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 2048 * 1024; // Amount of bytes to grab (I have jacked this up for HD video files)
    _maxContentLength: number; // Total number of bytes in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we won't use this, however I left it in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the amount of bytes in the file
            // We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file
            // We adjust the range to grab the remaining bytes of data
            const adjustedRange =
                range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our s3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes
                    // We destroy the stream, NodeJS ReadableStream will emit the 'error' event
                    this.destroy(error);
                } else {
                    // We push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}
To hook this up in the createAWSStream function, I simply replaced the line that created the readStream:
const stream = s3.getObject(params).createReadStream();
with a line that creates an instance of my SmartStream class instead, passing in the s3 params object, the s3 instance, and the content length of the data:
const stream = new SmartStream(params, s3, data.ContentLength);
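Putting it all together, the updated createAWSStream function looks roughly like this. This is a sketch: it assumes SmartStream is exported from a local ./SmartStream module (that path is mine), and the non-null assertion on ContentLength is mine as well:

import * as AWS from 'aws-sdk';
import {s3Env} from '../config';
import {SmartStream} from './SmartStream'; // assumed local module path

export default async function createAWSStream(): Promise<SmartStream> {
    return new Promise((resolve, reject) => {
        const params = {
            Bucket: s3Env.bucket,
            Key: s3Env.key
        };

        try {
            const s3 = new AWS.S3({
                accessKeyId: s3Env.accessKey,
                secretAccessKey: s3Env.secret
            });

            // headObject fetches the object's metadata without downloading the body;
            // its ContentLength tells SmartStream where the file ends
            s3.headObject(params, (error, data) => {
                if (error) {
                    reject(error);
                    return;
                }
                resolve(new SmartStream(params, s3, data.ContentLength!));
            });
        } catch (error) {
            reject(error);
        }
    });
}

Because SmartStream is a standard NodeJS Readable, the readline code from the question works with it unchanged.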