Kinesis lambda DynamoDB
Kinesis lambda DynamoDB
我正在为一个用例学习 AWS 服务。在浏览了文档之后,我想出了一个简单的流程。我想使用 Streams API 和 KPL 将数据提取到 Kinesis 流中。我使用示例 putRecord 方法将数据提取到流中。我正在将此 JSON 提取到流中 -
{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}
摄取数据后,我在 putRecordResult 中得到以下响应 -
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}
现在我编写了一个 Lambda 函数来获取这些数据并将其推送到 DynamoDB table。这是我的 Lambda 函数 -
console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();
exports.handler = (event, context, callback) => {
//console.log('Received event:', JSON.stringify(event, null, 2));
event.Records.forEach((record) => {
// Kinesis data is base64 encoded so decode here
const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
var userid = event.userid;
var username = event.username;
var firstname = event.firstname;
console.log(userid + "," + username +","+ firstname);
var item = {
"userid" : userid,
"username" : username,
"firstname" : firstname
};
var params = {
TableName : tableName,
Item : item
};
console.log(params);
db.putItem(params, function(err, data){
if(err) console.log(err);
else console.log(data);
});
});
callback(null, `Successfully processed ${event.Records.length} records.`);
};
不知何故,我无法在 lambda 函数执行中看到 console.logs。我在流页面中看到流中有 putRecord 和 get,但不知何故我在 Lambdafunction 页面和 DynamoDB table 中看不到任何内容。
我有一个 Java 代码的 IAM 策略,用于将数据摄取到 Kinesis 中,另一个用于 Lambda 函数,即 lambda-kinesis-execution-role 和一个 DynamoDB 摄取数据的策略进入 tables.
是否有任何教程说明如何以正确的方式完成它?我感觉我在这个过程中遗漏了很多要点,例如如何 link 所有这些 IAM 策略并使它们同步,以便当数据被放入流中时,它由 Lambda 处理并结束在 Dynamo 中?
非常感谢任何指点和帮助。
如果您上面的代码是您正在使用的代码的直接副本,那么您引用的是 event.userid
,但您应该使用 payload.userid
。您已将 Kinesis 记录解码为有效负载变量。
您可以使用 Lambda 函数
1.Create Kinesis 和 Dynamodb 的 IAM 角色
2.Now 根据 dynamodb-process-stream
的蓝图创建 Lambda 函数
3.Select 我们从 IAM 创建的执行角色
4.Click 创建函数
现在转到编辑代码部分并编写以下代码
const AWS =require('aws-sdk');
const docClient =new AWS.DynamoDB.DocumentClient({region : 'us-east-1'});
exports.handler = (event, context, callback) => {
event.Records.forEach((record) => {
var params={
Item :{
ROWTIME:Date.now().toString(),//Dynamodb column name
DATA:new Buffer(record.kinesis.data, base64').toString('ascii')//Dynamodb column name
},
TableName:'mytable'//Dynamodb Table Name
};
docClient.put(params,function(err,data){
if(err){
callback(err,null);
}
else{
callback(null,data);
}
});
});
};
我正在为一个用例学习 AWS 服务。在浏览了文档之后,我想出了一个简单的流程。我想使用 Streams API 和 KPL 将数据提取到 Kinesis 流中。我使用示例 putRecord 方法将数据提取到流中。我正在将此 JSON 提取到流中 -
{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}
摄取数据后,我在 putRecordResult 中得到以下响应 -
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}
现在我编写了一个 Lambda 函数来获取这些数据并将其推送到 DynamoDB table。这是我的 Lambda 函数 -
console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();
exports.handler = (event, context, callback) => {
//console.log('Received event:', JSON.stringify(event, null, 2));
event.Records.forEach((record) => {
// Kinesis data is base64 encoded so decode here
const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
var userid = event.userid;
var username = event.username;
var firstname = event.firstname;
console.log(userid + "," + username +","+ firstname);
var item = {
"userid" : userid,
"username" : username,
"firstname" : firstname
};
var params = {
TableName : tableName,
Item : item
};
console.log(params);
db.putItem(params, function(err, data){
if(err) console.log(err);
else console.log(data);
});
});
callback(null, `Successfully processed ${event.Records.length} records.`);
};
不知何故,我无法在 lambda 函数执行中看到 console.logs。我在流页面中看到流中有 putRecord 和 get,但不知何故我在 Lambdafunction 页面和 DynamoDB table 中看不到任何内容。
我有一个 Java 代码的 IAM 策略,用于将数据摄取到 Kinesis 中,另一个用于 Lambda 函数,即 lambda-kinesis-execution-role 和一个 DynamoDB 摄取数据的策略进入 tables.
是否有任何教程说明如何以正确的方式完成它?我感觉我在这个过程中遗漏了很多要点,例如如何 link 所有这些 IAM 策略并使它们同步,以便当数据被放入流中时,它由 Lambda 处理并结束在 Dynamo 中?
非常感谢任何指点和帮助。
如果您上面的代码是您正在使用的代码的直接副本,那么您引用的是 event.userid
,但您应该使用 payload.userid
。您已将 Kinesis 记录解码为有效负载变量。
您可以使用 Lambda 函数
1.Create Kinesis 和 Dynamodb 的 IAM 角色
2.Now 根据 dynamodb-process-stream
的蓝图创建 Lambda 函数3.Select 我们从 IAM 创建的执行角色
4.Click 创建函数 现在转到编辑代码部分并编写以下代码
const AWS =require('aws-sdk');
const docClient =new AWS.DynamoDB.DocumentClient({region : 'us-east-1'});
exports.handler = (event, context, callback) => {
event.Records.forEach((record) => {
var params={
Item :{
ROWTIME:Date.now().toString(),//Dynamodb column name
DATA:new Buffer(record.kinesis.data, base64').toString('ascii')//Dynamodb column name
},
TableName:'mytable'//Dynamodb Table Name
};
docClient.put(params,function(err,data){
if(err){
callback(err,null);
}
else{
callback(null,data);
}
});
});
};