AWS Lambda 和节点:在流式传输时写入数据 - 过早结束并且数据丢失
AWS Lambda & Node: Write data while streaming - ends prematurely and data is missing
我有一个由写入 S3 存储桶触发的 Lambda 函数。它读取写入存储桶的 JSON 文件,解析出各个记录,并将它们写入数据库。
问题是;我不确定我做错了什么,因为流结束并且 Lambda 在写入所有数据之前退出。
我在我的可读流上 "flowing mode",在数据库写入期间我在 pausing/resuming。根据文档,这应该可以解决问题,但它没有按预期工作。
Lambda 处理程序:
exports.handler = async (event, context) => {
let result = false;
try {
result = await parseData(event);
} catch (e) {
console.error(e);
}
return result;
};
承诺:
const StreamArray = require("stream-json/streamers/StreamArray");
async parseData(event) {
try {
let objectStream = s3.getObject(params).createReadStream();
const streamParser = StreamArray.withParser();
return new Promise((resolve, reject) => {
objectStream.pipe(streamParser).on("data", async streamData => {
objectStream.pause();
let result = await writeData(streamData);
objectStream.resume();
}).on("finish", () => {
console.log("STREAM FINISH!");
resolve(true);
}).on("error", e => {
console.error("Stream error:", e);
reject(e);
});
});
} catch (e) {
console.error(e);
}
}
通过简单地将 stream-json 换成 JSONStream 就可以正常工作,无论如何,JSONStream 是一个使用更广泛的包。现在效果很好!
const JSONStream = require("JSONStream");
async parseData(event) {
try {
let objectStream = s3.getObject(params).createReadStream();
const streamParser = JSONStream.parse("*");
return new Promise((resolve, reject) => {
objectStream.pipe(streamParser).on("data", async streamData => {
streamParser.pause();
let result = await writeData(streamData);
streamParser.resume();
}).on("finish", () => {
console.log("STREAM FINISH!");
resolve(true);
}).on("error", e => {
console.error("Stream error:", e);
reject(e);
});
});
} catch (e) {
console.error(e);
}
}
我有一个由写入 S3 存储桶触发的 Lambda 函数。它读取写入存储桶的 JSON 文件,解析出各个记录,并将它们写入数据库。
问题是;我不确定我做错了什么,因为流结束并且 Lambda 在写入所有数据之前退出。
我在我的可读流上 "flowing mode",在数据库写入期间我在 pausing/resuming。根据文档,这应该可以解决问题,但它没有按预期工作。
Lambda 处理程序:
exports.handler = async (event, context) => {
let result = false;
try {
result = await parseData(event);
} catch (e) {
console.error(e);
}
return result;
};
承诺:
const StreamArray = require("stream-json/streamers/StreamArray");
async parseData(event) {
try {
let objectStream = s3.getObject(params).createReadStream();
const streamParser = StreamArray.withParser();
return new Promise((resolve, reject) => {
objectStream.pipe(streamParser).on("data", async streamData => {
objectStream.pause();
let result = await writeData(streamData);
objectStream.resume();
}).on("finish", () => {
console.log("STREAM FINISH!");
resolve(true);
}).on("error", e => {
console.error("Stream error:", e);
reject(e);
});
});
} catch (e) {
console.error(e);
}
}
通过简单地将 stream-json 换成 JSONStream 就可以正常工作,无论如何,JSONStream 是一个使用更广泛的包。现在效果很好!
const JSONStream = require("JSONStream");
async parseData(event) {
try {
let objectStream = s3.getObject(params).createReadStream();
const streamParser = JSONStream.parse("*");
return new Promise((resolve, reject) => {
objectStream.pipe(streamParser).on("data", async streamData => {
streamParser.pause();
let result = await writeData(streamData);
streamParser.resume();
}).on("finish", () => {
console.log("STREAM FINISH!");
resolve(true);
}).on("error", e => {
console.error("Stream error:", e);
reject(e);
});
});
} catch (e) {
console.error(e);
}
}