使用kafka-connect将数据摄取到s3时如何根据json字段的一部分进行分区
How to partition based on part of json field when using kafka-connect to ingest data to s3
我正在尝试使用 s3-sink kafka 连接器将一些 json 数据存储到 s3 中。我的 json 格式如下:
{
"server": someserver,
"id": someid,
"time": "2018-01-18T23:47:03.737487Z"
}
我想根据数据所在的时间对我的数据进行分区,但忽略分钟和秒。例如。上面的 json 将属于 2018-01-18T23 目录。我应该如何在属性文件中设置 field.partition 来实现这一点?
非常感谢!
实现您用 Confluent's S3 connector 描述的粗略方法是:
- 将 属性
timestamp.extractor
定义为 RecordField
,以从记录中的字段中提取时间戳。
- 将 属性
timestamp.field
设置为该记录字段的名称(在您的示例中为 time
)
- 设置
path.format
属性。正如您在示例中提到的那样,这将允许您将文件存储最多一小时,而忽略更精细的粒度(分钟、秒等)。
- 同时将
partition.duration.ms
设置为对您有意义的粒度。重要的是 -1
的默认值不允许您使用基于时间的分区。
- 最后,如果您使用的是预定义的分区程序之一或相关的基于时间的自定义分区程序,也请设置属性
locale
和 timezone
。
请注意,连接器附带了一个预定义的基于时间的分区程序 class,您可能会发现它对您的用例很有用。您可以通过设置使用它:
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner
我正在尝试使用 s3-sink kafka 连接器将一些 json 数据存储到 s3 中。我的 json 格式如下:
{
"server": someserver,
"id": someid,
"time": "2018-01-18T23:47:03.737487Z"
}
我想根据数据所在的时间对我的数据进行分区,但忽略分钟和秒。例如。上面的 json 将属于 2018-01-18T23 目录。我应该如何在属性文件中设置 field.partition 来实现这一点?
非常感谢!
实现您用 Confluent's S3 connector 描述的粗略方法是:
- 将 属性
timestamp.extractor
定义为RecordField
,以从记录中的字段中提取时间戳。 - 将 属性
timestamp.field
设置为该记录字段的名称(在您的示例中为time
) - 设置
path.format
属性。正如您在示例中提到的那样,这将允许您将文件存储最多一小时,而忽略更精细的粒度(分钟、秒等)。 - 同时将
partition.duration.ms
设置为对您有意义的粒度。重要的是-1
的默认值不允许您使用基于时间的分区。 - 最后,如果您使用的是预定义的分区程序之一或相关的基于时间的自定义分区程序,也请设置属性
locale
和timezone
。
请注意,连接器附带了一个预定义的基于时间的分区程序 class,您可能会发现它对您的用例很有用。您可以通过设置使用它:
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner