Amazon Kinesis Firehose 是否以编程方式支持数据转换?
Does Amazon Kinesis Firehose support Data Transformations programatically?
我有一个用例,我必须验证发送到 Kinesis firehose 的有效负载确实已发送。
为了做到这一点,我想出了链 Firehose -> Firehose 数据转换(使用 lambda) -> DDB -> 检查 DDB 中的有效载荷(有效载荷是 DDB 中的哈希键)。我必须以编程方式一次定义整个链。数据转换同http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html。
我正在做这一切,因为我无法完全控制它转到的 S3 存储桶中的文件名。所以我需要将确切的有效负载发送到某个持久键值存储中。
问题是
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/CreateDeliveryStreamRequest.html 似乎不支持添加数据转换 lambda。
我的问题是,这是否可以不接触控制台一次(完全通过 AWS Kinesis Firehose API)。
或者有任何替代建议以某种方式将数据移动到 DDB。
好吧,经过大量的努力和文档搜寻,我想通了。
您必须使用 lambda ARN 定义处理配置才能定义数据转换。
下面是这样的配置在代码中的样子。
final ProcessingConfiguration processingConfiguration =
new ProcessingConfiguration().withEnabled(true)
.withProcessors(newProcessor().withType(ProcessorType.Lambda)
.withParameters(new ProcessorParameter().withParameterName(LambdaArn)
.withParameterValue(lamdbaFunctionArn)));
final CreateDeliveryStreamResult describeDeliveryStreamResult =
client.createDeliveryStream(new CreateDeliveryStreamRequest().withExtendedS3DestinationConfiguration(
new ExtendedS3DestinationConfiguration()
.withBucketARN(s3BucketARN)
.withRoleARN(roleArn)
.withPrefix(keyPrefix)
.withProcessingConfiguration(processingConfiguration))
.withDeliveryStreamName(streamName));
此处的 ARN 是各种对象(S3 目标、数据转换 lambda、IAM 角色等)的资源名称。
您可以使用 lambda 函数来指定任务。
https://github.com/hixichen/golang_lamda_decode_protobuf_firehose
我有一个用例,我必须验证发送到 Kinesis firehose 的有效负载确实已发送。
为了做到这一点,我想出了链 Firehose -> Firehose 数据转换(使用 lambda) -> DDB -> 检查 DDB 中的有效载荷(有效载荷是 DDB 中的哈希键)。我必须以编程方式一次定义整个链。数据转换同http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html。
我正在做这一切,因为我无法完全控制它转到的 S3 存储桶中的文件名。所以我需要将确切的有效负载发送到某个持久键值存储中。
问题是
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/CreateDeliveryStreamRequest.html 似乎不支持添加数据转换 lambda。
我的问题是,这是否可以不接触控制台一次(完全通过 AWS Kinesis Firehose API)。
或者有任何替代建议以某种方式将数据移动到 DDB。
好吧,经过大量的努力和文档搜寻,我想通了。
您必须使用 lambda ARN 定义处理配置才能定义数据转换。
下面是这样的配置在代码中的样子。
final ProcessingConfiguration processingConfiguration =
new ProcessingConfiguration().withEnabled(true)
.withProcessors(newProcessor().withType(ProcessorType.Lambda)
.withParameters(new ProcessorParameter().withParameterName(LambdaArn)
.withParameterValue(lamdbaFunctionArn)));
final CreateDeliveryStreamResult describeDeliveryStreamResult =
client.createDeliveryStream(new CreateDeliveryStreamRequest().withExtendedS3DestinationConfiguration(
new ExtendedS3DestinationConfiguration()
.withBucketARN(s3BucketARN)
.withRoleARN(roleArn)
.withPrefix(keyPrefix)
.withProcessingConfiguration(processingConfiguration))
.withDeliveryStreamName(streamName));
此处的 ARN 是各种对象(S3 目标、数据转换 lambda、IAM 角色等)的资源名称。
您可以使用 lambda 函数来指定任务。 https://github.com/hixichen/golang_lamda_decode_protobuf_firehose