Is it possible to specify data format conversion in AWS CloudFormation?
The AWS documentation makes it look as though you can specify DataFormatConversionConfiguration for an AWS::KinesisFirehose::DeliveryStream in CloudFormation, but there is no documentation on where the property should go. I tried adding it under each of Properties, ExtendedS3DestinationConfiguration, ProcessingConfiguration, and Processors. Each time, CloudFormation complained:
The following resource(s) failed to update: [EventFirehose].
12:24:32 UTC-0500
UPDATE_FAILED AWS::KinesisFirehose::DeliveryStream EventFirehose Encountered unsupported property DataFormatConversionConfiguration
DataFormatConversionConfiguration has its own documentation that says:
If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate.
What am I doing wrong?
According to the SDK documentation, it should go inside ExtendedS3DestinationConfiguration or ExtendedS3DestinationUpdate. However, CloudFormation does not currently support this property, per its docs. This is a fairly common discrepancy between CloudFormation and other AWS services; a similar issue is mentioned here (since resolved).
For now, you can apply the update through the SDK, or wait a while for CloudFormation to catch up:
If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate
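A minimal sketch of that SDK workaround using boto3. All names (account id, role ARN, database, table, region, stream name) are placeholders, and the payload simply mirrors what the CloudFormation property would express:

```python
# Hypothetical workaround: enable format conversion via the Firehose API
# directly, since (at the time) CloudFormation rejected the property.
# Every concrete value below is a placeholder.
data_format_conversion = {
    "Enabled": True,
    "InputFormatConfiguration": {"Deserializer": {"HiveJsonSerDe": {}}},
    "OutputFormatConfiguration": {"Serializer": {"ParquetSerDe": {}}},
    "SchemaConfiguration": {
        "CatalogId": "123456789012",  # placeholder account id
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "DatabaseName": "logs_db",
        "TableName": "serverlogs",
        "Region": "us-east-1",
        "VersionId": "LATEST",
    },
}

def enable_conversion(stream_name: str) -> None:
    # Deferred import so the payload above can be inspected without AWS deps.
    import boto3

    firehose = boto3.client("firehose")
    # UpdateDestination needs the stream's current version and destination ids.
    desc = firehose.describe_delivery_stream(DeliveryStreamName=stream_name)
    stream = desc["DeliveryStreamDescription"]
    firehose.update_destination(
        DeliveryStreamName=stream_name,
        CurrentDeliveryStreamVersionId=stream["VersionId"],
        DestinationId=stream["Destinations"][0]["DestinationId"],
        ExtendedS3DestinationUpdate={
            "DataFormatConversionConfiguration": data_format_conversion
        },
    )
```

Calling `enable_conversion("my-stream")` against a real stream would apply the same configuration the CloudFormation template below expresses declaratively.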
Edit
As of June 2019, the DataFormatConversionConfiguration property is available in CloudFormation. See the release history: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/ReleaseHistory.html
Here is how I solved the task. The Firehose stream writes data to an S3 bucket in Parquet format:
LogsCollectionDatabase:
  Type: AWS::Glue::Database
  Properties:
    DatabaseInput:
      Description: Database for Kinesis Analytics
      Name: !Ref DatabaseName
    CatalogId: !Ref AWS::AccountId

LogsCollectionTable:
  Type: AWS::Glue::Table
  DependsOn: LogsCollectionDatabase
  Properties:
    DatabaseName: !Ref LogsCollectionDatabase
    CatalogId: !Ref AWS::AccountId
    TableInput:
      Name: serverlogs
      Description: Table for storing logs from kinesis
      TableType: EXTERNAL_TABLE
      StorageDescriptor:
        Columns:
          - Type: string
            Name: col1
          - Type: string
            Name: col2
        Location: !Sub s3://${DestinationBucketName}/${DestinationBucketPrefix}
        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
        SerdeInfo:
          SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

KinesisFirehoseDeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  DependsOn: FirehoseDeliveryIAMPolicy
  Properties:
    DeliveryStreamName: !Ref RegionalStreamName
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      BucketARN: !Ref DestinationBucketArn
      Prefix: !Ref DestinationBucketPrefix
      BufferingHints:
        IntervalInSeconds: 60
        SizeInMBs: 64
      ErrorOutputPrefix: errors/
      RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
      DataFormatConversionConfiguration:
        Enabled: true
        InputFormatConfiguration:
          Deserializer:
            HiveJsonSerDe: {}
        OutputFormatConfiguration:
          Serializer:
            ParquetSerDe: {}
        SchemaConfiguration:
          CatalogId: !Ref AWS::AccountId
          RoleARN: !GetAtt FirehoseDeliveryIAMRole.Arn
          DatabaseName: !Ref LogsCollectionDatabase
          TableName: !Ref LogsCollectionTable
          Region: !Ref AWS::Region
          VersionId: LATEST
Of course, you also need to define an IAM role and policy for the Firehose stream.
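A sketch of what that role and policy might look like (resource names and the permission list are illustrative, not the author's exact template; note that format conversion additionally needs read access to the Glue table definition):

```yaml
FirehoseDeliveryIAMRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: sts:AssumeRole

FirehoseDeliveryIAMPolicy:
  Type: AWS::IAM::Policy
  Properties:
    PolicyName: firehose-delivery-policy   # illustrative name
    Roles:
      - !Ref FirehoseDeliveryIAMRole
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
        # Write access to the destination bucket
        - Effect: Allow
          Action:
            - s3:AbortMultipartUpload
            - s3:GetBucketLocation
            - s3:GetObject
            - s3:ListBucket
            - s3:ListBucketMultipartUploads
            - s3:PutObject
          Resource:
            - !Ref DestinationBucketArn
            - !Sub "${DestinationBucketArn}/*"
        # Format conversion reads the table schema from the Glue Data Catalog
        - Effect: Allow
          Action:
            - glue:GetTable
            - glue:GetTableVersion
            - glue:GetTableVersions
          Resource: "*"
```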