Kafka 连接自定义 timestamp.extractor

Kafka Connect with custom timestamp.extractor

我在将 jar 添加到 Kafka 连接 class 路径时遇到问题,同时尝试从 Kafka 读取消息到 S3。

目标是根据时间戳在分区中写入消息,时间戳是Kafka消息中Key的一部分。

长话短说,我必须提供自定义 时间戳提取器 。按照文档 here 创建了一个实现 TimestampExtractor 接口的 class 并将 JAR 位置添加到 plugin.path 属性.

问题是当我开始连接时,找不到 class。不知何故,罐子不在 class 路径中,我得到

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor

附加数据:

版本: Confluent 4.0.0

连接:独立连接

开始命令:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

希望得到任何帮助。

要使自定义时间戳提取器 classes 可用于您的 S3 连接器,您需要以下内容:

  • 添加带有自定义 classes 的 jar 以及其他连接器的依赖项。示例:

    保存在 ./share/java/kafka-connect-s3 下(如果您希望这样) 仅在 S3 连接器中可用,或在 ./share/java/kafka-connect-storage-common 使其可供 所有存储接收器连接器(当前为 S3 和 HDFS 连接器)。

  • 确保您的自定义 class 实现了 io.confluent.connect.storage.partitioner.TimestampExtractor 接口。
  • 在连接器配置中设置 timestamp.extractor 属性 时使用完全限定的 class 名称,当然要确保它与您的包匹配定义并打包在你的 jar 中。例如:

    timestamp.extractor=me.connectors.MyTimestampExtractor

最后,您将按照类似的过程使自定义分区程序可用于您的连接器。