Kafka Mysql 连接器 plugin.path 配置
Kafka Mysql Connector plugin.path configuration
我正在尝试将 mysql
与 kafka
连接起来。我已经下载 debezium-debezium-connector-mysql
这是我的 connect-standalone.properties
:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib
我的test.config
:
{
"name": "mysql-source-demo-customers",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "dsm1234",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.customers",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.demo" ,
"include.schema.changes": "true",
"transforms": "unwrap,InsertTopic,InsertSourceDetails",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field":"messagetopic",
"transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field":"messagesource",
"transforms.InsertSourceDetails.static.value":"Debezium CDC from MySQL on asgard"
}
}
当我运行bin/connect-standalone.sh config/connect-standalone.properties test.config
,
我收到一个错误:
[2019-11-19 09:10:45,146] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:78)
[2019-11-19 09:10:45,170] INFO Loading plugin from: /home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib/mysql-binlog-connector-java-0.19.1.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:222)
[2019-11-19 09:10:45,291] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib/mysql-binlog-connector-java-0.19.1.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:245)
[2019-11-19 09:10:45,292] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:174)
[2019-11-19 09:10:45,292] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:174)
[2019-11-19 09:10:45,292] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:174)
[2019-11-19 09:10:45,293] INFO Loading plugin from: /home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib/debezium-connector-mysql-0.10.0.Final.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:222)
[2019-11-19 09:10:45,324] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
java.lang.NoClassDefFoundError: io/debezium/util/IoUtil
at io.debezium.connector.mysql.Module.(Module.java:19)
at io.debezium.connector.mysql.MySqlConnector.version(MySqlConnector.java:47)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:350)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:355)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:331)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:311)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: io.debezium.util.IoUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 12 more
通过查看日志,io/debezium/util/IoUtil
似乎是一个问题。
(我在 google 和 Whosebug 上搜索了插件路径。我在 Whosebug 中发现了一个类似的问题并进行了跟踪,但仍然无法正常工作。)
我错过了什么?
如果您使用 Confluent Hub 客户端安装连接器,则不需要编辑插件路径。它是递归扫描的,所以 /home/ec2-user/share/confluent-hub-components
应该可以工作。
旁注:我建议将插件存储在 ec2-user 主文件夹之外的其他地方,因为这样长的 运行ning 进程通常 运行 作为他们自己的、有限的用户帐户
我正在尝试将 mysql
与 kafka
连接起来。我已经下载 debezium-debezium-connector-mysql
这是我的 connect-standalone.properties
:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib
我的test.config
:
{
"name": "mysql-source-demo-customers",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "dsm1234",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.customers",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.demo" ,
"include.schema.changes": "true",
"transforms": "unwrap,InsertTopic,InsertSourceDetails",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field":"messagetopic",
"transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field":"messagesource",
"transforms.InsertSourceDetails.static.value":"Debezium CDC from MySQL on asgard"
}
}
当我运行bin/connect-standalone.sh config/connect-standalone.properties test.config
,
我收到一个错误:
[2019-11-19 09:10:45,146] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:78) [2019-11-19 09:10:45,170] INFO Loading plugin from: /home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib/mysql-binlog-connector-java-0.19.1.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:222) [2019-11-19 09:10:45,291] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib/mysql-binlog-connector-java-0.19.1.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:245) [2019-11-19 09:10:45,292] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:174) [2019-11-19 09:10:45,292] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:174) [2019-11-19 09:10:45,292] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:174) [2019-11-19 09:10:45,293] INFO Loading plugin from: /home/ec2-user/share/confluent-hub-components/debezium-debezium-connector-mysql/lib/debezium-connector-mysql-0.10.0.Final.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:222) [2019-11-19 09:10:45,324] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130) java.lang.NoClassDefFoundError: io/debezium/util/IoUtil at io.debezium.connector.mysql.Module.(Module.java:19) at io.debezium.connector.mysql.MySqlConnector.version(MySqlConnector.java:47) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:350) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:355) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:331) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:311) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182) at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79) Caused by: java.lang.ClassNotFoundException: io.debezium.util.IoUtil at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 12 more
通过查看日志,io/debezium/util/IoUtil
似乎是一个问题。
(我在 google 和 Whosebug 上搜索了插件路径。我在 Whosebug 中发现了一个类似的问题并进行了跟踪,但仍然无法正常工作。)
我错过了什么?
如果您使用 Confluent Hub 客户端安装连接器,则不需要编辑插件路径。它是递归扫描的,所以 /home/ec2-user/share/confluent-hub-components
应该可以工作。
旁注:我建议将插件存储在 ec2-user 主文件夹之外的其他地方,因为这样长的 运行ning 进程通常 运行 作为他们自己的、有限的用户帐户