Kafka Connect 找不到已开发插件的 class
Kafka Connect cant' find class of developed plugin
我为实现 SinkConnector 的 kafka connect 创建了一个插件,我使用 gradle jar 任务将其打包到一个 jar 中:
jar {
archiveName='name.jar'
}
我把它复制到kafka集群的一个文件夹里,我设置CLASSPATH=我的jar在哪里。
然后我执行 kafka 脚本来启动独立连接,它给我一个错误,说我的 class 找不到:
[2017-07-25 05:15:52,084] WARN could not get type for name mypackage.SplunkSinkConnector from any class loader (org.reflections.Reflections:384)
org.reflections.ReflectionsException: could not get type for name mypackage.SplunkSinkConnector
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:378)
at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:397)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:367)
at org.apache.kafka.connect.runtime.PluginDiscovery.connectorPlugins(PluginDiscovery.java:76)
at org.apache.kafka.connect.runtime.PluginDiscovery.scanClasspathForPlugins(PluginDiscovery.java:70)
at org.apache.kafka.connect.runtime.AbstractHerder.run(AbstractHerder.java:354)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: mypackage.SplunkSinkConnector
at java.net.URLClassLoader.run(URLClassLoader.java:372)
at java.net.URLClassLoader.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:376)
... 6 more
[2017-07-25 05:15:52,419] INFO Reflections took 1586 ms to scan 62 urls, producing 3002 keys and 15379 values (org.reflections.Reflections:229)
[2017-07-25 05:15:52,420] WARN could not get type for name mypackage.SplunkSinkConnector from any class loader (org.reflections.Reflections:384)
org.reflections.ReflectionsException: could not get type for name mypackage.SplunkSinkConnector
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:378)
at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:397)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:367)
at org.apache.kafka.connect.runtime.ConnectorFactory.getConnectorClass(ConnectorFactory.java:69)
at org.apache.kafka.connect.runtime.ConnectorFactory.newConnector(ConnectorFactory.java:38)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:334)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:233)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:159)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
Caused by: java.lang.ClassNotFoundException: mypackage.SplunkSinkConnector
at java.net.URLClassLoader.run(URLClassLoader.java:372)
at java.net.URLClassLoader.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:376)
... 8 more
知道为什么不拿起我的罐子吗?
谢谢
================================
编辑:Kakfa Connect 版本 10.2.1,根据脚本,class路径计算方式为:CLASSPATH="$CLASSPATH":"$KAFKA_HOME/libs/*"
您能否检查您的 .jar 文件以确保 class 存在。将 Scala 用作 JVM shell:
// Or Maybe try $CLASSPATH with the full classpath that you are using.
scala -classpath path-to-jar.jar
// If class is not loaded, this will trigger an error.
classOf[mypackage.SplunkSinkConnector]
仅供参考,我正是这样做的,使用自定义 .jar 插件和通过 CLASSPATH 环境变量加载的 Kafka Connect。
更新:这是我的插件的 build.gradle
文件。 IMO,这是构建具有一些简单依赖项的 Java.jar 的最简单方法。我用 gradle jar
构建 jar,它将在 ./build/libs/(project-name).jar
:
处创建
apply plugin: 'java'
apply plugin: 'idea'
sourceCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
maven { url "http://packages.confluent.io/maven/" }
}
configurations {
all*.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
all*.exclude group: 'log4j'
}
dependencies {
compile 'io.confluent:kafka-connect-storage-partitioner:3.2.1'
compile 'org.apache.kafka:connect-api:0.10.2.1'
testCompile 'junit:junit:4.+'
}
idea {
project {
languageLevel = '1.8'
}
}
我为实现 SinkConnector 的 kafka connect 创建了一个插件,我使用 gradle jar 任务将其打包到一个 jar 中:
jar {
archiveName='name.jar'
}
我把它复制到kafka集群的一个文件夹里,我设置CLASSPATH=我的jar在哪里。 然后我执行 kafka 脚本来启动独立连接,它给我一个错误,说我的 class 找不到:
[2017-07-25 05:15:52,084] WARN could not get type for name mypackage.SplunkSinkConnector from any class loader (org.reflections.Reflections:384)
org.reflections.ReflectionsException: could not get type for name mypackage.SplunkSinkConnector
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:378)
at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:397)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:367)
at org.apache.kafka.connect.runtime.PluginDiscovery.connectorPlugins(PluginDiscovery.java:76)
at org.apache.kafka.connect.runtime.PluginDiscovery.scanClasspathForPlugins(PluginDiscovery.java:70)
at org.apache.kafka.connect.runtime.AbstractHerder.run(AbstractHerder.java:354)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: mypackage.SplunkSinkConnector
at java.net.URLClassLoader.run(URLClassLoader.java:372)
at java.net.URLClassLoader.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:376)
... 6 more
[2017-07-25 05:15:52,419] INFO Reflections took 1586 ms to scan 62 urls, producing 3002 keys and 15379 values (org.reflections.Reflections:229)
[2017-07-25 05:15:52,420] WARN could not get type for name mypackage.SplunkSinkConnector from any class loader (org.reflections.Reflections:384)
org.reflections.ReflectionsException: could not get type for name mypackage.SplunkSinkConnector
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:378)
at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:397)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:367)
at org.apache.kafka.connect.runtime.ConnectorFactory.getConnectorClass(ConnectorFactory.java:69)
at org.apache.kafka.connect.runtime.ConnectorFactory.newConnector(ConnectorFactory.java:38)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:334)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:233)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:159)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
Caused by: java.lang.ClassNotFoundException: mypackage.SplunkSinkConnector
at java.net.URLClassLoader.run(URLClassLoader.java:372)
at java.net.URLClassLoader.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:376)
... 8 more
知道为什么不拿起我的罐子吗?
谢谢
================================
编辑:Kakfa Connect 版本 10.2.1,根据脚本,class路径计算方式为:CLASSPATH="$CLASSPATH":"$KAFKA_HOME/libs/*"
您能否检查您的 .jar 文件以确保 class 存在。将 Scala 用作 JVM shell:
// Or Maybe try $CLASSPATH with the full classpath that you are using.
scala -classpath path-to-jar.jar
// If class is not loaded, this will trigger an error.
classOf[mypackage.SplunkSinkConnector]
仅供参考,我正是这样做的,使用自定义 .jar 插件和通过 CLASSPATH 环境变量加载的 Kafka Connect。
更新:这是我的插件的 build.gradle
文件。 IMO,这是构建具有一些简单依赖项的 Java.jar 的最简单方法。我用 gradle jar
构建 jar,它将在 ./build/libs/(project-name).jar
:
apply plugin: 'java'
apply plugin: 'idea'
sourceCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
maven { url "http://packages.confluent.io/maven/" }
}
configurations {
all*.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
all*.exclude group: 'log4j'
}
dependencies {
compile 'io.confluent:kafka-connect-storage-partitioner:3.2.1'
compile 'org.apache.kafka:connect-api:0.10.2.1'
testCompile 'junit:junit:4.+'
}
idea {
project {
languageLevel = '1.8'
}
}