@aws-sdk/lib-storage to Stream JSON from MongoDB to S3 with JSONStream.stringify()
I'm trying to stream JSON from MongoDB to S3 using the new version of @aws-sdk/lib-storage:
"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",
Attempt #1: it seems I'm not using JSONStream.stringify() correctly:
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import JSONStream from 'JSONStream';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;
  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify());
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
    await upload.done();
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};
Error #1:
TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of
type string, Buffer, ArrayBuffer, Array, or Array-like Object.
Received type object
at Function.from (buffer.js:305:9)
at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
at processTicksAndRejections (internal/process/task_queues.js:94:5)
at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
at async Promise.all (index 0)
at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)
Attempt #2, using a variable jsonStream:
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
const jsonStream = readStream.pipe(JSONStream.stringify());
const upload = new Upload({
  client: s3Client,
  params: {
    Bucket: 'test-bucket',
    Key: 'extracted-data/benda_mongo.json',
    Body: jsonStream,
  },
});
Error #2:
ReferenceError: ReadableStream is not defined
at Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30)
at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24)
at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)
Attempt #3: using stream.PassThrough:
client = await MongoClient.connect(connectionString);
const db = client.db();
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');

export const uploadStreamFile = async (fileName) => {
  try {
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  }
  catch (err) {
    return;
  }
};
Error #3:
dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8)
Attempt #4: using mongodb .stream({ transform: doc => JSON.stringify... }) instead of JSONStream:
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;
  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
    await upload.done();
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};
Error #4:
TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of
type string, Buffer, ArrayBuffer, Array, or Array-like Object.
Received type object
at Function.from (buffer.js:305:9)
at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
at processTicksAndRejections (internal/process/task_queues.js:94:5)
at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
at async Promise.all (index 0)
at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)
Attempt #5: using stream.PassThrough() and returning pass to pipe:
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;
  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
    readStream.pipe(uploadStreamFile());
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};

const stream = require('stream');

export const uploadStreamFile = async () => {
  try {
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    await upload.done();
    return pass;
  }
  catch (err) {
    log.error('pawoooooo', err);
    return;
  }
};
Error #5:
TypeError: dest.on is not a function
at Cursor.pipe (_stream_readable.js:680:8)
Checking the error stack traces, the problem is probably related to the fact that the MongoDB driver provides a cursor in object mode, whereas the Body parameter of Upload requires a traditional stream, suitable for being handled by Buffer in this case.
Taking your original code as reference, you can try providing a Transform stream that deals with both requirements.
Please consider the following code:
import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;
  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    // We create here a Transform to adapt both sides: it accepts the
    // cursor's objects on its writable side and emits text on its readable side.
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(chunk, encoding, callback) {
        this.push(JSON.stringify(chunk) + '\n');
        callback();
      }
    });
    readStream.pipe(toJSONTransform);
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      },
    });
    await upload.done();
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};
In this code, within toJSONTransform we define the writable side of the stream as object mode; conversely, the readable side will be suitable for being read by the S3 Upload method... at least, I hope so.
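As a quick standalone check (my own sketch, no MongoDB involved; Readable.from requires Node 12+), the same Transform turns a stream of objects into newline-delimited text:

const { Transform, Readable } = require('stream');

const toJSONTransform = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

// Readable.from stands in for the MongoDB cursor; both emit plain objects.
Readable.from([{ a: 1 }, { b: 2 }])
  .pipe(toJSONTransform)
  .pipe(process.stdout); // prints {"a":1} and {"b":2}, one per line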
Regarding the second error you reported, the one related to dest.on: initially I thought, and wrote to you about the possibility, that the error was caused by the fact that in uploadStreamFile you were returning a Promise, not a stream, and you were passing that Promise to a pipe method which requires a stream; basically, you returned the wrong variable. But then I realized you were trying to pass the PassThrough stream as a parameter to the Upload method: please note that this stream does not contain any information, because you are not passing any to it; the content of the readable stream obtained from the MongoDB query is never passed to it, nor to the Upload itself.
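To illustrate the point, here is a minimal sketch of how that attempt could be restructured so the PassThrough is handed back before the upload completes (startUploadStreamFile is a hypothetical name; s3Client, readStream and Upload are assumed to be set up as in your code):

const { PassThrough } = require('stream');

// Hypothetical helper: return the writable end immediately, together with
// the completion promise, instead of awaiting upload.done() before returning.
const startUploadStreamFile = (fileName) => {
  const pass = new PassThrough();
  const upload = new Upload({
    client: s3Client,
    params: {
      Bucket: 'test-bucket',
      Key: `extracted-data/${fileName}`,
      Body: pass,
    },
  });
  return { pass, done: upload.done() };
};

// Caller: pipe the data in first, then await the upload.
const { pass, done } = startUploadStreamFile('benda_mongo.json');
readStream.pipe(pass);
await done;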
I found another solution using stream.PassThrough; here JSONStream will stream an array of objects instead of one document after the other:
import stream from 'stream';
import JSONStream from 'JSONStream';

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;
  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const passThroughStream = new stream.PassThrough();
    const readStream = db.collection(collectionName)
      .find('{}')
      .stream();
    readStream.on('end', () => passThroughStream.end());
    readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
    await uploadStreamFile('benda_mongo.json', passThroughStream);
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};

export const uploadStreamFile = async (fileName, stream) => {
  try {
    log.info('start uploading file', fileName);
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: fileName,
        Body: stream,
      },
    });
    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  }
  catch (err) {
    log.error(err);
    return;
  }
};
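Note that the two solutions produce differently shaped files: the Transform-based answer writes newline-delimited JSON, while JSONStream.stringify() writes a single JSON array (using its default open/separator/close strings). Roughly, with placeholder documents:

// Transform-based answer (one JSON document per line):
// {"name":"a"}
// {"name":"b"}

// JSONStream.stringify() (one JSON array):
// [
// {"name":"a"}
// ,
// {"name":"b"}
// ]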