Is it possible to specify data format conversion in AWS CloudFormation?

The AWS documentation makes it look like you can specify DataFormatConversionConfiguration for an AWS::KinesisFirehose::DeliveryStream in CloudFormation, but there is no documentation on where the property should go. I tried adding it under Properties, under ExtendedS3DestinationConfiguration, under ProcessingConfiguration, and under one of the Processors. Each time, CloudFormation complains:

The following resource(s) failed to update: [EventFirehose]. 12:24:32 UTC-0500

UPDATE_FAILED AWS::KinesisFirehose::DeliveryStream EventFirehose Encountered unsupported property DataFormatConversionConfiguration

DataFormatConversionConfiguration has its own documentation, which says:

If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate.

What am I doing wrong?

According to the SDK documentation, it should go inside ExtendedS3DestinationConfiguration or ExtendedS3DestinationUpdate. However, CloudFormation does NOT currently support this property, as its documentation shows. This is a fairly common discrepancy between CloudFormation and other AWS services. A similar issue is mentioned here (recently resolved).

For now, you can make the update through the SDK, or wait a while for CloudFormation to catch up.
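
As an illustration, assuming Python, a boto3 sketch of that SDK workaround might look like the following. The stream name, account id, role ARN, and schema details are placeholders to replace with your own values:

import boto3

firehose = boto3.client("firehose")

# update_destination requires the current delivery stream version and
# destination IDs (used for optimistic locking), so read them first.
stream = firehose.describe_delivery_stream(DeliveryStreamName="EventFirehose")
description = stream["DeliveryStreamDescription"]

firehose.update_destination(
    DeliveryStreamName="EventFirehose",
    CurrentDeliveryStreamVersionId=description["VersionId"],
    DestinationId=description["Destinations"][0]["DestinationId"],
    ExtendedS3DestinationUpdate={
        "DataFormatConversionConfiguration": {
            "Enabled": True,
            "InputFormatConfiguration": {"Deserializer": {"HiveJsonSerDe": {}}},
            "OutputFormatConfiguration": {"Serializer": {"ParquetSerDe": {}}},
            "SchemaConfiguration": {
                "CatalogId": "123456789012",  # placeholder account id
                "RoleARN": "arn:aws:iam::123456789012:role/firehose-role",  # placeholder
                "DatabaseName": "logs_db",  # Glue database holding the schema
                "TableName": "serverlogs",
                "Region": "us-east-1",
                "VersionId": "LATEST",
            },
        }
    },
)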

Edit

As of June 2019, the DataFormatConversionConfiguration property is available in CloudFormation. See the release history: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/ReleaseHistory.html

Here is how I solved this task. The Firehose stream writes data to an S3 bucket in Parquet format:

  # The Glue database and table supply the schema Firehose uses for the JSON-to-Parquet conversion
  LogsCollectionDatabase:
    Type: AWS::Glue::Database
    Properties:
      DatabaseInput:
        Description: Database for Kinesis Analytics
        Name: !Ref DatabaseName
      CatalogId: !Ref AWS::AccountId

  LogsCollectionTable:
    Type: AWS::Glue::Table
    DependsOn: LogsCollectionDatabase
    Properties:
      DatabaseName: !Ref LogsCollectionDatabase
      CatalogId: !Ref AWS::AccountId
      TableInput:
        Name: serverlogs
        Description: Table for storing logs from kinesis
        TableType: EXTERNAL_TABLE
        StorageDescriptor:
          Columns:
            - Type: string
              Name: col1
            - Type: string
              Name: col2
          Location: !Sub s3://${DestinationBucketName}/${DestinationBucketPrefix}
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn: FirehoseDeliveryIAMPolicy
    Properties:
      DeliveryStreamName: !Ref RegionalStreamName
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !Ref DestinationBucketArn
        Prefix: !Ref DestinationBucketPrefix
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 64 # 64 MB is the minimum buffer size once format conversion is enabled
        ErrorOutputPrefix: errors/
        RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              HiveJsonSerDe: {} # parse incoming records as JSON
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {} # write the output to S3 as Parquet
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
            DatabaseName: !Ref LogsCollectionDatabase
            TableName: !Ref LogsCollectionTable
            Region: !Ref AWS::Region
            VersionId: LATEST

Of course, you also need to define an IAM role and policy for the Firehose stream.
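
A minimal sketch of those two resources, matching the FirehoseDeliveryIAMRole and FirehoseDeliveryIAMPolicy names referenced above, might look like this; the S3 and Glue permissions are assumptions based on what Firehose typically needs for extended S3 delivery and schema lookup, so tighten them for your environment:

  FirehoseDeliveryIAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole

  FirehoseDeliveryIAMPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: firehose-delivery-policy
      Roles:
        - !Ref FirehoseDeliveryIAMRole
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # Write converted records (and errors) to the destination bucket
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !Ref DestinationBucketArn
              - !Sub ${DestinationBucketArn}/*
          # Read the Glue table schema used for the format conversion
          - Effect: Allow
            Action:
              - glue:GetTable
              - glue:GetTableVersion
              - glue:GetTableVersions
            Resource: '*'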