Unable to connect: Communications link failure

I'm trying to follow the tutorial Deploying Debezium using the new KafkaConnector resource. Like the tutorial, I'm using minikube, but with the docker driver. Otherwise I followed it step by step.

However, at the "Create the connector" step, after creating the connector with
cat <<EOF | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 192.168.99.1
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true" 
EOF

and checking it with
kubectl -n kafka get kctr inventory-connector -o yaml

I got this error:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1
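
Note that the applied spec above shows database.user and database.password as empty strings, even though the manifest used ${file:...} placeholders. One likely cause (an assumption drawn from that output, not something the tutorial confirms) is that the unquoted EOF delimiter lets the shell expand ${file:...} inside the heredoc before kubectl ever sees it, which would also explain "A value is required". A minimal sketch of the quoted form, which passes the placeholders through literally:

```shell
# Quoting the delimiter ('EOF') turns off parameter expansion inside the
# heredoc, so the ${file:...} placeholders are kept as literal text.
manifest_fragment=$(cat <<'EOF'
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
EOF
)

# Show what would be sent to kubectl.
printf '%s\n' "$manifest_fragment"
```

Applied to the original command, this would mean starting it with cat <<'EOF' | kubectl -n kafka apply -f - and leaving the rest unchanged.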

I tried changing

database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"

to

database.user: "debezium"
database.password: "dbz"

using the username and password from the "Secure the database credentials" step directly, and re-applied.

Also, the tutorial says:

I’m using database.hostname: 192.168.99.1 as IP address for connecting to MySQL because I’m using minikube with the virtualbox VM driver. If you’re using a different VM driver with minikube you might need a different IP address.

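I'm actually a bit confused by this description. In the demo, MySQL is deployed in Docker, while the rest, such as Kafka, is deployed in minikube. Why does the description of database.hostname talk about minikube rather than Docker?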

Anyway, when I run minikube ip, I get 192.168.49.2. However, after changing database.hostname to 192.168.49.2 and running kubectl get kctr inventory-connector -o yaml -n kafka, I got

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1

I can access MySQL via localhost from my machine, since it is hosted in Docker. However, when I changed database.hostname to localhost, I still got the same error.

Any ideas? Thanks!

The problem is that the services running in minikube cannot communicate with MySQL running in Docker.
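
One way to check this diagnosis is to probe a candidate address from inside the cluster before pointing the connector at it. The sketch below is only an illustration: the helper name probe_mysql, the pod name mysql-probe, and the mysql:5.7 image (assumed to ship mysqladmin) are my own choices, not part of the tutorial. The KUBECTL variable exists only so the command can be swapped out for a dry run.

```shell
# Allow overriding the kubectl binary (for example with `echo` for a dry run).
KUBECTL="${KUBECTL:-kubectl}"

# Start a throwaway pod in the kafka namespace and ask the given host
# whether a MySQL server answers on the given port (default 3306).
# mysqladmin ping exits 0 when a server responds, even if access is denied.
probe_mysql() {
  host="$1"
  port="${2:-3306}"
  "$KUBECTL" run mysql-probe -n kafka --rm -i --restart=Never \
    --image=mysql:5.7 --command -- \
    mysqladmin ping -h "$host" -P "$port" --connect-timeout=5
}

# Usage (requires a running cluster):
#   probe_mysql 192.168.49.2
#   probe_mysql host.minikube.internal
```

host.minikube.internal is the name minikube documents for reaching the host from inside the cluster. Whether it reaches a MySQL container published on the host with the docker driver is something to verify with the probe rather than assume.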

I looked into how to access the host's localhost from inside the Kubernetes cluster.

However, I ended up deploying MySQL directly in Kubernetes with
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-pv.yaml
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-deployment.yaml

(copied from https://kubernetes.io/docs/tasks/run-application/run-single-instance-stateful-application/) and changing the connector config to

database.hostname: "mysql.default" # service `mysql` in namespace `default`
database.port: "3306"
database.user: "root"
database.password: "password"

Now when I run

kubectl -n kafka get kctr inventory-connector -o yaml

I get a new error saying that MySQL does not have a row-level binary log enabled. However, this means the connector can now connect to MySQL.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"mysql.default","database.password":"password","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T19:36:52Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "2918"
  uid: 48bb46e1-42bb-4574-a3dc-221ae7d6a803
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: mysql.default
    database.password: password
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: root
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T19:36:53.605Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: UNASSIGNED
      worker_id: 172.17.0.8:8083
    name: inventory-connector
    tasks:
    - id: 0
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException: The MySQL server is
        not configured to use a row-level binlog, which is required for this connector
        to work properly. Change the MySQL configuration to use a row-level binlog
        and restart the connector.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:207)\n\tat
        io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat
        org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
        java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
        java.lang.Thread.run(Thread.java:748)\n"
      worker_id: 172.17.0.8:8083
    type: source
  observedGeneration: 1
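
The remaining error asks for a row-level binlog. A possible way to enable it on the example Deployment is to pass the corresponding mysqld flags as container args. This is a sketch under assumptions: the helper names and the server-id value 223344 are hypothetical, the Deployment is assumed to be the `mysql` one in namespace `default` from the manifests above with root password `password`, and the official MySQL image is assumed to forward args starting with `-` to mysqld.

```shell
# Allow overriding the kubectl binary (for example with `echo` for a dry run).
KUBECTL="${KUBECTL:-kubectl}"

# Add mysqld flags to the first container of the `mysql` Deployment so the
# server writes a row-based binary log. The patch triggers a new rollout.
enable_row_binlog() {
  "$KUBECTL" patch deployment mysql --namespace default --type=json -p '[
    {"op": "add",
     "path": "/spec/template/spec/containers/0/args",
     "value": ["--server-id=223344", "--log-bin=mysql-bin", "--binlog-format=ROW"]}
  ]'
}

# Print the binlog-related settings the running server actually uses.
show_binlog_settings() {
  "$KUBECTL" exec --namespace default deploy/mysql -- \
    mysql -uroot -ppassword -e \
    "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','server_id')"
}

# Usage (requires a running cluster):
#   enable_row_binlog
#   show_binlog_settings
```

After the new pod is up and show_binlog_settings reports binlog_format as ROW, the connector task still has to be restarted, as the error message itself says.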