按事件时间对 Kinesis firehose S3 记录进行分区
Partition Kinesis firehose S3 records by event time
Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。
有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具取决于每个事件在与实际发生时间相关的 "hour-folder" 中。还是在 Firehose 完成后必须要有一个额外的处理步骤?
事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。
Kinesis Firehose(目前)不允许客户端控制如何生成最终 S3 对象的日期后缀。
您唯一的选择是在 Kinesis Firehose 之后添加一个 post-processing 层。例如,您可以使用 Data Pipeline 安排每小时一次的 EMR 作业,该作业读取上一小时内写入的所有文件并将它们发布到正确的 S3 目的地。
这不是问题的答案,但我想解释一下根据事件到达时间存储记录背后的想法。
首先简单介绍一下流。 Kinesis 只是一个数据流。而且它有一个消费的概念。只有通过 顺序 读取流才能可靠地使用它。还有一种将检查点作为暂停和恢复消费过程的机制的想法。检查点只是一个序列号,用于标识流中的位置。通过指定此编号,可以从特定事件开始读取流。
现在回到默认的 s3 firehose 设置... 由于运动流的容量非常有限,很可能需要将来自运动的数据存储在某处以进行分析稍后。 firehose to s3 setup 开箱即用。它只是将流中的原始数据存储到 s3 存储桶中。但从逻辑上讲,此数据仍然是相同的 记录流 。为了能够可靠地使用(读取)这个流,需要这些序列号作为检查点。而这些数字是记录到达时间。
如果我想按创建时间读取记录怎么办?看起来完成此任务的正确方法是顺序读取 s3 流,将其转储到某个 [时间序列] 数据库或数据仓库,并对该存储进行基于创建时间的读取。否则,在读取 s3(流)时,总是会有非零的机会错过一些事件。所以我根本不建议重新排序 s3 存储桶。
对于未来的读者 - Firehose 支持 Amazon S3 对象的自定义前缀
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
您需要进行一些 post 处理或编写自定义流媒体使用者(例如 Lambda)来执行此操作。
我们公司处理的事件量很大,因此编写 Lambda 函数似乎不是一个很好的用钱方式。相反,我们发现使用 Athena 进行批处理是一个非常简单的解决方案。
首先,您流入 Athena table、events
,可以选择 partitioned by an arrival-time。
然后,您定义另一个 Athena table,例如 events_by_event_time
,它由事件的 event_time
属性分区,或者它已在模式中定义。
最后,您为 运行 一个 Athena INSERT INTO 查询安排了一个进程,该查询从 events
获取事件并自动将它们重新分区到 events_by_event_time
,现在您的事件已分区event_time
,无需 EMR、数据管道或任何其他基础设施。
您可以对事件的任何属性执行此操作。还值得注意的是,您可以创建一个视图,执行两个 table 的 UNION 以查询实时和历史事件。
实际上我在 blog post here.
中写了更多相关内容
AWS 于 2021 年 8 月开始提供“动态分区”:
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。
有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具取决于每个事件在与实际发生时间相关的 "hour-folder" 中。还是在 Firehose 完成后必须要有一个额外的处理步骤?
事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。
Kinesis Firehose(目前)不允许客户端控制如何生成最终 S3 对象的日期后缀。
您唯一的选择是在 Kinesis Firehose 之后添加一个 post-processing 层。例如,您可以使用 Data Pipeline 安排每小时一次的 EMR 作业,该作业读取上一小时内写入的所有文件并将它们发布到正确的 S3 目的地。
这不是问题的答案,但我想解释一下根据事件到达时间存储记录背后的想法。
首先简单介绍一下流。 Kinesis 只是一个数据流。而且它有一个消费的概念。只有通过 顺序 读取流才能可靠地使用它。还有一种将检查点作为暂停和恢复消费过程的机制的想法。检查点只是一个序列号,用于标识流中的位置。通过指定此编号,可以从特定事件开始读取流。
现在回到默认的 s3 firehose 设置... 由于运动流的容量非常有限,很可能需要将来自运动的数据存储在某处以进行分析稍后。 firehose to s3 setup 开箱即用。它只是将流中的原始数据存储到 s3 存储桶中。但从逻辑上讲,此数据仍然是相同的 记录流 。为了能够可靠地使用(读取)这个流,需要这些序列号作为检查点。而这些数字是记录到达时间。
如果我想按创建时间读取记录怎么办?看起来完成此任务的正确方法是顺序读取 s3 流,将其转储到某个 [时间序列] 数据库或数据仓库,并对该存储进行基于创建时间的读取。否则,在读取 s3(流)时,总是会有非零的机会错过一些事件。所以我根本不建议重新排序 s3 存储桶。
对于未来的读者 - Firehose 支持 Amazon S3 对象的自定义前缀
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
您需要进行一些 post 处理或编写自定义流媒体使用者(例如 Lambda)来执行此操作。
我们公司处理的事件量很大,因此编写 Lambda 函数似乎不是一个很好的用钱方式。相反,我们发现使用 Athena 进行批处理是一个非常简单的解决方案。
首先,您流入 Athena table、events
,可以选择 partitioned by an arrival-time。
然后,您定义另一个 Athena table,例如 events_by_event_time
,它由事件的 event_time
属性分区,或者它已在模式中定义。
最后,您为 运行 一个 Athena INSERT INTO 查询安排了一个进程,该查询从 events
获取事件并自动将它们重新分区到 events_by_event_time
,现在您的事件已分区event_time
,无需 EMR、数据管道或任何其他基础设施。
您可以对事件的任何属性执行此操作。还值得注意的是,您可以创建一个视图,执行两个 table 的 UNION 以查询实时和历史事件。
实际上我在 blog post here.
中写了更多相关内容AWS 于 2021 年 8 月开始提供“动态分区”:
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html