Kafka Connect Sink Partition by recordField 在 Ticks 中
Kafka Connect Sink Partition by recordField which is in Ticks
我有一个kafka connect sink。在我的主题中,我有一个以刻度表示的字段,而不是正确的时间戳。我最终想将其用作目标中的分区字段(在本例中为第 2 代 Azure 数据湖)。
我尝试将 TimeBasedPartitioner 与 timestamp.extractor 和 timestampconvertor 一起使用,但它只是在格式上出错。据我所知——所有这些时间戳转换器都使用“时间戳”字段,而我的是在滴答中,所以我必须在使用时间戳转换器之前进行额外的转换,但我不确定如何,因为我已经研究过 SMT , 不要提供任何这样的东西。
我得到的错误
java.lang.IllegalArgumentException: Invalid format: "20204642-05-16 21:34:40.000+0000" is malformed at " 21:34:40.000+0000"
这是我的接收器配置的样子
{
"name": "azuredatalakegen2-sink-featuretracking-dl",
"config": {
"connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"topics": "sometopic",
"topics.dir": "filesystem/folder",
"flush.size": "1",
"file.delim": "-",
"path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "UTC",
"timezone": "UTC",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "300000",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_date_time_ticks",
"format.class":"io.confluent.connect.azure.storage.format.parquet.ParquetFormat",
"transforms": "flatten,TimestampConverter",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field":"event_date_time_ticks",
"transforms.TimestampConverter.target.type": "string",
"max.retries": "288",
"retry.backoff.ms": "300000",
"errors.retry.timeout":"3600000",
"errors.retry.delay.max.ms":"60000",
"errors.log.enable":"true",
"errors.log.include.messages":"true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
.......other conn related configs.......
}
}
这是我看到的 SMT:SMTs in Confluent Platform
如何使用以下字段对目标中的数据进行分区:event_date_time_ticks,以刻度为单位,例如 637535500015510000 表示:2021-04-09 T07:26:41.551Z
打勾转换为日期时间:Tick to Datetime
即使我尝试 FieldPartitioner ,我如何在上面的接收器配置中将该刻度转换为日期时间格式?还是我必须写一些自定义的东西?
TimestampConverter expects Unix epoch time,不是滴答声。
您需要对其进行转换,这必须是您的 Producer 中的自定义转换或修改(这应该不是主要问题,因为大多数语言都有日期时间纪元函数)
Convert ticks to unix timestamp
我有一个kafka connect sink。在我的主题中,我有一个以刻度表示的字段,而不是正确的时间戳。我最终想将其用作目标中的分区字段(在本例中为第 2 代 Azure 数据湖)。
我尝试将 TimeBasedPartitioner 与 timestamp.extractor 和 timestampconvertor 一起使用,但它只是在格式上出错。据我所知——所有这些时间戳转换器都使用“时间戳”字段,而我的是在滴答中,所以我必须在使用时间戳转换器之前进行额外的转换,但我不确定如何,因为我已经研究过 SMT , 不要提供任何这样的东西。
我得到的错误
java.lang.IllegalArgumentException: Invalid format: "20204642-05-16 21:34:40.000+0000" is malformed at " 21:34:40.000+0000"
这是我的接收器配置的样子
{
"name": "azuredatalakegen2-sink-featuretracking-dl",
"config": {
"connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"topics": "sometopic",
"topics.dir": "filesystem/folder",
"flush.size": "1",
"file.delim": "-",
"path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "UTC",
"timezone": "UTC",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "300000",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_date_time_ticks",
"format.class":"io.confluent.connect.azure.storage.format.parquet.ParquetFormat",
"transforms": "flatten,TimestampConverter",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSZ",
"transforms.TimestampConverter.field":"event_date_time_ticks",
"transforms.TimestampConverter.target.type": "string",
"max.retries": "288",
"retry.backoff.ms": "300000",
"errors.retry.timeout":"3600000",
"errors.retry.delay.max.ms":"60000",
"errors.log.enable":"true",
"errors.log.include.messages":"true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
.......other conn related configs.......
}
}
这是我看到的 SMT:SMTs in Confluent Platform
如何使用以下字段对目标中的数据进行分区:event_date_time_ticks,以刻度为单位,例如 637535500015510000 表示:2021-04-09 T07:26:41.551Z
打勾转换为日期时间:Tick to Datetime
即使我尝试 FieldPartitioner ,我如何在上面的接收器配置中将该刻度转换为日期时间格式?还是我必须写一些自定义的东西?
TimestampConverter expects Unix epoch time,不是滴答声。
您需要对其进行转换,这必须是您的 Producer 中的自定义转换或修改(这应该不是主要问题,因为大多数语言都有日期时间纪元函数)
Convert ticks to unix timestamp