Kafka 连接自定义 timestamp.extractor
Kafka Connect with custom timestamp.extractor
我在将 jar 添加到 Kafka 连接 class 路径时遇到问题,同时尝试从 Kafka 读取消息到 S3。
目标是根据时间戳在分区中写入消息,时间戳是Kafka消息中Key的一部分。
长话短说,我必须提供自定义 时间戳提取器 。按照文档 here 创建了一个实现 TimestampExtractor
接口的 class 并将 JAR 位置添加到 plugin.path
属性.
问题是当我开始连接时,找不到 class。不知何故,罐子不在 class 路径中,我得到
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
附加数据:
版本: Confluent 4.0.0
连接:独立连接
开始命令:
sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \
/home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \
/home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
希望得到任何帮助。
要使自定义时间戳提取器 classes 可用于您的 S3 连接器,您需要以下内容:
添加带有自定义 classes 的 jar 以及其他连接器的依赖项。示例:
保存在 ./share/java/kafka-connect-s3
下(如果您希望这样)
仅在 S3 连接器中可用,或在
./share/java/kafka-connect-storage-common
使其可供
所有存储接收器连接器(当前为 S3 和 HDFS 连接器)。
- 确保您的自定义 class 实现了
io.confluent.connect.storage.partitioner.TimestampExtractor
接口。
在连接器配置中设置 timestamp.extractor
属性 时使用完全限定的 class 名称,当然要确保它与您的包匹配定义并打包在你的 jar 中。例如:
timestamp.extractor=me.connectors.MyTimestampExtractor
最后,您将按照类似的过程使自定义分区程序可用于您的连接器。
我在将 jar 添加到 Kafka 连接 class 路径时遇到问题,同时尝试从 Kafka 读取消息到 S3。
目标是根据时间戳在分区中写入消息,时间戳是Kafka消息中Key的一部分。
长话短说,我必须提供自定义 时间戳提取器 。按照文档 here 创建了一个实现 TimestampExtractor
接口的 class 并将 JAR 位置添加到 plugin.path
属性.
问题是当我开始连接时,找不到 class。不知何故,罐子不在 class 路径中,我得到
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
附加数据:
版本: Confluent 4.0.0
连接:独立连接
开始命令:
sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \
/home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \
/home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
希望得到任何帮助。
要使自定义时间戳提取器 classes 可用于您的 S3 连接器,您需要以下内容:
添加带有自定义 classes 的 jar 以及其他连接器的依赖项。示例:
保存在
./share/java/kafka-connect-s3
下(如果您希望这样) 仅在 S3 连接器中可用,或在./share/java/kafka-connect-storage-common
使其可供 所有存储接收器连接器(当前为 S3 和 HDFS 连接器)。- 确保您的自定义 class 实现了
io.confluent.connect.storage.partitioner.TimestampExtractor
接口。 在连接器配置中设置
timestamp.extractor
属性 时使用完全限定的 class 名称,当然要确保它与您的包匹配定义并打包在你的 jar 中。例如:timestamp.extractor=me.connectors.MyTimestampExtractor
最后,您将按照类似的过程使自定义分区程序可用于您的连接器。