Strimzi KafkaConnect & Connector Error, Won't Load

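I'm not sure where else to turn: I've copied many of the examples I've seen, and it still doesn't work. The connector won't install and complains about an empty password. I've verified every step but can't get it working. Here are the steps I took.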

Container

FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001

Next, I create the secret for MySQL.

cat <<EOF | kubectl apply -n kafka-cloud -f - 
apiVersion: v1
kind: Secret
metadata:
  name: mysql-auth
type: Opaque
stringData:
  mysql-auth.properties: |-
    username: root
    password: supersecret
EOF

Verify

% kubectl -n kafka-cloud get secrets | grep mysql-auth
mysql-auth                                     Opaque                                1      14m

Double-check that the username and password are not empty, since the connector status reports an error.

% kubectl -n kafka-cloud get secret mysql-auth -o yaml

apiVersion: v1
data:
  mysql-auth.properties: dXNlcm5hbWU6IHJvb3QKcGFzc3dvcmQ6IHN1cGVyc2VjcmV0
kind: Secret
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Secret","metadata":{"annotations":{},"name":"mysql-auth","namespace":"kafka-cloud"},"stringData":{"mysql-auth.properties":"username: root\npassword: supersecret"},"type":"Opaque"}
  creationTimestamp: "2022-03-02T23:48:55Z"
  name: mysql-auth
  namespace: kafka-cloud
  resourceVersion: "4041"
  uid: 14a7a878-d01f-4899-8dc7-81b515278f32
type: Opaque
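
The `data` value in the Secret is base64-encoded, so the YAML alone does not show whether the credentials are present. A minimal sketch of decoding it locally, using the exact string from the output above (the variable names `encoded` and `decoded` are only illustrative):

```shell
# Base64 value copied from the mysql-auth.properties key of the Secret above.
encoded='dXNlcm5hbWU6IHJvb3QKcGFzc3dvcmQ6IHN1cGVyc2VjcmV0'

# Decode it to see the properties file that will be mounted into the Connect pod.
decoded=$(printf '%s' "$encoded" | base64 --decode)
printf '%s\n' "$decoded"
# prints:
# username: root
# password: supersecret
```

Against a live cluster, the same check should be possible by piping `kubectl -n kafka-cloud get secret mysql-auth -o go-template='{{index .data "mysql-auth.properties"}}'` into `base64 --decode`.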

Add the Connect cluster

cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
#  # use-connector-resources configures this KafkaConnect
#  # to use KafkaConnector resources to avoid
#  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: connect-debezium
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: mysql-auth-config
        secret:
          secretName: mysql-auth
EOF

Add the connector

cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
    name: mysql-test-connector
    labels:
        strimzi.io/cluster: my-connect-cluster
spec:
    class: io.debezium.connector.mysql.MySqlConnector
    tasksMax: 1
    config:
        database.hostname: 172.17.0.13
        database.port: 3306
        database.user:  "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:username}"
        database.password: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:password}"
        database.server.id: 184054
        database.server.name: mysql-pod
        database.whitelist: sample
        database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
        database.history.kafka.topic: "schema-changes.sample"
        key.converter: "org.apache.kafka.connect.storage.StringConverter"
        value.converter: "org.apache.kafka.connect.storage.StringConverter"
EOF

Error

No matter what I try, I get this error. I don't know what I'm missing. I know this is a simple configuration, but I can't figure it out. I'm stuck.

% kubectl -n kafka-cloud describe kafkaconnector mysql-test-connector

Name:         mysql-test-connector
Namespace:    kafka-cloud
Labels:       strimzi.io/cluster=my-connect-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaConnector
Metadata:
  Creation Timestamp:  2022-03-02T23:44:20Z
  Generation:          1
  Managed Fields:
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
        f:labels:
          .:
          f:strimzi.io/cluster:
      f:spec:
        .:
        f:class:
        f:config:
          .:
          f:database.history.kafka.bootstrap.servers:
          f:database.history.kafka.topic:
          f:database.hostname:
          f:database.password:
          f:database.port:
          f:database.server.id:
          f:database.server.name:
          f:database.user:
          f:database.whitelist:
          f:key.converter:
          f:value.converter:
        f:tasksMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-03-02T23:44:20Z
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:conditions:
        f:observedGeneration:
        f:tasksMax:
        f:topics:
    Manager:         okhttp
    Operation:       Update
    Subresource:     status
    Time:            2022-03-02T23:44:20Z
  Resource Version:  3874
  UID:               c70ffe4e-3777-4524-af82-dad3a57ca25e
Spec:
  Class:  io.debezium.connector.mysql.MySqlConnector
  Config:
    database.history.kafka.bootstrap.servers:  my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic:              schema-changes.sample
    database.hostname:                         172.17.0.13
    database.password:                         
    database.port:                             3306
    database.server.id:                        184054
    database.server.name:                      mysql-pod
    database.user:                             
    database.whitelist:                        sample
    key.converter:                             org.apache.kafka.connect.storage.StringConverter
    value.converter:                           org.apache.kafka.connect.storage.StringConverter
  Tasks Max:                                   1
Status:
  Conditions:
    Last Transition Time:  2022-03-02T23:45:00.097311Z
    Message:               PUT /connectors/mysql-test-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    Reason:             ConnectRestException
    Status:             True
    Type:               NotReady
  Observed Generation:  1
  Tasks Max:            1
  Topics:
Events:  <none>
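
In the describe output above, `database.user` and `database.password` are blank in the stored spec, which suggests the `${file:...}` placeholders may have been consumed before kubectl received the manifest. One possible cause, which is an assumption and not something confirmed in this post, is the unquoted `<<EOF` delimiter: a shell performs parameter expansion inside such a heredoc. A minimal sketch showing that quoting the delimiter keeps the placeholder literal (the variable `rendered` is only illustrative):

```shell
# Quoting the delimiter ('EOF') tells the shell to pass the body through
# without expanding ${...}, so the placeholder reaches its consumer intact.
rendered=$(cat <<'EOF'
database.user: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:username}"
EOF
)
printf '%s\n' "$rendered"
```

If this is what happened here, changing `cat <<EOF` to `cat <<'EOF'` in the connector step would let Kafka Connect's FileConfigProvider resolve the values instead of the shell.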

The configuration parameter the MySQL connector needed was:

database.allowPublicKeyRetrieval: true

Problem solved.
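
The post names the property but does not show where it was added. A minimal sketch, assuming it belongs under `spec.config` of the `mysql-test-connector` resource next to the other `database.*` settings; the merge patch below is one illustrative way to add it, not necessarily what the author did:

```shell
# JSON merge patch that adds the single property to spec.config
# (placement under spec.config is an assumption; see the note above).
patch='{"spec":{"config":{"database.allowPublicKeyRetrieval":true}}}'
printf '%s\n' "$patch"

# To apply it to the connector from this post (requires cluster access):
# kubectl -n kafka-cloud patch kafkaconnector mysql-test-connector --type merge -p "$patch"
```

Adding the same line to the KafkaConnector manifest keeps the applied file and the live resource in step.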