在 Confluent 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

Packaging a custom Java `partitioner.class` plugin for Kafka Connect in Confluent 4.1 + Kafka 1.1?

我已经成功地在 Confluent 3.2.x(Kafka 0.10.x 上使用用 Java 编写的简单自定义分区程序 class ).我想升级到 Confluent 4.1 (Kafka 1.1) 但遇到错误。

Kafka Connect 的插件加载机制似乎在 CP 3.3.0 中有所改变。以前,只有 CLASSPATH 选项,但是 CP 3.3.0+ 有一个更新的推荐机制 plugin.path

如果我尝试继续使用遗留的 CLASSPATH 插件机制,当我尝试使用我的插件时,我得到:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

那是CP内部class。对于较旧的 CP 3.2.x,它在 class 路径上可用,但是在 CP >= 3.3.0 中新的 class 路径隔离工作,我认为必须提供连同插件。

我认为切换到较新的推荐 plugin.path 机制是明智的。我删除了 CLASSPATH 条目。在默认 /etc/kafka/connect-distributed.properties 中,我看到 plugin.path=/usr/share/java,所以我将我的插件 .jar 安装到 /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar。我也尝试在那里添加和不添加依赖项 .jar 文件。

我的插件似乎在 Kafka Connect 服务启动时加载:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

当我这样做时:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config

我得到:

{"error_code":500,"message":null}%             

我在kafka connect systemd日志中可以看到:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access[=16=]0(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

不清楚哪里出了问题或为什么我的分区程序 class 没有正确加载。

仅供参考,我已经用 CP 4.1 + Kafka 1.1 依赖项重建了我的 Java 插件,并进行了一些小更新以匹配 API 更改,例如将 getSchemaGeneratorClass 的实现添加到我的分区程序class.

Custom Kafka Connect Partitioner classes 将无法通过旧的 CLASSPATH 机制工作,并且它们将无法作为插件使用较新的 Kafka 0.11.0+ 隔离插件机制。

唯一可行的解​​决方案是将带有自定义 Kafka Connect Partitioner class 的自定义 .jar 文件复制到 /usr/share/java/kafka-connect-storage-common/kafka-connect-storage-common 插件目录中。自定义 Kafka Connect Partitioner 插件 classes 必须存在于同一目录中,因此它们位于相同的隔离 class 加载程序中。

仅供参考,您可以看到 Kafka 0.11.0+ 隔离插件机制将仅加载不包含 Kafka 的四个特定 Java classes 的子classes在此处连接分区器:

https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L279

感谢 cricket_007 推荐这个确切的解决方案:将自定义 Kafka Connect 分区程序 .jar 文件放在 /share/java/kafka-storage-common 目录中。我通过艰难的方式了解到为什么必须这样做以及为什么替代方案不起作用。