调试自定义 Kafka 连接器的简单有效方法是什么?

What is a simple, effective way to debug custom Kafka connectors?

我正在使用几个 Kafka 连接器,我在控制台输出的 creation/deployment 中没有看到任何错误,但是我没有得到我正在寻找的结果(没有结果就此而言,无论是希望还是其他)。我根据 Kafka 的示例 FileStream 连接器制作了这些连接器,因此我的调试技术基于示例中使用的 SLF4J 记录器的使用。我搜索了我认为会在控制台输出中生成的日志消息,但无济于事。我是不是在错误的地方寻找这些消息?或者是否有更好的方法来调试这些连接器?

我在实施中引用的 SLF4J 记录器的示例用法:

Kafka FileStreamSinkTask

Kafka FileStreamSourceTask

我会尽量笼统地回答你的问题。进行连接器开发的简单方法如下:

  • 通过查看众多公开可用的 Kafka 连接器之一来构造和构建您的连接器源代码(您将在此处找到一个广泛的列表:https://www.confluent.io/product/connectors/
  • https://www.confluent.io/download/
  • 下载最新的 Confluent 开源版本 (>= 3.3.0)
  • 通过以下方式之一使您的连接器包可用于 Kafka Connect:

    1. 将所有连接器 jar 文件(连接器 jar 加上不包括 Connect API jar 的依赖项 jar)存储到文件系统中的某个位置,并通过将此位置添加到 plugin.path 属性 在 Connect worker 属性中。例如,如果您的连接器罐存储在 /opt/connectors/my-first-connector 中,您将在工作人员的属性中设置 plugin.path=/opt/connectors(见下文)。
    2. 将所有连接器 jar 文件存储在 ${CONFLUENT_HOME}/share/java 下的文件夹中。例如:${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector。 (需要以 kafka-connect- 前缀开头才能被启动脚本选中)。 $CONFLUENT_HOME 是您安装 Confluent Platform 的位置。
  • 可选地,通过将​​ ${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties 中的 Connect 的日志级别更改为 DEBUG 甚至 TRACE.

    [=86 来增加日志记录=]
  • 使用Confluent CLI启动所有服务,包括Kafka Connect。详情在这里:http://docs.confluent.io/current/connect/quickstart.html

    简述:confluent start

Note: The Connect worker's properties file currently loaded by the CLI is ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties. That's the file you should edit if you choose to enable classloading isolation but also if you need to change your Connect worker's properties.

  • 一旦你有 Connect worker 运行ning,通过 运行ning 启动你的连接器:

    confluent load <connector_name> -d <connector_config.properties>

    confluent load <connector_name> -d <connector_config.json>

    连接器配置可以是 java 属性或 JSON 格式。

  • 运行 confluent log connect 打开 Connect worker 的日志文件,或直接导航至 运行ning

    存储您的日志和数据的位置

    cd "$( confluent current )"

Note: change where your logs and data are stored during a session of the Confluent CLI by setting the environment variable CONFLUENT_CURRENT appropriately. E.g. given that /opt/confluent exists and is where you want to store your data, run:

export CONFLUENT_CURRENT=/opt/confluent
confluent current

  • 最后,要以交互方式调试您的连接器,一种可能的方法是在开始使用 Confluent CLI 连接之前应用以下内容:

    confluent stop connect
    export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
    confluent start connect

    然后连接你的调试器(例如远程连接到 Connect worker(默认端口:5005)。要停止 运行在调试模式下连接,只需 运行:unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG; 完成后。

希望以上内容能让您的连接器开发更轻松、更有趣!

我喜欢接受的答案。一件事 - 环境变量对我不起作用...我使用的是融合社区版 5.3.1...

这是我所做的工作...

我从这里安装了 confluent cli: https://docs.confluent.io/current/cli/installing.html#tarball-installation

i 运行 使用命令confluent local start

汇合

我使用命令 ps -ef | grep connect

获取了连接应用的详细信息

我将生成的命令复制到编辑器并添加了 arg(就在 java 之后):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

然后我停止使用命令 confluent local stop connect

进行连接

然后我 运行 连接命令与 arg

短暂的中场休息---

vs code development is led by erich gamma - of gang of four fame, who also wrote eclipse. vs code is becoming a first class java ide see https://en.wikipedia.org/wiki/Erich_Gamma

中场休息结束---

接下来我启动 vs code 并打开 debezium oracle 连接器文件夹(从此处克隆)https://github.com/debezium/debezium-incubator

然后我选择了Debug - Open Configurations

并进入高亮调试配置

然后是 运行 调试器 - 它会命中您的断点!!

连接命令应如下所示:

/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties

连接器模块由kafka连接器框架执行。对于调试,我们可以使用独立模式。我们可以配置 IDE 以使用 ConnectStandalone 主函数作为入口点。

  1. 创建调试配置如下。如果是maven项目需要记得勾选"Include dependencies with "Provided" scope

  2. 连接器属性文件需要指定连接器 class 名称 "connector.class" 用于调试

  3. 可以从 kafka 文件夹复制 worker 属性文件 /usr/local/etc/kafka/connect-standalone.properties