Amazon Kinesis Firehose 是否以编程方式支持数据转换?

Does Amazon Kinesis Firehose support Data Transformations programatically?

我有一个用例,我必须验证发送到 Kinesis firehose 的有效负载确实已发送。

为了做到这一点,我想出了链 Firehose -> Firehose 数据转换(使用 lambda) -> DDB -> 检查 DDB 中的有效载荷(有效载荷是 DDB 中的哈希键)。我必须以编程方式一次定义整个链。数据转换同http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

我正在做这一切,因为我无法完全控制它转到的 S3 存储桶中的文件名。所以我需要将确切的有效负载发送到某个持久键值存储中。

问题是

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/CreateDeliveryStreamRequest.html 似乎不支持添加数据转换 lambda。

我的问题是,这是否可以不接触控制台一次(完全通过 AWS Kinesis Firehose API)。

或者有任何替代建议以某种方式将数据移动到 DDB。

好吧,经过大量的努力和文档搜寻,我想通了。

您必须使用 lambda ARN 定义处理配置才能定义数据转换。

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/ProcessingConfiguration.html

下面是这样的配置在代码中的样子。

final ProcessingConfiguration processingConfiguration =
      new ProcessingConfiguration().withEnabled(true)
         .withProcessors(newProcessor().withType(ProcessorType.Lambda)
         .withParameters(new ProcessorParameter().withParameterName(LambdaArn)
         .withParameterValue(lamdbaFunctionArn)));

final CreateDeliveryStreamResult describeDeliveryStreamResult =
      client.createDeliveryStream(new CreateDeliveryStreamRequest().withExtendedS3DestinationConfiguration(
         new ExtendedS3DestinationConfiguration()
             .withBucketARN(s3BucketARN)
             .withRoleARN(roleArn)
             .withPrefix(keyPrefix)
             .withProcessingConfiguration(processingConfiguration))
             .withDeliveryStreamName(streamName));

此处的 ARN 是各种对象(S3 目标、数据转换 lambda、IAM 角色等)的资源名称。

您可以使用 lambda 函数来指定任务。 https://github.com/hixichen/golang_lamda_decode_protobuf_firehose