AWS 使用 DynamoDB Lambda 配置 Kinesis Stream

AWS Configure Kinesis Stream with DynamoDB Lambda

通过关注这个问题,

DynamoDB --> DynamoDBStreams --> Lambda 函数 --> Kinesis Firehose --> Redshift。

如何配置我的 Kinesis 函数以获取 Lambda 函数源?

我创建了一个 DynamoDB table(采购销售),并添加了 DynamoDB 流。然后我配置了 Lambda 函数来获取 DynamoDB 流。我的问题是如何配置 Kinesis 以获取 Lambda 函数源?我知道如何配置 Lambda 转换,但是我想选择作为源。不确定如何配置下面的 Direct Put Source。

谢谢,

执行了这些步骤:

在您的情况下,您会将 dynamodb 流式传输到 redshift

DynamoDB --> DynamoDBStreams --> Lambda Function --> Kinesis Firehose --> Redshift.

首先,您需要一个 lambda 函数来处理 DynamoDBStream。对于每个 DynamoDBStream 事件,使用 firehose PutRecord API 将数据发送到 firehose。来自 example

var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'STRING_VALUE', /* required */
  Record: { /* required */
    Data: new Buffer('...') || 'STRING_VALUE' /* Strings will be Base-64 encoded on your behalf */ /* required */
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

接下来,我们要知道数据是如何插入到RedShift中的。来自firehose document,

For data delivery to Amazon Redshift, Kinesis Firehose first delivers incoming data to your S3 bucket in the format described earlier. Kinesis Firehose then issues an Amazon Redshift COPY command to load the data from your S3 bucket to your Amazon Redshift cluster.

所以,我们应该知道什么数据格式让COPY命令将数据映射到RedShift模式。我们要按照data format requirement for redshift COPY command.

By default, the COPY command expects the source data to be character-delimited UTF-8 text. The default delimiter is a pipe character ( | ).

因此,您可以对输入dynamodb流事件的lambda进行编程,将其转换为管道(|)分隔的行记录,并将其写入firehose。

var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'YOUR_FIREHOSE_NAME',
  Record: { /* required */
    Data: "RED_SHIFT_COLUMN_1_DATA|RED_SHIFT_COLUMN_2_DATA\n"
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

记得添加 \n 因为 firehose 不会为您添加新行。