Node.js AWS Lambda 程序包中的代码不会调用 putRecord() 将数据添加到 AWS Kinesis Firehose Stream
Code in Node.js AWS Lambda Package will not call putRecord() for Adding Data into AWS Kinesis Firehose Stream
我构建了一个 AWS Lambda 部署包(使用 Node.js),当对象被 PUT 到指定的 S3 存储桶时执行。我已经将要执行的代码配置为暂时将 10 条随机生成的记录添加到 Kinesis Firehose 流中。
除了将记录添加到 Kinesis 流中之外,Lambda 函数工作正常。我在 AWS CloudWatch 日志中没有看到任何错误消息。使用 console.log() 打印语句,似乎 putRecord() 调用甚至没有执行,我不知道为什么。其他人能解决这个问题吗?
这是我的 Lambda 函数部署包中的代码:
console.log('Loading function');
var aws = require('aws-sdk');
var s3 = new aws.S3({ apiVersion: '2006-03-01' });
var zlib = require('zlib');
function _writeToKinesis(kinesis) {
var currTime = new Date().getMilliseconds();
var sensor = 'sensor-' + Math.floor(Math.random() * 100000);
var reading = Math.floor(Math.random() * 1000000);
var record = JSON.stringify({
time : currTime,
sensor : sensor,
reading : reading
});
console.log("record: " + record);
var recordParams = {
Data : record,
PartitionKey : sensor,
StreamName : "my_firehose"
};
console.log("recordParams: " + recordParams);
kinesis.putRecord(recordParams, function(err, data) {
console.log("test");
if (err) {
console.log(err);
}
else {
console.log('Successfully sent data to Kinesis.');
}
});
}
exports.handler = function(event, context) {
//console.log('Received event:', JSON.stringify(event, null, 2));
// Get the object from the event and show its content type
var record = event.Records[0];
var bucket = record.s3.bucket.name;
var key = record.s3.object.key;
var params = {
Bucket: bucket,
Key: key
};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err);
var message = "Error getting object " + key + " from bucket " + bucket +
". Make sure they exist and your bucket is in the same region as this function.";
console.log(message);
context.fail(message);
} else {
console.log('CONTENT TYPE:', data.ContentType);
var kinesis = new aws.Kinesis({ apiVersion: '2013-12-02', region : "us-east-1"});
var count = 0;
while (count < 10) {
setTimeout(_writeToKinesis(kinesis), 1000);
count++;
}
context.succeed("OK");
}
});
};
这里是 CloudWatch 日志输出:
START RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 Version: $LATEST
2015-10-18T20:13:59.743Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 CONTENT TYPE: application/zip
2015-10-18T20:13:59.861Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":860,"sensor":"sensor-12149","reading":146264}
2015-10-18T20:13:59.861Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:13:59.980Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":924,"sensor":"sensor-86345","reading":956735}
2015-10-18T20:13:59.980Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:13:59.982Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":982,"sensor":"sensor-4925","reading":822265}
2015-10-18T20:13:59.982Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.060Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":60,"sensor":"sensor-40822","reading":796150}
2015-10-18T20:14:00.060Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.061Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":61,"sensor":"sensor-92861","reading":855213}
2015-10-18T20:14:00.061Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.063Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":63,"sensor":"sensor-84324","reading":155159}
2015-10-18T20:14:00.063Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.121Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":121,"sensor":"sensor-54930","reading":365471}
2015-10-18T20:14:00.121Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.122Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":122,"sensor":"sensor-1330","reading":981637}
2015-10-18T20:14:00.122Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.123Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":123,"sensor":"sensor-92245","reading":634723}
2015-10-18T20:14:00.123Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.161Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":161,"sensor":"sensor-29594","reading":227706}
2015-10-18T20:14:00.161Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
END RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666
REPORT RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 Duration: 1258.07 ms Billed Duration: 1300 ms Memory Size: 128 MB Max Memory Used: 15 MB
"OK"
P.S。我有一个与 Lambda 函数关联的 IAM 角色,该角色配置了从 S3 读取以及写入 Kinesis 的策略。
我在尝试执行 listStreams() 时发现了问题。它只打印出 Kinesis 流,而不是 Firehose 流。我曾假设,在 API 中,Firehose 在 Kinesis 保护伞下。然而,Firehose 是独立的 API。
此外,我 运行 遇到了另一个问题,此处发布了解决方法:Running AWS Firehose in lambda.js gives an undefined error。目前,如果您将 Firehose API 与 Lambda 一起使用,则需要在 Lambda 函数部署包 (npm install aws-sdk) 中包含 aws-sdk 模块。显然有一个为亚马逊创建的票来解决这个问题。
我构建了一个 AWS Lambda 部署包(使用 Node.js),当对象被 PUT 到指定的 S3 存储桶时执行。我已经将要执行的代码配置为暂时将 10 条随机生成的记录添加到 Kinesis Firehose 流中。
除了将记录添加到 Kinesis 流中之外,Lambda 函数工作正常。我在 AWS CloudWatch 日志中没有看到任何错误消息。使用 console.log() 打印语句,似乎 putRecord() 调用甚至没有执行,我不知道为什么。其他人能解决这个问题吗?
这是我的 Lambda 函数部署包中的代码:
console.log('Loading function');
var aws = require('aws-sdk');
var s3 = new aws.S3({ apiVersion: '2006-03-01' });
var zlib = require('zlib');
function _writeToKinesis(kinesis) {
var currTime = new Date().getMilliseconds();
var sensor = 'sensor-' + Math.floor(Math.random() * 100000);
var reading = Math.floor(Math.random() * 1000000);
var record = JSON.stringify({
time : currTime,
sensor : sensor,
reading : reading
});
console.log("record: " + record);
var recordParams = {
Data : record,
PartitionKey : sensor,
StreamName : "my_firehose"
};
console.log("recordParams: " + recordParams);
kinesis.putRecord(recordParams, function(err, data) {
console.log("test");
if (err) {
console.log(err);
}
else {
console.log('Successfully sent data to Kinesis.');
}
});
}
exports.handler = function(event, context) {
//console.log('Received event:', JSON.stringify(event, null, 2));
// Get the object from the event and show its content type
var record = event.Records[0];
var bucket = record.s3.bucket.name;
var key = record.s3.object.key;
var params = {
Bucket: bucket,
Key: key
};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err);
var message = "Error getting object " + key + " from bucket " + bucket +
". Make sure they exist and your bucket is in the same region as this function.";
console.log(message);
context.fail(message);
} else {
console.log('CONTENT TYPE:', data.ContentType);
var kinesis = new aws.Kinesis({ apiVersion: '2013-12-02', region : "us-east-1"});
var count = 0;
while (count < 10) {
setTimeout(_writeToKinesis(kinesis), 1000);
count++;
}
context.succeed("OK");
}
});
};
这里是 CloudWatch 日志输出:
START RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 Version: $LATEST
2015-10-18T20:13:59.743Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 CONTENT TYPE: application/zip
2015-10-18T20:13:59.861Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":860,"sensor":"sensor-12149","reading":146264}
2015-10-18T20:13:59.861Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:13:59.980Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":924,"sensor":"sensor-86345","reading":956735}
2015-10-18T20:13:59.980Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:13:59.982Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":982,"sensor":"sensor-4925","reading":822265}
2015-10-18T20:13:59.982Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.060Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":60,"sensor":"sensor-40822","reading":796150}
2015-10-18T20:14:00.060Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.061Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":61,"sensor":"sensor-92861","reading":855213}
2015-10-18T20:14:00.061Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.063Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":63,"sensor":"sensor-84324","reading":155159}
2015-10-18T20:14:00.063Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.121Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":121,"sensor":"sensor-54930","reading":365471}
2015-10-18T20:14:00.121Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.122Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":122,"sensor":"sensor-1330","reading":981637}
2015-10-18T20:14:00.122Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.123Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":123,"sensor":"sensor-92245","reading":634723}
2015-10-18T20:14:00.123Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
2015-10-18T20:14:00.161Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 record: {"time":161,"sensor":"sensor-29594","reading":227706}
2015-10-18T20:14:00.161Z c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 recordParams: [object Object]
END RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666
REPORT RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 Duration: 1258.07 ms Billed Duration: 1300 ms Memory Size: 128 MB Max Memory Used: 15 MB
"OK"
P.S。我有一个与 Lambda 函数关联的 IAM 角色,该角色配置了从 S3 读取以及写入 Kinesis 的策略。
我在尝试执行 listStreams() 时发现了问题。它只打印出 Kinesis 流,而不是 Firehose 流。我曾假设,在 API 中,Firehose 在 Kinesis 保护伞下。然而,Firehose 是独立的 API。
此外,我 运行 遇到了另一个问题,此处发布了解决方法:Running AWS Firehose in lambda.js gives an undefined error。目前,如果您将 Firehose API 与 Lambda 一起使用,则需要在 Lambda 函数部署包 (npm install aws-sdk) 中包含 aws-sdk 模块。显然有一个为亚马逊创建的票来解决这个问题。