调试自定义 Kafka 连接器的简单有效方法是什么?
What is a simple, effective way to debug custom Kafka connectors?
我正在使用几个 Kafka 连接器,我在控制台输出的 creation/deployment 中没有看到任何错误,但是我没有得到我正在寻找的结果(没有结果就此而言,无论是希望还是其他)。我根据 Kafka 的示例 FileStream 连接器制作了这些连接器,因此我的调试技术基于示例中使用的 SLF4J 记录器的使用。我搜索了我认为会在控制台输出中生成的日志消息,但无济于事。我是不是在错误的地方寻找这些消息?或者是否有更好的方法来调试这些连接器?
我在实施中引用的 SLF4J 记录器的示例用法:
我会尽量笼统地回答你的问题。进行连接器开发的简单方法如下:
- 通过查看众多公开可用的 Kafka 连接器之一来构造和构建您的连接器源代码(您将在此处找到一个广泛的列表:https://www.confluent.io/product/connectors/)
- 从 https://www.confluent.io/download/
下载最新的 Confluent 开源版本 (>= 3.3.0)
通过以下方式之一使您的连接器包可用于 Kafka Connect:
- 将所有连接器 jar 文件(连接器 jar 加上不包括 Connect API jar 的依赖项 jar)存储到文件系统中的某个位置,并通过将此位置添加到
plugin.path
属性 在 Connect worker 属性中。例如,如果您的连接器罐存储在 /opt/connectors/my-first-connector
中,您将在工作人员的属性中设置 plugin.path=/opt/connectors
(见下文)。
- 将所有连接器 jar 文件存储在
${CONFLUENT_HOME}/share/java
下的文件夹中。例如:${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector
。 (需要以 kafka-connect-
前缀开头才能被启动脚本选中)。 $CONFLUENT_HOME 是您安装 Confluent Platform 的位置。
可选地,通过将 ${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties
中的 Connect 的日志级别更改为 DEBUG
甚至 TRACE
.
[=86 来增加日志记录=]
使用Confluent CLI启动所有服务,包括Kafka Connect。详情在这里:http://docs.confluent.io/current/connect/quickstart.html
简述:confluent start
Note: The Connect worker's properties file currently loaded by the CLI is ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties
. That's the file you should edit if you choose to enable classloading isolation but also if you need to change your Connect worker's properties.
一旦你有 Connect worker 运行ning,通过 运行ning 启动你的连接器:
confluent load <connector_name> -d <connector_config.properties>
或
confluent load <connector_name> -d <connector_config.json>
连接器配置可以是 java 属性或 JSON 格式。
运行
confluent log connect
打开 Connect worker 的日志文件,或直接导航至 运行ning
存储您的日志和数据的位置
cd "$( confluent current )"
Note: change where your logs and data are stored during a session of the Confluent CLI by setting the environment variable CONFLUENT_CURRENT
appropriately. E.g. given that /opt/confluent
exists and is where you want to store your data, run:
export CONFLUENT_CURRENT=/opt/confluent
confluent current
最后,要以交互方式调试您的连接器,一种可能的方法是在开始使用 Confluent CLI 连接之前应用以下内容:
confluent stop connect
export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
confluent start connect
然后连接你的调试器(例如远程连接到 Connect worker(默认端口:5005)。要停止 运行在调试模式下连接,只需 运行:unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;
完成后。
希望以上内容能让您的连接器开发更轻松、更有趣!
我喜欢接受的答案。一件事 - 环境变量对我不起作用...我使用的是融合社区版 5.3.1...
这是我所做的工作...
我从这里安装了 confluent cli:
https://docs.confluent.io/current/cli/installing.html#tarball-installation
i 运行 使用命令confluent local start
汇合
我使用命令 ps -ef | grep connect
获取了连接应用的详细信息
我将生成的命令复制到编辑器并添加了 arg(就在 java 之后):
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
然后我停止使用命令 confluent local stop connect
进行连接
然后我 运行 连接命令与 arg
短暂的中场休息---
vs code development is led by erich gamma - of gang of four
fame, who also wrote eclipse. vs code is becoming a first class java ide see https://en.wikipedia.org/wiki/Erich_Gamma
中场休息结束---
接下来我启动 vs code 并打开 debezium oracle 连接器文件夹(从此处克隆)https://github.com/debezium/debezium-incubator
然后我选择了Debug - Open Configurations
并进入高亮调试配置
然后是 运行 调试器 - 它会命中您的断点!!
连接命令应如下所示:
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties
连接器模块由kafka连接器框架执行。对于调试,我们可以使用独立模式。我们可以配置 IDE 以使用 ConnectStandalone 主函数作为入口点。
创建调试配置如下。如果是maven项目需要记得勾选"Include dependencies with "Provided" scope
连接器属性文件需要指定连接器 class 名称 "connector.class" 用于调试
- 可以从 kafka 文件夹复制 worker 属性文件 /usr/local/etc/kafka/connect-standalone.properties
我正在使用几个 Kafka 连接器,我在控制台输出的 creation/deployment 中没有看到任何错误,但是我没有得到我正在寻找的结果(没有结果就此而言,无论是希望还是其他)。我根据 Kafka 的示例 FileStream 连接器制作了这些连接器,因此我的调试技术基于示例中使用的 SLF4J 记录器的使用。我搜索了我认为会在控制台输出中生成的日志消息,但无济于事。我是不是在错误的地方寻找这些消息?或者是否有更好的方法来调试这些连接器?
我在实施中引用的 SLF4J 记录器的示例用法:
我会尽量笼统地回答你的问题。进行连接器开发的简单方法如下:
- 通过查看众多公开可用的 Kafka 连接器之一来构造和构建您的连接器源代码(您将在此处找到一个广泛的列表:https://www.confluent.io/product/connectors/)
- 从 https://www.confluent.io/download/ 下载最新的 Confluent 开源版本 (>= 3.3.0)
通过以下方式之一使您的连接器包可用于 Kafka Connect:
- 将所有连接器 jar 文件(连接器 jar 加上不包括 Connect API jar 的依赖项 jar)存储到文件系统中的某个位置,并通过将此位置添加到
plugin.path
属性 在 Connect worker 属性中。例如,如果您的连接器罐存储在/opt/connectors/my-first-connector
中,您将在工作人员的属性中设置plugin.path=/opt/connectors
(见下文)。 - 将所有连接器 jar 文件存储在
${CONFLUENT_HOME}/share/java
下的文件夹中。例如:${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector
。 (需要以kafka-connect-
前缀开头才能被启动脚本选中)。 $CONFLUENT_HOME 是您安装 Confluent Platform 的位置。
- 将所有连接器 jar 文件(连接器 jar 加上不包括 Connect API jar 的依赖项 jar)存储到文件系统中的某个位置,并通过将此位置添加到
可选地,通过将
[=86 来增加日志记录=]${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties
中的 Connect 的日志级别更改为DEBUG
甚至TRACE
.使用Confluent CLI启动所有服务,包括Kafka Connect。详情在这里:http://docs.confluent.io/current/connect/quickstart.html
简述:
confluent start
Note: The Connect worker's properties file currently loaded by the CLI is
${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties
. That's the file you should edit if you choose to enable classloading isolation but also if you need to change your Connect worker's properties.
一旦你有 Connect worker 运行ning,通过 运行ning 启动你的连接器:
confluent load <connector_name> -d <connector_config.properties>
或
confluent load <connector_name> -d <connector_config.json>
连接器配置可以是 java 属性或 JSON 格式。
运行
存储您的日志和数据的位置confluent log connect
打开 Connect worker 的日志文件,或直接导航至 运行ningcd "$( confluent current )"
Note: change where your logs and data are stored during a session of the Confluent CLI by setting the environment variable
CONFLUENT_CURRENT
appropriately. E.g. given that/opt/confluent
exists and is where you want to store your data, run:
export CONFLUENT_CURRENT=/opt/confluent
confluent current
最后,要以交互方式调试您的连接器,一种可能的方法是在开始使用 Confluent CLI 连接之前应用以下内容:
confluent stop connect
export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
confluent start connect
然后连接你的调试器(例如远程连接到 Connect worker(默认端口:5005)。要停止 运行在调试模式下连接,只需 运行:
unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;
完成后。
希望以上内容能让您的连接器开发更轻松、更有趣!
我喜欢接受的答案。一件事 - 环境变量对我不起作用...我使用的是融合社区版 5.3.1...
这是我所做的工作...
我从这里安装了 confluent cli: https://docs.confluent.io/current/cli/installing.html#tarball-installation
i 运行 使用命令confluent local start
我使用命令 ps -ef | grep connect
我将生成的命令复制到编辑器并添加了 arg(就在 java 之后):
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
然后我停止使用命令 confluent local stop connect
然后我 运行 连接命令与 arg
短暂的中场休息---
vs code development is led by erich gamma - of
gang of four
fame, who also wrote eclipse. vs code is becoming a first class java ide see https://en.wikipedia.org/wiki/Erich_Gamma
中场休息结束---
接下来我启动 vs code 并打开 debezium oracle 连接器文件夹(从此处克隆)https://github.com/debezium/debezium-incubator
然后我选择了Debug - Open Configurations
并进入高亮调试配置
然后是 运行 调试器 - 它会命中您的断点!!
连接命令应如下所示:
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties
连接器模块由kafka连接器框架执行。对于调试,我们可以使用独立模式。我们可以配置 IDE 以使用 ConnectStandalone 主函数作为入口点。
创建调试配置如下。如果是maven项目需要记得勾选"Include dependencies with "Provided" scope
连接器属性文件需要指定连接器 class 名称 "connector.class" 用于调试
- 可以从 kafka 文件夹复制 worker 属性文件 /usr/local/etc/kafka/connect-standalone.properties