Kafka Connect Sink Partition by recordField 在 Ticks 中

Kafka Connect Sink Partition by recordField which is in Ticks

我有一个kafka connect sink。在我的主题中,我有一个以刻度表示的字段,而不是正确的时间戳。我最终想将其用作目标中的分区字段(在本例中为第 2 代 Azure 数据湖)。

我尝试将 TimeBasedPartitioner 与 timestamp.extractor 和 timestampconvertor 一起使用,但它只是在格式上出错。据我所知——所有这些时间戳转换器都使用“时间戳”字段,而我的是在滴答中,所以我必须在使用时间戳转换器之前进行额外的转换,但我不确定如何,因为我已经研究过 SMT , 不要提供任何这样的东西。

我得到的错误

java.lang.IllegalArgumentException: Invalid format: "20204642-05-16 21:34:40.000+0000" is malformed at " 21:34:40.000+0000"

这是我的接收器配置的样子

{
    "name": "azuredatalakegen2-sink-featuretracking-dl",
    "config": {
      "connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
      "topics": "sometopic",
      "topics.dir": "filesystem/folder",
      "flush.size": "1",
      "file.delim": "-",
      "path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
      "locale": "UTC",
      "timezone": "UTC",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "partition.duration.ms": "300000",
      "timestamp.extractor": "RecordField",
      "timestamp.field": "event_date_time_ticks",
      "format.class":"io.confluent.connect.azure.storage.format.parquet.ParquetFormat",
      "transforms": "flatten,TimestampConverter",
      "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
      "transforms.flatten.delimiter": "_",
      "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSZ",
      "transforms.TimestampConverter.field":"event_date_time_ticks",
      "transforms.TimestampConverter.target.type": "string",
      "max.retries": "288",
      "retry.backoff.ms": "300000",
      "errors.retry.timeout":"3600000",
      "errors.retry.delay.max.ms":"60000",
      "errors.log.enable":"true",
      "errors.log.include.messages":"true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
     "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
.......other conn related configs.......
    }
    }

这是我看到的 SMT:SMTs in Confluent Platform

如何使用以下字段对目标中的数据进行分区:event_date_time_ticks,以刻度为单位,例如 637535500015510000 表示:2021-04-09 T07:26:41.551Z

打勾转换为日期时间:Tick to Datetime

即使我尝试 FieldPartitioner ,我如何在上面的接收器配置中将该刻度转换为日期时间格式?还是我必须写一些自定义的东西?

TimestampConverter expects Unix epoch time,不是滴答声。

您需要对其进行转换,这必须是您的 Producer 中的自定义转换或修改(这应该不是主要问题,因为大多数语言都有日期时间纪元函数)

Convert ticks to unix timestamp