将自定义转换器与 Kafka Connect 一起使用?
Using a custom converter with Kafka Connect?
我正在尝试将自定义转换器与 Kafka Connect 一起使用,但我似乎无法正确使用。我希望有人对此有经验,可以帮助我解决这个问题!
初始情况
我的自定义转换器的 class 路径是 custom.CustomStringConverter
。
为了避免任何错误,我的自定义转换器目前只是预先存在的 StringConverter 的 copy/paste(当然,当我让它工作时,这会改变)。
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
我有一个包含 3 个节点的 kafka 连接集群,这些节点是 运行 confluent 的官方 docker 图像(confluentinc/cp-kafka-connect:3.3.0
)。
每个节点都配置为加载一个包含我的转换器的 jar(使用 docker 卷)。
会发生什么?
当连接器启动时,它们会正确加载 jar 并找到自定义转换器。事实上,这就是我在日志中看到的:
[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
然后我 POST 对其中一个连接器节点进行 JSON 配置以创建我的连接器:
{
"name": "hdfsSinkCustom",
"config": {
"topics": "yellow",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "custom.CustomStringConverter",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
"topics.dir": "yellow_storage",
"flush.size": "1",
"rotate.interval.ms": "1000"
}
}
并收到以下回复:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
我错过了什么?
如果我尝试 运行 Kafka Connect stadnalone,错误消息是相同的。
有人遇到过这个问题吗?我错过了什么?
好的,感谢 Kafka 用户邮件列表中的 Philip Schmitt,我找到了解决方案。
他提到了这个问题:https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007,这确实是我面临的问题。
引用他的话:
To test this, I simply copied my SMT jar to the folder of the connector I was using and adjusted the plugin.path property.
的确,我通过将转换器放在连接器的文件夹中摆脱了这个错误。
我还尝试了其他方法:创建自定义连接器并将该自定义连接器与自定义转换器一起使用,两者均作为插件加载。它也有效。
总结:转换器由连接器加载。如果您的连接器是插件,您的转换器也应该是插件。如果您的连接器不是插件(与您的 kafka connect distrib 捆绑在一起),您的转换器也不应该是。
我正在尝试将自定义转换器与 Kafka Connect 一起使用,但我似乎无法正确使用。我希望有人对此有经验,可以帮助我解决这个问题!
初始情况
我的自定义转换器的 class 路径是
custom.CustomStringConverter
。为了避免任何错误,我的自定义转换器目前只是预先存在的 StringConverter 的 copy/paste(当然,当我让它工作时,这会改变)。 https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
我有一个包含 3 个节点的 kafka 连接集群,这些节点是 运行 confluent 的官方 docker 图像(
confluentinc/cp-kafka-connect:3.3.0
)。每个节点都配置为加载一个包含我的转换器的 jar(使用 docker 卷)。
会发生什么?
当连接器启动时,它们会正确加载 jar 并找到自定义转换器。事实上,这就是我在日志中看到的:
[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
然后我 POST 对其中一个连接器节点进行 JSON 配置以创建我的连接器:
{
"name": "hdfsSinkCustom",
"config": {
"topics": "yellow",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "custom.CustomStringConverter",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
"topics.dir": "yellow_storage",
"flush.size": "1",
"rotate.interval.ms": "1000"
}
}
并收到以下回复:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
我错过了什么?
如果我尝试 运行 Kafka Connect stadnalone,错误消息是相同的。
有人遇到过这个问题吗?我错过了什么?
好的,感谢 Kafka 用户邮件列表中的 Philip Schmitt,我找到了解决方案。
他提到了这个问题:https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007,这确实是我面临的问题。
引用他的话:
To test this, I simply copied my SMT jar to the folder of the connector I was using and adjusted the plugin.path property.
的确,我通过将转换器放在连接器的文件夹中摆脱了这个错误。
我还尝试了其他方法:创建自定义连接器并将该自定义连接器与自定义转换器一起使用,两者均作为插件加载。它也有效。
总结:转换器由连接器加载。如果您的连接器是插件,您的转换器也应该是插件。如果您的连接器不是插件(与您的 kafka connect distrib 捆绑在一起),您的转换器也不应该是。