向 kafka 连接添加新的接收器配置

Adding new sink config to kafka connect

我正忙于学习 kafka,特别是使用接收器连接器 (jdbc) 将数据从 kafka 主题发送到 mysql 的下游元素。

我已经使用 jdbc 连接器和 mysql jdbc 驱动程序从汇合的 kafka-connect-base 图像构建了一个图像,在这样的 Dockerfile 中:

FROM confluentinc/cp-kafka-connect-base

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.1

ENV MYSQL_DRIVER_VERSION 5.1.39

RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
     | tar -xzf - -C /usr/share/java/kafka/ --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar

老实说,我有点迷失在文档中,我已经到了想要包含接收器配置的地步,但我不知道如何包含它或复制到哪里。我已经创建了接收器配置文件,但不确定将它放在哪里。作为映像构建的一部分,或者当 运行 kafka-connect 容器时?

最终目标是为此连接器创建 helm 部署,但我还没有完成。

在我学习所有关于 Kafka 和 Kafka Connect 的过程中,我们将不胜感激。

由于您在分布式模式下使用 Kafka Connect(即 generally the best option),因此您将连接器配置作为 REST 调用传递。

这是一个示例 JDBC 接收器配置 - 您需要修改它以适合您的源主题、serialisation 等:

curl -X PUT http://localhost:8083/connectors/sink_postgres_foo_00/config -H "Content-Type: application/json" -d '{
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:mysql://mysql-host:3306/",
      "connection.user": "user",
      "connection.password": "pw",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "tasks.max": "1",
      "topics": "foo",
      "auto.create": "true",
      "auto.evolve":"true",
      "pk.mode":"none"          
    }'

如果你想在实例化容器时传递你的连接器配置,你可以将它嵌入到启动命令中,它应该启动 worker,等待它可用,然后传递配置。 Here's an example. Also see https://rmoff.net/2018/12/15/docker-tips-and-tricks-with-ksql-and-kafka/.