Lambda 触发器无法按预期处理批量数据
Lambda trigger is not working as intended with bulk data
我正在使用 lambda 触发器来检测对 DynamoDB table(推文)的插入。一旦触发,我想在事件中获取消息,并使用 Comprehend 获取它的情绪。然后我想更新第二个 DynamoDB table (SentimentAnalysis),其中我根据情绪将 + 1 添加到一个值。
如果我手动插入单个项目,这很好用,但我希望能够使用 Twitter API 将批量数据插入我的 DynamoDB table 并分析每条推文的情绪。如果 Twitter 参数中指定的计数 <= 5,则 lambda 函数可以正常工作,但以上任何内容都会导致 SentimentAnalysis table 中的更新出现问题,并且触发器会不断重复自身,没有任何进展或停止的迹象.
这是我的 lambda 代码:
let AWS = require("aws-sdk");
let comprehend = new AWS.Comprehend();
let documentClient = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context) => {
event.Records.forEach(record => {
if (record.eventName == "INSERT") {
//console.log(JSON.stringify(record.dynamodb.NewImage.tweet.S));
let params = {
LanguageCode: "en",
Text: JSON.stringify(record.dynamodb.NewImage.tweet.S)
};
comprehend.detectSentiment(params, (err, data) => {
if (err) {
console.log("\nError with call to Comprehend:\n " + JSON.stringify(err));
} else {
console.log("\nSuccessful call to Comprehend:\n " + data.Sentiment);
//when comprehend is successful, update the sentiment analysis data
//we can use the ADD expression to increment the value of a number
let sentimentParams = {
TableName: "SentimentAnalysis",
Key: {
city: record.dynamodb.NewImage.city.S,
},
UpdateExpression: "ADD " + data.Sentiment.toLowerCase() + " :pr",
ExpressionAttributeValues: {
":pr": 1
}
};
documentClient.update(sentimentParams, (err, data) => {
if (err) {
console.error("Unable to read item " + JSON.stringify(sentimentParams.TableName));
} else {
console.log("Successful Update: " + JSON.stringify(data));
}
});
}
});
}
});
};
This is the image of a successful call, it works with the first few tweets
This is the unsuccessful call right after the first image. The request is always timed out
超时是它反复发生的原因。如果 lambda 超时或出现其他错误,将导致重新处理该批次。你需要处理这个,因为交付是“至少一次”。您还需要找出超时的原因。它可能像更小的批次一样简单,也可能是使用阶跃函数的更复杂的解决方案。您可能只能增加 lambda 的超时时间。
我正在使用 lambda 触发器来检测对 DynamoDB table(推文)的插入。一旦触发,我想在事件中获取消息,并使用 Comprehend 获取它的情绪。然后我想更新第二个 DynamoDB table (SentimentAnalysis),其中我根据情绪将 + 1 添加到一个值。
如果我手动插入单个项目,这很好用,但我希望能够使用 Twitter API 将批量数据插入我的 DynamoDB table 并分析每条推文的情绪。如果 Twitter 参数中指定的计数 <= 5,则 lambda 函数可以正常工作,但以上任何内容都会导致 SentimentAnalysis table 中的更新出现问题,并且触发器会不断重复自身,没有任何进展或停止的迹象.
这是我的 lambda 代码:
let AWS = require("aws-sdk");
let comprehend = new AWS.Comprehend();
let documentClient = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context) => {
event.Records.forEach(record => {
if (record.eventName == "INSERT") {
//console.log(JSON.stringify(record.dynamodb.NewImage.tweet.S));
let params = {
LanguageCode: "en",
Text: JSON.stringify(record.dynamodb.NewImage.tweet.S)
};
comprehend.detectSentiment(params, (err, data) => {
if (err) {
console.log("\nError with call to Comprehend:\n " + JSON.stringify(err));
} else {
console.log("\nSuccessful call to Comprehend:\n " + data.Sentiment);
//when comprehend is successful, update the sentiment analysis data
//we can use the ADD expression to increment the value of a number
let sentimentParams = {
TableName: "SentimentAnalysis",
Key: {
city: record.dynamodb.NewImage.city.S,
},
UpdateExpression: "ADD " + data.Sentiment.toLowerCase() + " :pr",
ExpressionAttributeValues: {
":pr": 1
}
};
documentClient.update(sentimentParams, (err, data) => {
if (err) {
console.error("Unable to read item " + JSON.stringify(sentimentParams.TableName));
} else {
console.log("Successful Update: " + JSON.stringify(data));
}
});
}
});
}
});
};
This is the image of a successful call, it works with the first few tweets
This is the unsuccessful call right after the first image. The request is always timed out
超时是它反复发生的原因。如果 lambda 超时或出现其他错误,将导致重新处理该批次。你需要处理这个,因为交付是“至少一次”。您还需要找出超时的原因。它可能像更小的批次一样简单,也可能是使用阶跃函数的更复杂的解决方案。您可能只能增加 lambda 的超时时间。