在 docker 中使用 debezium 的 CDC
CDC with debezium in docker
目前我正在尝试在 docker 中使用 debezium、kafka connect 和 kafka 设置 CDC。
我一直在关注这个指南:https://debezium.io/documentation/reference/tutorial.html
我跳过了启动 MsSQL 数据库部分,因为我有一个本地 SQL 服务器数据库,其配置如 link 中所示:
https://debezium.io/documentation/reference/1.2/connectors/sqlserver.html#setting-up-sqlserver
我当前的 docker 撰写文件如下所示:
version: '2'
services:
zookeeper:
image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: "debezium/kafka:${DEBEZIUM_VERSION}"
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
connect:
image: "debezium/connect:${DEBEZIUM_VERSION}"
ports:
- 8083:8083
links:
- kafka
- zookeeper
environment:
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
我在单独的文件中有一个环境变量
DEBEZIUM_VERSION=1.5
当我尝试 运行 这个撰写文件时 returns:
connect_1 | 2021-04-30 07:41:41,568 WARN || [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available. [org.apache.kafka.clients.NetworkClient]
connect_1 | 2021-04-30 07:41:42,673 WARN || [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available. [org.apache.kafka.clients.NetworkClient]
connect_1 | 2021-04-30 07:41:43,879 WARN || [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available. [org.apache.kafka.clients.NetworkClient]
connect_1 | 2021-04-30 07:41:44,844 INFO || [AdminClient clientId=adminclient-1] Metadata update failed [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
connect_1 | org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1619768504843, tries=1, nextAllowedTryMs=1619768504944) timed out at 1619768504844 after 1 attempt(s)
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
connect_1 | 2021-04-30 07:41:44,845 INFO || App info kafka.admin.client for adminclient-1 unregistered [org.apache.kafka.common.utils.AppInfoParser]
connect_1 | 2021-04-30 07:41:44,846 INFO || [AdminClient clientId=adminclient-1] Metadata update failed [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
connect_1 | org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1619768534844, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
connect_1 | 2021-04-30 07:41:44,853 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
connect_1 | 2021-04-30 07:41:44,853 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
connect_1 | 2021-04-30 07:41:44,853 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
connect_1 | 2021-04-30 07:41:44,853 ERROR || Stopping due to error [org.apache.kafka.connect.cli.ConnectDistributed]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
connect_1 | at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
connect_1 | at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
connect_1 | at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
connect_1 | at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
connect_1 | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1619768504842, tries=1, nextAllowedTryMs=1619768504943) timed out at 1619768504843 after 1 attempt(s)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl.access[=13=]0(KafkaFutureImpl.java:32)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
connect_1 | at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
connect_1 | ... 3 more
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1619768504842, tries=1, nextAllowedTryMs=1619768504943) timed out at 1619768504843 after 1 attempt(s)
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
docker_connect_1 exited with code 2
编辑:
我在尝试注册 sql 服务器数据库时遇到了一个新问题 运行。我的连接字符串:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d "{ \"name\": \"store-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.sqlserver.SqlServerConnector\", \"database.hostname\": \"localhost\", \"database.port\": \"1433\", \"database.user\": \"sa\", \"database.password\": \"*********\", \"database.dbname\": \"DebeziumKafkaDataBase\", \"database.server.name\": \"MSSQLSERVER\", \"table.whitelist\": \"dbo.Customers\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.MSSQLSERVER\" } }"
在 debezium 连接容器中出现错误:
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host localhost, port 1433 has failed. Error: "Connection refused (Connection refused). Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".
我已经尝试向防火墙添加规则以允许端口 1433 并在 SQL 服务器配置管理器中为我的数据库设置协议,如下所示:
JDBC connection failed, error: TCP/IP connection to host failed
我不知道我是否应该为此打开新问题所以我只是把它贴在这里。
您应该将 ADVERTISED_LISTENERS=kafka:9092
添加到 Kafka 服务并将 BOOTSTRAP_SERVERS=kafka:9092
添加到 Debezium 服务
目前我正在尝试在 docker 中使用 debezium、kafka connect 和 kafka 设置 CDC。
我一直在关注这个指南:https://debezium.io/documentation/reference/tutorial.html
我跳过了启动 MsSQL 数据库部分,因为我有一个本地 SQL 服务器数据库,其配置如 link 中所示:
https://debezium.io/documentation/reference/1.2/connectors/sqlserver.html#setting-up-sqlserver
我当前的 docker 撰写文件如下所示:
version: '2'
services:
zookeeper:
image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: "debezium/kafka:${DEBEZIUM_VERSION}"
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
connect:
image: "debezium/connect:${DEBEZIUM_VERSION}"
ports:
- 8083:8083
links:
- kafka
- zookeeper
environment:
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
我在单独的文件中有一个环境变量
DEBEZIUM_VERSION=1.5
当我尝试 运行 这个撰写文件时 returns:
connect_1 | 2021-04-30 07:41:41,568 WARN || [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available. [org.apache.kafka.clients.NetworkClient]
connect_1 | 2021-04-30 07:41:42,673 WARN || [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available. [org.apache.kafka.clients.NetworkClient]
connect_1 | 2021-04-30 07:41:43,879 WARN || [AdminClient clientId=adminclient-1] Connection to node -1 (/0.0.0.0:9092) could not be established. Broker may not be available. [org.apache.kafka.clients.NetworkClient]
connect_1 | 2021-04-30 07:41:44,844 INFO || [AdminClient clientId=adminclient-1] Metadata update failed [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
connect_1 | org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1619768504843, tries=1, nextAllowedTryMs=1619768504944) timed out at 1619768504844 after 1 attempt(s)
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
connect_1 | 2021-04-30 07:41:44,845 INFO || App info kafka.admin.client for adminclient-1 unregistered [org.apache.kafka.common.utils.AppInfoParser]
connect_1 | 2021-04-30 07:41:44,846 INFO || [AdminClient clientId=adminclient-1] Metadata update failed [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
connect_1 | org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1619768534844, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
connect_1 | 2021-04-30 07:41:44,853 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
connect_1 | 2021-04-30 07:41:44,853 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
connect_1 | 2021-04-30 07:41:44,853 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
connect_1 | 2021-04-30 07:41:44,853 ERROR || Stopping due to error [org.apache.kafka.connect.cli.ConnectDistributed]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
connect_1 | at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
connect_1 | at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
connect_1 | at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
connect_1 | at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
connect_1 | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1619768504842, tries=1, nextAllowedTryMs=1619768504943) timed out at 1619768504843 after 1 attempt(s)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl.access[=13=]0(KafkaFutureImpl.java:32)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
connect_1 | at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
connect_1 | at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
connect_1 | ... 3 more
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1619768504842, tries=1, nextAllowedTryMs=1619768504943) timed out at 1619768504843 after 1 attempt(s)
connect_1 | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
docker_connect_1 exited with code 2
编辑:
我在尝试注册 sql 服务器数据库时遇到了一个新问题 运行。我的连接字符串:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d "{ \"name\": \"store-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.sqlserver.SqlServerConnector\", \"database.hostname\": \"localhost\", \"database.port\": \"1433\", \"database.user\": \"sa\", \"database.password\": \"*********\", \"database.dbname\": \"DebeziumKafkaDataBase\", \"database.server.name\": \"MSSQLSERVER\", \"table.whitelist\": \"dbo.Customers\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.MSSQLSERVER\" } }"
在 debezium 连接容器中出现错误:
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host localhost, port 1433 has failed. Error: "Connection refused (Connection refused). Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".
我已经尝试向防火墙添加规则以允许端口 1433 并在 SQL 服务器配置管理器中为我的数据库设置协议,如下所示: JDBC connection failed, error: TCP/IP connection to host failed
我不知道我是否应该为此打开新问题所以我只是把它贴在这里。
您应该将 ADVERTISED_LISTENERS=kafka:9092
添加到 Kafka 服务并将 BOOTSTRAP_SERVERS=kafka:9092
添加到 Debezium 服务