无法在启用 SSL 的 Kafka 集群中注册 Debezium(Kafka-Connect)连接器

Unable to register Debezium (Kafka-Connect) connector in SSL enabled Kafka cluster

我正在尝试在启用 SSL 的 Kafka 集群中注册一个 MySql Debezium 连接器。我为此目的使用的卷曲是:

curl -k -X POST -H "Accept:application/json"  -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/  -d '{ "name": "test-eds-extactor-profile", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<DBHOSTNAME>", "database.port": "3306", "database.user": "debezium", "database.password": "*****", "database.server.id": "1", "database.server.name": "MySQL-Database-Docker", "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094", "database.history.kafka.topic": "dbhistory.profile" , "include.schema.changes": "true", "table.whitelist": "test_eds_extraction_src_db_mock.profile", "database.history.producer.security.protocol": "SASL_PLAINTEXT", "database.history.producer.ssl.keystore.location": "path/to/server.jks", "database.history.producer.ssl.keystore.password": "******", "database.history.producer.ssl.truststore.location": "path/to//server.jks", "database.history.producer.ssl.truststore.password": "******", "database.history.producer.ssl.key.password": "******", "database.history.consumer.security.protocol": "SASL_PLAINTEXT", "database.history.consumer.ssl.keystore.location": "path/to/server.jks", "database.history.consumer.ssl.keystore.password": "******", "database.history.consumer.ssl.truststore.location": "path/to/server.jks", "database.history.consumer.ssl.truststore.password": "******", "database.history.consumer.ssl.key.password": "******" } }'

Debezium 无法创建 database.history 主题,失败并出现以下错误:

{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)\n\tat io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)\n\tat io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)\n\tat io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)\n\t... 9 more\nCaused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)\n\tat org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)\n\tat org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)\n\tat org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)\n\tat org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)\n\t... 15 more\nCaused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)\n\t... 20 more\n"}],"type":"source"}

美化错误:

Failed to construct kafka producer
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
    at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)
    at io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)
    at io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)
    ... 9 more
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)
    ... 15 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)
    ... 20 more

此错误表明 jaas 配置对您的 kafka 客户端不可见。要解决此问题,您可以导出以下变量:

export KAFKA_OPTS="-Djava.security.auth.login.config=path/to/jaas.conf"

注册连接器时,您需要在 JSON 正文中添加 SSL 属性:

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234
database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

参考:https://debezium.io/docs/connectors/mysql/(滚动到末尾)