Debezium SQL 服务器源连接器设置 Kafka 代理

Debezium SQL Server Source Connector set Kafka broker

我正在尝试设置 docker Confluent Kafka Platform 环境并与 Debezium SQL Server Source Connector 集成。

我按照 this Confluent's guide for the Kafka platform, then this Debezium 教程学习 SQL 服务器源连接器。

我的代理容器名为 broker,这个容器和其他容器(indclue connect 容器)在同一网络中,我确保它们可以相互 ping 和 telnet。

在 Debezium 教程中,我卡在了 Start The Debezium SQL Server Connector 的步骤中,因为我收到一个错误,表明连接器正在尝试通过 localhost:9092 而不是 [=20= 访问 Kafka 代理]:

[2020-03-29 10:46:30,907] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,114] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,969] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:34,127] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:35,333] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:36,238] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

直到最终超时:

[2020-03-29 10:46:38,664] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms

有趣的是,我可以在日志的开头看到我的配置已成功接收(查找 broker:9092):

[2020-03-29 10:45:38,618] INFO    database.history.kafka.bootstrap.servers = broker:9092 (io.debezium.connector.common.BaseSourceTask)

...

[2020-03-29 10:45:38,655] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [broker:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = server1-dbhistory
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = server1-dbhistory
        group.instance.id = null

这是我的配置文件:register-sqlserver.json

{
 "name": "inventory-connector",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "server1",
     "database.hostname" : "sqlserver_1",
     "database.port" : "1433",
     "database.user" : "sa",
     "database.password" : "Password!",
     "database.dbname" : "testDB",
     "database.history.kafka.bootstrap.servers" : "broker:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
     }
 }  

我通过主机添加连接器如下(就像指南一样):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json  

我显示的日志是 connect 容器日志的输出。

我的完整日志中没有其他 localhost 词,因此不用担心其他默认值 localhost 的配置可能会被我错过。

将不胜感激:)

问题在于广告听众

您正在连接到 9092 上的代理,根据 the config 是针对将其主机通告为 localhost

的侦听器
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092

这意味着客户端(在本例中为 Debezium)最初将连接到您提供给它的 bootstrap 服务器(broker:9092),但经纪人会将广告交还给客户端host (localhost) - 然后客户端将尝试连接到它。由于它们在不同的实例上,localhost 对于 Debeziu,连接器不是代理,因此连接失败。

参考:https://rmoff.net/2018/08/02/kafka-listeners-explained/

解法:

使用端口 29092,根据上述配置绑定到 broker 公布的主机, 从 Debezium 容器正确解析

"database.history.kafka.bootstrap.servers" : "broker:29092"