MySQL to Confluent Enterprise Kafka data ingestion
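We have a 3-node Confluent Enterprise Kafka cluster (on-premises Linux), and one node runs the Kafka Connect service. We want to ingest data from MySQL into Kafka topics.
Here is what we tried:
1. Installed MySQL on my local Windows desktop, created a database and a table, and inserted some data into it.
2. Created source-quickstart-mysql.properties with the following contents: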
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://<IPAddressOfLocalMachine>:3306/test_db?user=root&password=pwd
tables.whitelist=emp
mode=incrementing
incrementing.column.name=empid
topic.prefix=test-mysql-jdbc-
The connect-standalone.properties file contains:
bootstrap.servers=IPaddressOfKCnode:9092
plugin.path=/usr/share/java
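For context on `mode=incrementing` with `incrementing.column.name=empid`: as we understand the JDBC source connector's documented behaviour, each poll fetches only rows whose id is greater than the largest id already seen, so `empid` should be strictly increasing (for example AUTO_INCREMENT), and updates to existing rows are not captured in this mode. The Python below is a hypothetical in-memory simulation of that idea, not the connector's code; the function name `poll_incrementing` and the sample rows are illustrative assumptions.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def poll_incrementing(
    rows: List[Row], column: str, last_offset: Optional[int]
) -> Tuple[List[Row], Optional[int]]:
    """Simulate one poll in incrementing mode (illustrative, not connector code).

    Conceptually similar to:
      SELECT * FROM emp WHERE empid > :last_offset ORDER BY empid ASC
    Returns the new rows and the offset to store for the next poll.
    """
    # Only rows whose id is strictly greater than the stored offset are new.
    new_rows = sorted(
        (r for r in rows if last_offset is None or r[column] > last_offset),
        key=lambda r: r[column],
    )
    # The stored offset advances to the largest id emitted; unchanged if none.
    next_offset = new_rows[-1][column] if new_rows else last_offset
    return new_rows, next_offset


if __name__ == "__main__":
    emp = [{"empid": 1, "name": "a"}, {"empid": 2, "name": "b"}]
    batch, offset = poll_incrementing(emp, "empid", None)
    print([r["empid"] for r in batch], offset)  # [1, 2] 2
    emp.append({"empid": 3, "name": "c"})  # simulate a new INSERT
    emp[0]["name"] = "a2"  # simulate an UPDATE: not picked up in this mode
    batch, offset = poll_incrementing(emp, "empid", offset)
    print([r["empid"] for r in batch], offset)  # [3] 3
```

If updates must also be captured, the connector's timestamp-based modes are the documented alternative, at the cost of needing a reliable timestamp column.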
Restarted the Kafka Connect service.
Tried to submit a request to the Kafka Connect service to connect to MySQL:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'
This returned the following error:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
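The error points at Kafka Connect's config validation endpoint, which in the Kafka Connect REST API is `PUT /connector-plugins/{connectorType}/config/validate`. Calling it before creating a connector shows per-key errors without starting anything. The Python sketch below builds that request and extracts the errors from a response; the helper names are hypothetical, and the response shape (`configs[].value.name` / `.errors`) is our understanding of the REST API. Note that the curl request above uses `"mode": "timestamp"` without `timestamp.column.name`, which timestamp mode requires, so this sketch uses incrementing mode to match the properties file.

```python
import json
import urllib.request
from typing import Any, Dict, List


def build_validate_request(connect_url: str, config: Dict[str, str]) -> urllib.request.Request:
    """Build (but do not send) a PUT to the Kafka Connect validate endpoint."""
    plugin = config["connector.class"]  # the path segment must match connector.class
    url = f"{connect_url.rstrip('/')}/connector-plugins/{plugin}/config/validate"
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


def config_errors(response: Dict[str, Any]) -> Dict[str, List[str]]:
    """Collect the non-empty error lists, keyed by config name."""
    errors: Dict[str, List[str]] = {}
    for entry in response.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = value["errors"]
    return errors


if __name__ == "__main__":
    cfg = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db",
        "table.whitelist": "emp",
        "mode": "incrementing",
        "incrementing.column.name": "empid",
        "topic.prefix": "mysql-",
    }
    req = build_validate_request("http://IPaddressOfKCnode:8083", cfg)
    print(req.get_method(), req.full_url)
    # To send it (needs a reachable Connect worker):
    # with urllib.request.urlopen(req) as resp:
    #     print(config_errors(json.load(resp)))
```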
I also tried the following:
a. Stopped the Kafka Connect service so I could run Connect manually:
systemctl stop confluent-kafka-connect
b. Ran Connect like this:
/usr/bin/connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties
The process started successfully at first, but died after a while. Here is the log:
[2018-11-10 19:42:53,027] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2018-11-10 19:42:53,048] INFO AbstractConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = null
connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd
connection.user = null
dialect.name =
incrementing.column.name = empid
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query =
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = []
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = test-mysql-jdbc-
validate.non.null = true
(org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:45:00,439] INFO AbstractConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = null
connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin
connection.user = null
dialect.name =
incrementing.column.name = empid
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query =
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = []
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = test-mysql-jdbc-
validate.non.null = true
(org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:47:07,666] ERROR Failed to create job for /etc/kafka-connect-jdbc/source-quickstart-mysql.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2018-11-10 19:47:07,668] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:415)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
[2018-11-10 19:47:07,669] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
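Besides the two errors discussed in the answers, the log also shows `table.whitelist = []` even though the properties file sets `tables.whitelist=emp`. The setting is `table.whitelist` (as the curl request spells it), so the misspelled key appears to have been ignored. The Python sketch below flags such typos by comparing keys against the names printed in the log above; `KNOWN_KEYS` is copied from that log plus the generic connector keys, and the minimal parser is an assumption (it handles only `key=value` lines, not every Java `.properties` syntax).

```python
import difflib
from typing import Dict, List

# Keys printed by the JDBC source connector in the log above,
# plus the generic connector-level keys used in the properties file.
KNOWN_KEYS = {
    "batch.max.rows", "catalog.pattern", "connection.attempts", "connection.backoff.ms",
    "connection.password", "connection.url", "connection.user", "dialect.name",
    "incrementing.column.name", "mode", "numeric.mapping", "numeric.precision.mapping",
    "poll.interval.ms", "query", "schema.pattern", "table.blacklist",
    "table.poll.interval.ms", "table.types", "table.whitelist", "timestamp.column.name",
    "timestamp.delay.interval.ms", "topic.prefix", "validate.non.null",
    "name", "connector.class", "tasks.max",
}


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal parser: key=value lines only; '#' and '!' comment lines skipped."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def suspicious_keys(props: Dict[str, str]) -> Dict[str, List[str]]:
    """Map each unknown key to its closest known key (a likely typo)."""
    return {
        key: difflib.get_close_matches(key, KNOWN_KEYS, n=1, cutoff=0.8)
        for key in props
        if key not in KNOWN_KEYS
    }


if __name__ == "__main__":
    sample = "connector.class=io.confluent.connect.jdbc.JdbcSourceConnector\ntables.whitelist=emp\nmode=incrementing\n"
    print(suspicious_keys(parse_properties(sample)))  # {'tables.whitelist': ['table.whitelist']}
```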
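I could not find clear, complete documentation on the Confluent website about using the Kafka Connect service with its various connectors, their configuration, and so on.
Please help me with the correct steps to implement the ingestion pipeline: MySQL -> Kafka Connect -> Kafka.
In the end, I expect inserts into the MySQL table to produce data in the Kafka topic, and a Kafka consumer to display those records.
This ingestion seems simple, but I am missing some basic connection properties :(
Thanks!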
This looks like a problem with the JDBC connector. Which MySQL version are you running?
To fix this, you need to:
First, the error returned in the output of your curl command is:
Connector configuration is invalid and contains the following 2 error(s)
java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd
So the MySQL JDBC driver is missing from Kafka Connect's path.
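As we understand Confluent's JDBC connector docs, the connector is loaded in its own plugin classloader, so the MySQL driver JAR has to sit in the same directory as the kafka-connect-jdbc JARs rather than anywhere on the system. A quick check, sketched in Python, assuming the driver file follows the usual Connector/J naming (`mysql-connector-java-<version>.jar` or `mysql-connector-j-<version>.jar`); the helper name is hypothetical.

```python
from pathlib import Path
from typing import List


def find_mysql_driver(plugin_dir: str) -> List[Path]:
    """Return MySQL Connector/J JARs found under plugin_dir (recursive search)."""
    root = Path(plugin_dir)
    if not root.is_dir():
        return []
    # Assumed naming: mysql-connector-java-8.0.x.jar or mysql-connector-j-*.jar
    return sorted(root.rglob("mysql-connector*.jar"))


if __name__ == "__main__":
    # Directory holding the JDBC connector's JARs in this setup (from the thread)
    jars = find_mysql_driver("/usr/share/java/kafka-connect-jdbc")
    if jars:
        print("Found:", *jars, sep="\n  ")
    else:
        print("No MySQL JDBC driver JAR found; copy Connector/J here and restart Connect.")
```

Connect only scans plugin directories at startup, so the worker needs a restart after the JAR is copied.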
The second error appears in the output you posted:
Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure.
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
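This indicates that Kafka Connect cannot reach your MySQL machine.
Where are you running Confluent Platform: in Docker, on the same local machine as MySQL, or somewhere else? Is 192.168.178.14 the address of your MySQL server, and is it reachable from the host where Kafka Connect is running?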
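One way to answer the last question independently of Kafka is to test plain TCP reachability of port 3306 from the Kafka Connect host. "Communications link failure ... 0 milliseconds ago" suggests no connection was established at all. A minimal Python sketch, assuming Python 3 is available on the Connect host; `can_reach` is a hypothetical helper name.

```python
import socket


def can_reach(host: str, port: int = 3306, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, DNS failure, ...
        return False
```

For example, run `can_reach("192.168.178.14")` on the Connect host. If it returns False, the problem is at the network level (for example a firewall on the Windows desktop, or MySQL not listening on that interface) rather than in the connector configuration.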
You can find a couple of examples of setting up MySQL with Kafka here:
- https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ : this uses the JDBC connector, as you did
- https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/ : this uses Debezium, a log-based CDC tool
For the pros and cons of the JDBC connector versus log-based CDC, see https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc.
Disclaimer: I wrote the blog posts above.
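Thanks Robin and Giorgos for your answers! They helped a lot.
The problem came down to a couple of things:
1. The MySQL JDBC connector JAR (the driver) was missing.
We had to put MySQL Connector/J 8.0.13 under /usr/share/java/kafka-connect-jdbc/.
2. The connection problem occurred because the MySQL user that Kafka Connect was connecting as did not have sufficient privileges to connect from the remote host running Kafka Connect.
To fix this, I created a new MySQL user with full privileges and access from the remote server (the Kafka Connect host).
After completing the steps above, we restarted Kafka Connect and the ingestion pipeline started working.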
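With a dedicated MySQL user, the credentials can go in the connector's `connection.user` / `connection.password` settings (both visible as `null` in the log above) instead of the JDBC URL. The Python sketch below assembles such a config and the topic name to consume from; the user `connect_user` and password `CHANGE_ME` are hypothetical placeholders, and the MySQL-side grant itself is done in MySQL and not shown. A narrower grant than "full privileges" (for example SELECT on `test_db`) may be enough for a source connector, but that is an assumption to verify.

```python
import json
from typing import Dict


def jdbc_source_config(host: str, db: str, user: str, password: str,
                       table: str, id_column: str, prefix: str) -> Dict[str, str]:
    """Build a JDBC source config with credentials kept out of the JDBC URL."""
    return {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": f"jdbc:mysql://{host}:3306/{db}",
        "connection.user": user,
        "connection.password": password,
        "table.whitelist": table,
        "mode": "incrementing",
        "incrementing.column.name": id_column,
        "topic.prefix": prefix,
    }


def expected_topic(config: Dict[str, str]) -> str:
    """Table-based JDBC source topics are named topic.prefix + table name.

    Assumes table.whitelist lists exactly one table.
    """
    return config["topic.prefix"] + config["table.whitelist"]


if __name__ == "__main__":
    cfg = jdbc_source_config("IPaddressOfLocalMachine", "test_db", "connect_user",
                             "CHANGE_ME", "emp", "empid", "mysql-")
    # JSON body for POST http://IPaddressOfKCnode:8083/connectors
    print(json.dumps({"name": "emp-connector", "config": cfg}, indent=2))
    print(expected_topic(cfg))  # mysql-emp
```

A consumer pointed at the printed topic should then show one record per row inserted into `emp`.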