无法连接:通信 link 失败
Unable to connect: Communications link failure
我正在努力学习教程 Deploying Debezium using the new KafkaConnector resource。
根据教程,我也在使用 minikube,但使用 docker 驱动程序。基本上就是一步一步来。
但是,对于“创建连接器”步骤,在通过
创建连接器之后
cat <<EOF | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
name: "inventory-connector"
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
database.hostname: 192.168.99.1
database.port: "3306"
database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
database.server.id: "184054"
database.server.name: "dbserver1"
database.whitelist: "inventory"
database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
database.history.kafka.topic: "schema-changes.inventory"
include.schema.changes: "true"
EOF
并通过
检查
kubectl -n kafka get kctr inventory-connector -o yaml
我收到错误
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
creationTimestamp: "2021-09-29T18:20:11Z"
generation: 1
labels:
strimzi.io/cluster: my-connect-cluster
name: inventory-connector
namespace: kafka
resourceVersion: "12777"
uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
database.hostname: 192.168.49.2
database.password: ""
database.port: "3306"
database.server.id: "184054"
database.server.name: dbserver1
database.user: ""
database.whitelist: inventory
include.schema.changes: "true"
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2021-09-29T18:20:11.548Z"
message: |-
PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
reason: ConnectRestException
status: "True"
type: NotReady
observedGeneration: 1
我试着改变
database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
至
database.user: "debezium"
database.password: "dbz"
根据“保护数据库凭据”步骤中的用户和密码信息直接重新申请。
另外,根据教程中的描述
I’m using database.hostname: 192.168.99.1
as IP address for connecting to MySQL because I’m using minikube with the virtualbox VM driver If you’re using a different VM driver with minikube
you might need a different IP address.
其实我对上面的描述有点疑惑。 demo中的MySQL部署在Docker中,其余部分如Kafka等部署在minikube中。为什么关于 database.hostname
的描述说的是 minikube 而不是 Docker?
反正当我运行minikube ip
的时候,我得到了192.168.49.2
。但是,在我将 database.hostname
更改为 192.168.49.2
和 运行 kubectl get kctr inventory-connector -o yaml -n kafka
之后,我得到了
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
creationTimestamp: "2021-09-29T18:20:11Z"
generation: 1
labels:
strimzi.io/cluster: my-connect-cluster
name: inventory-connector
namespace: kafka
resourceVersion: "12777"
uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
database.hostname: 192.168.49.2
database.password: ""
database.port: "3306"
database.server.id: "184054"
database.server.name: dbserver1
database.user: ""
database.whitelist: inventory
include.schema.changes: "true"
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2021-09-29T18:20:11.548Z"
message: |-
PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
reason: ConnectRestException
status: "True"
type: NotReady
observedGeneration: 1
我可以通过 localhost
访问 MySQL,因为它托管在 Docker 中。
但是,当我将 database.hostname
更改为 localhost
.
时,我仍然出现同样的错误
有什么想法吗?谢谢!
问题与 minikube 中的服务无法与 docker 中的 MySQL 通信有关。
关于如何从 Kubernetes 集群内部访问主机的 localhost
,我发现
然而,我最终通过
在 Kubernetes 方向部署 MySQL
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-pv.yaml
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-deployment.yaml
(复制自 https://kubernetes.io/docs/tasks/run-application/run-single-instance-stateful-application/)
和
database.hostname: "mysql.default" # service `mysql` in namespace `default`
database.port: "3306"
database.user: "root"
database.password: "password"
现在当我运行
kubectl -n kafka get kctr inventory-connector -o yaml
我收到一个新错误,提示 MySQL 未启用行级二进制日志,但是,这意味着它现在可以连接 MySQL。
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"mysql.default","database.password":"password","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
creationTimestamp: "2021-09-29T19:36:52Z"
generation: 1
labels:
strimzi.io/cluster: my-connect-cluster
name: inventory-connector
namespace: kafka
resourceVersion: "2918"
uid: 48bb46e1-42bb-4574-a3dc-221ae7d6a803
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
database.hostname: mysql.default
database.password: password
database.port: "3306"
database.server.id: "184054"
database.server.name: dbserver1
database.user: root
database.whitelist: inventory
include.schema.changes: "true"
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2021-09-29T19:36:53.605Z"
status: "True"
type: Ready
connectorStatus:
connector:
state: UNASSIGNED
worker_id: 172.17.0.8:8083
name: inventory-connector
tasks:
- id: 0
state: FAILED
trace: "org.apache.kafka.connect.errors.ConnectException: The MySQL server is
not configured to use a row-level binlog, which is required for this connector
to work properly. Change the MySQL configuration to use a row-level binlog
and restart the connector.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:207)\n\tat
io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\n"
worker_id: 172.17.0.8:8083
type: source
observedGeneration: 1
我正在努力学习教程 Deploying Debezium using the new KafkaConnector resource。 根据教程,我也在使用 minikube,但使用 docker 驱动程序。基本上就是一步一步来。
但是,对于“创建连接器”步骤,在通过
创建连接器之后cat <<EOF | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
name: "inventory-connector"
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
database.hostname: 192.168.99.1
database.port: "3306"
database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
database.server.id: "184054"
database.server.name: "dbserver1"
database.whitelist: "inventory"
database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
database.history.kafka.topic: "schema-changes.inventory"
include.schema.changes: "true"
EOF
并通过
检查kubectl -n kafka get kctr inventory-connector -o yaml
我收到错误
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
creationTimestamp: "2021-09-29T18:20:11Z"
generation: 1
labels:
strimzi.io/cluster: my-connect-cluster
name: inventory-connector
namespace: kafka
resourceVersion: "12777"
uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
database.hostname: 192.168.49.2
database.password: ""
database.port: "3306"
database.server.id: "184054"
database.server.name: dbserver1
database.user: ""
database.whitelist: inventory
include.schema.changes: "true"
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2021-09-29T18:20:11.548Z"
message: |-
PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
reason: ConnectRestException
status: "True"
type: NotReady
observedGeneration: 1
我试着改变
database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
至
database.user: "debezium"
database.password: "dbz"
根据“保护数据库凭据”步骤中的用户和密码信息直接重新申请。
另外,根据教程中的描述
I’m using
database.hostname: 192.168.99.1
as IP address for connecting to MySQL because I’m using minikube with the virtualbox VM driver If you’re using a different VM driver withminikube
you might need a different IP address.
其实我对上面的描述有点疑惑。 demo中的MySQL部署在Docker中,其余部分如Kafka等部署在minikube中。为什么关于 database.hostname
的描述说的是 minikube 而不是 Docker?
反正当我运行minikube ip
的时候,我得到了192.168.49.2
。但是,在我将 database.hostname
更改为 192.168.49.2
和 运行 kubectl get kctr inventory-connector -o yaml -n kafka
之后,我得到了
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
creationTimestamp: "2021-09-29T18:20:11Z"
generation: 1
labels:
strimzi.io/cluster: my-connect-cluster
name: inventory-connector
namespace: kafka
resourceVersion: "12777"
uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
database.hostname: 192.168.49.2
database.password: ""
database.port: "3306"
database.server.id: "184054"
database.server.name: dbserver1
database.user: ""
database.whitelist: inventory
include.schema.changes: "true"
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2021-09-29T18:20:11.548Z"
message: |-
PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
reason: ConnectRestException
status: "True"
type: NotReady
observedGeneration: 1
我可以通过 localhost
访问 MySQL,因为它托管在 Docker 中。
但是,当我将 database.hostname
更改为 localhost
.
有什么想法吗?谢谢!
问题与 minikube 中的服务无法与 docker 中的 MySQL 通信有关。
关于如何从 Kubernetes 集群内部访问主机的 localhost
,我发现
然而,我最终通过
在 Kubernetes 方向部署 MySQLkubectl apply -f https://k8s.io/examples/application/mysql/mysql-pv.yaml
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-deployment.yaml
(复制自 https://kubernetes.io/docs/tasks/run-application/run-single-instance-stateful-application/)
和
database.hostname: "mysql.default" # service `mysql` in namespace `default`
database.port: "3306"
database.user: "root"
database.password: "password"
现在当我运行
kubectl -n kafka get kctr inventory-connector -o yaml
我收到一个新错误,提示 MySQL 未启用行级二进制日志,但是,这意味着它现在可以连接 MySQL。
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"mysql.default","database.password":"password","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
creationTimestamp: "2021-09-29T19:36:52Z"
generation: 1
labels:
strimzi.io/cluster: my-connect-cluster
name: inventory-connector
namespace: kafka
resourceVersion: "2918"
uid: 48bb46e1-42bb-4574-a3dc-221ae7d6a803
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
database.hostname: mysql.default
database.password: password
database.port: "3306"
database.server.id: "184054"
database.server.name: dbserver1
database.user: root
database.whitelist: inventory
include.schema.changes: "true"
tasksMax: 1
status:
conditions:
- lastTransitionTime: "2021-09-29T19:36:53.605Z"
status: "True"
type: Ready
connectorStatus:
connector:
state: UNASSIGNED
worker_id: 172.17.0.8:8083
name: inventory-connector
tasks:
- id: 0
state: FAILED
trace: "org.apache.kafka.connect.errors.ConnectException: The MySQL server is
not configured to use a row-level binlog, which is required for this connector
to work properly. Change the MySQL configuration to use a row-level binlog
and restart the connector.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:207)\n\tat
io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\n"
worker_id: 172.17.0.8:8083
type: source
observedGeneration: 1