Node.js AWS Lambda 程序包中的代码不会调用 putRecord() 将数据添加到 AWS Kinesis Firehose Stream

Code in Node.js AWS Lambda Package will not call putRecord() for Adding Data into AWS Kinesis Firehose Stream

我构建了一个 AWS Lambda 部署包(使用 Node.js),当对象被 PUT 到指定的 S3 存储桶时执行。我已经将要执行的代码配置为暂时将 10 条随机生成的记录添加到 Kinesis Firehose 流中。

除了将记录添加到 Kinesis 流中之外,Lambda 函数工作正常。我在 AWS CloudWatch 日志中没有看到任何错误消息。使用 console.log() 打印语句,似乎 putRecord() 调用甚至没有执行,我不知道为什么。其他人能解决这个问题吗?

这是我的 Lambda 函数部署包中的代码:

console.log('Loading function');

var aws = require('aws-sdk');
var s3 = new aws.S3({ apiVersion: '2006-03-01' });
var zlib = require('zlib');

function _writeToKinesis(kinesis) {
    var currTime = new Date().getMilliseconds();
    var sensor = 'sensor-' + Math.floor(Math.random() * 100000);
    var reading = Math.floor(Math.random() * 1000000);

    var record = JSON.stringify({
      time : currTime,
      sensor : sensor,
      reading : reading
    });

    console.log("record: " + record);

    var recordParams = {
      Data : record,
      PartitionKey : sensor,
      StreamName : "my_firehose"
    };

    console.log("recordParams: " + recordParams);

    kinesis.putRecord(recordParams, function(err, data) {
      console.log("test");
      if (err) {
        console.log(err);
      }
      else {
        console.log('Successfully sent data to Kinesis.');
      }
    });
}

exports.handler = function(event, context) {
    //console.log('Received event:', JSON.stringify(event, null, 2));

    // Get the object from the event and show its content type
    var record = event.Records[0];
    var bucket = record.s3.bucket.name;
    var key = record.s3.object.key;
    var params = {
        Bucket: bucket,
        Key: key
    };

    s3.getObject(params, function(err, data) {
        if (err) {
            console.log(err);
            var message = "Error getting object " + key + " from bucket " + bucket +
                ". Make sure they exist and your bucket is in the same region as this function.";
            console.log(message);
            context.fail(message);
        } else {

            console.log('CONTENT TYPE:', data.ContentType);

            var kinesis = new aws.Kinesis({ apiVersion: '2013-12-02', region : "us-east-1"});

            var count = 0;
            while (count < 10) {
              setTimeout(_writeToKinesis(kinesis), 1000);
              count++;
            }

            context.succeed("OK");            
        }
    });
};

这里是 CloudWatch 日志输出:

START RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666 Version: $LATEST
2015-10-18T20:13:59.743Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    CONTENT TYPE: application/zip
2015-10-18T20:13:59.861Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":860,"sensor":"sensor-12149","reading":146264}
2015-10-18T20:13:59.861Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:13:59.980Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":924,"sensor":"sensor-86345","reading":956735}
2015-10-18T20:13:59.980Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:13:59.982Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":982,"sensor":"sensor-4925","reading":822265}
2015-10-18T20:13:59.982Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.060Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":60,"sensor":"sensor-40822","reading":796150}
2015-10-18T20:14:00.060Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.061Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":61,"sensor":"sensor-92861","reading":855213}
2015-10-18T20:14:00.061Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.063Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":63,"sensor":"sensor-84324","reading":155159}
2015-10-18T20:14:00.063Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.121Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":121,"sensor":"sensor-54930","reading":365471}
2015-10-18T20:14:00.121Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.122Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":122,"sensor":"sensor-1330","reading":981637}
2015-10-18T20:14:00.122Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.123Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":123,"sensor":"sensor-92245","reading":634723}
2015-10-18T20:14:00.123Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
2015-10-18T20:14:00.161Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    record: {"time":161,"sensor":"sensor-29594","reading":227706}
2015-10-18T20:14:00.161Z    c3f9e9d3-75d4-11e5-a597-f7dcab9cd666    recordParams: [object Object]
END RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666
REPORT RequestId: c3f9e9d3-75d4-11e5-a597-f7dcab9cd666  Duration: 1258.07 ms    Billed Duration: 1300 ms    Memory Size: 128 MB Max Memory Used: 15 MB

"OK"

P.S。我有一个与 Lambda 函数关联的 IAM 角色,该角色配置了从 S3 读取以及写入 Kinesis 的策略。

我在尝试执行 listStreams() 时发现了问题。它只打印出 Kinesis 流,而不是 Firehose 流。我曾假设,在 API 中,Firehose 在 Kinesis 保护伞下。然而,Firehose 是独立的 API。

此外,我 运行 遇到了另一个问题,此处发布了解决方法:Running AWS Firehose in lambda.js gives an undefined error。目前,如果您将 Firehose API 与 Lambda 一起使用,则需要在 Lambda 函数部署包 (npm install aws-sdk) 中包含 aws-sdk 模块。显然有一个为亚马逊创建的票来解决这个问题。