在交付到 S3 之前,我可以在 Kinesis Firehose 中自定义分区吗?

Can I customize partitioning in Kinesis Firehose before delivering to S3?

我有一个 Firehose 流,旨在从不同来源和不同事件类型中摄取数百万个事件。流应将所有数据作为 raw\unaltered 数据的存储传送到一个 S3 存储桶。

我正在考虑根据事件消息中嵌入的元数据(如事件源、事件类型和事件日期)在 S3 中对这些数据进行分区。

但是,Firehose 遵循其基于记录到达时间的默认分区。是否可以自定义此分区行为以满足我的需要?

更新:已接受的答案已更新为新答案,表明该功能将于 2021 年 9 月可用

没有。不能根据活动内容'partition'

一些选项是:

  • 发送到单独的 Firehose 流
  • 发送到 Kinesis 数据流(而不是 Firehose)并编写您自己的自定义 Lambda 函数来处理和保存数据(参见:AWS Developer Forums: Athena and Kinesis Firehose
  • 使用 Kinesis Analytics 处理消息并'direct'它到不同的 Firehose 流

如果您打算将输出与 Amazon Athena 或 Amazon EMR 一起使用,您还可以考虑将其转换为具有 much better performance 的 Parquet 格式。这将需要 post- 将 S3 中的数据作为批处理而不是在数据到达流时进行转换。

基于 John 的回答,如果您没有近乎实时的流式传输要求,我们发现使用 Athena 进行批处理对我们来说是一个简单的解决方案。

Kinesis 流到给定的 table unpartitioned_event_data,这可以利用 native record arrival time partitioning.

我们定义了另一个 Athena table partitioned_event_table,它可以使用自定义分区键定义并利用 Athena 具有的 INSERT INTO 功能。 Athena 将自动以您想要的格式重新分区您的数据,而无需任何自定义消费者或新的基础设施来管理。这可以通过 cron、SNS 或类似 Airflow 的东西来安排。

很棒的是,您可以创建一个视图,对两个 table 执行 UNION,以便在一个地方查询历史和实时数据。

我们实际上在 Radar 和 talk about more trade-offs in this blog post 处理了这个问题。

自 2021 年 9 月 1 日起,AWS Kinesis Firehose 支持此功能。阅读 the announcement blog post here.

来自文档:

You can use the Key and Value fields to specify the data record parameters to be used as dynamic partitioning keys and jq queries to generate dynamic partitioning key values. ...

这是 UI 的样子:

在撰写本文时,Vlad 提到的动态分区功能仍然很新。我需要它成为 CloudFormation 模板的一部分,但仍未正确记录。我必须添加 DynamicPartitioningConfiguration 才能使其正常工作。 MetadataExtractionQuery 语法也没有正确记录。

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: "clients/client_id=!{client_id}/dt=!{timestamp:yyyy-MM-dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            - Type: AppendDelimiterToRecord
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{client_id:.client_id}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6

为了扩展 Murali 的答案,我们在 CDK 中实现了它:

我们的 json 数据看起来像这样:

{
    "data": 
        {
        "timestamp":1633521266990,
        "defaultTopic":"Topic",
        "data":
        {
            "OUT1":"Inactive",
            "Current_mA":3.92
        }
    }
}

CDK 代码如下所示:

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              parameterValue: '{Topic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\n',
            },
          ],
        },
      ],
    },
  },
})