如何创建 json 以创建带有转换的分布式 Kafka Connect 实例?
How do I create the json for creating a distributed Kafka Connect Instance with a transformation?
使用独立模式我创建了一个连接器和我的自定义转换:
name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events
但是对于分布式连接器,向 Connect REST API 发出 PUT 请求的语法是什么?我在文档中找不到任何示例。
已经尝试过一些方法,例如:
cat <<EOF >/tmp/connector
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields": {
"type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"fields": "body,envelope.routingKey",
"structName": "net.gutefrage.events"
}
}
}
EOF
curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
rm /tmp/connector
或者这也不起作用:
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"transforms.extractFields.fields": "body,envelope.routingKey",
"transforms.extractFields.structName": "net.gutefrage.events"
}
}
对于最后一个变体,我收到以下错误:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
请注意,使用属性格式它可以很好地工作(在 fast-data-dev 中使用 Landoops 创建新连接器 UI。有趣的是 Landoop 的 Ui 功能 'translate to curl' 产生与我的第二个示例非常相同 json)
更新
为了确保这不是 Landoop 的问题,docker 以及我的自定义转换,我已经使用 COP 的标准分布式属性以分布式模式启动了 zookeeper、broker、schema registry 和 Kafka Connect 3.3.0
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
哪个日志
[2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
[2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
到目前为止一切都很好。然后我创建了一个连接器配置:
猫 <<EOF >/tmp/connector<br>
{
"name": "rabbitmq-source",
"config":{
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractFields.field": "body"
}
}
结束符
请注意,我现在使用标准(捆绑)提取字段转换。
当我 post 与 curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
我也一样
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}*
要使用 json 格式的连接器配置和 CP 连接 CLI,必须在 Kafka-Connect 集群所在的机器上安装 jq 工具 运行。
例如对于 Landoops 快速数据开发环境,您必须
docker exec rabbitmqconnect_fast-data-dev_1 apk add --no-cache jq
那么这将起作用:
docker exec rabbitmqconnect_fast-data-dev_1 /opt/confluent-3.3.0/bin/confluent config rabbitmq-source -d /tmp/connector-config.json
虽然这并不能解决使用连接器 REST 端点时的问题。
如果你想 运行 Kafka Connect worker 处于 standalone 模式,那么你必须启动 worker 并提供 worker 配置文件 and 一个或多个连接器配置文件。所有这些配置文件都是 Java 属性格式,因此您提供的第一个配置示例是正确的格式:
name=rabbitmq-source
connect.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events
如果你想运行 分布式模式下的Kafka Connect worker,那么你必须首先启动分布式worker,然后创建连接器作为第二个使用 the REST API 和带有 JSON 文档的 PUT
请求到 /connectors
端点的步骤。该 JSON 文档将与您的第二个 JSON 文档的格式相匹配:
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"transforms.extractFields.fields": "body,envelope.routingKey",
"transforms.extractFields.structName": "net.gutefrage.events"
}
}
Confluent CLI 包含在包含 Kafka 的 Confluent 开源平台中,是一种开发工具,可通过 运行Ning Zookeeper 实例、Kafka 代理、Confluent Schema Registry、 REST 代理和分布式模式下的 Connect worker。加载连接器时,将连接器配置指定为 JSON 文件或 属性 文件,使用 jq
.[=26= 将后者转换为 JSON 格式]
但是,您报错是:
{
"error_code":400,
"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
此错误消息的重要部分是 "Error getting config definition from Transformation: null"。虽然这有点太神秘了,但它意味着 net.gutefrage.connector.transforms.ExtractFields
Java class 的 config()
方法是 returning null。
确保您指定的 net.gutefrage.connector.transforms.ExtractFields$Value
字符串是嵌套静态 class Value
的正确完全限定名称,并且 Value
class 完全正确地实现了 org.apache.kafka.connect.transforms.Transformation<? extends ConnectRecord<R>>
接口。请注意,config()
方法必须 return 一个非空 ConfigDef
对象。
查看 this example of a Single Message Transform (SMT) that ships with Apache Kafka, or Robin's blog post 了解其他示例。
使用 fast-data-dev
您可以为任何连接器构建 JAR 文件,然后只需按照
处的说明将其添加到类路径中
https://github.com/Landoop/fast-data-dev#enable-additional-connectors
UI 将自动检测新连接器 - 并在您点击新连接器时为您提供说明:
http://localhost:3030/kafka-connect-ui
什么也值得尝试 - 因为 fast-data-dev
已经带有通用 MQTT 接收器连接器,正在尝试。请参阅 http://docs.datamountaineer.com/en/latest/mqtt-sink.html
中的说明
你实际上需要做
connect.mqtt.kcql=INSERT INTO /answers SELECT body FROM net.gutefrage.answers
因为这是一个通用的 MQTT 连接器 - 您可能需要使用 enable-additional-connectors
说明
添加 rabbitmq 客户端库
确保 transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value 中的 $Value 未被 bash 命令 cat 解释为变量。它对我有用。
使用独立模式我创建了一个连接器和我的自定义转换:
name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events
但是对于分布式连接器,向 Connect REST API 发出 PUT 请求的语法是什么?我在文档中找不到任何示例。
已经尝试过一些方法,例如:
cat <<EOF >/tmp/connector
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields": {
"type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"fields": "body,envelope.routingKey",
"structName": "net.gutefrage.events"
}
}
}
EOF
curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
rm /tmp/connector
或者这也不起作用:
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"transforms.extractFields.fields": "body,envelope.routingKey",
"transforms.extractFields.structName": "net.gutefrage.events"
}
}
对于最后一个变体,我收到以下错误:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
请注意,使用属性格式它可以很好地工作(在 fast-data-dev 中使用 Landoops 创建新连接器 UI。有趣的是 Landoop 的 Ui 功能 'translate to curl' 产生与我的第二个示例非常相同 json)
更新
为了确保这不是 Landoop 的问题,docker 以及我的自定义转换,我已经使用 COP 的标准分布式属性以分布式模式启动了 zookeeper、broker、schema registry 和 Kafka Connect 3.3.0
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
哪个日志
[2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
[2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
到目前为止一切都很好。然后我创建了一个连接器配置:
猫 <<EOF >/tmp/connector<br>
{
"name": "rabbitmq-source",
"config":{
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractFields.field": "body"
}
}
结束符
请注意,我现在使用标准(捆绑)提取字段转换。
当我 post 与 curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
我也一样
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}*
要使用 json 格式的连接器配置和 CP 连接 CLI,必须在 Kafka-Connect 集群所在的机器上安装 jq 工具 运行。
例如对于 Landoops 快速数据开发环境,您必须
docker exec rabbitmqconnect_fast-data-dev_1 apk add --no-cache jq
那么这将起作用:
docker exec rabbitmqconnect_fast-data-dev_1 /opt/confluent-3.3.0/bin/confluent config rabbitmq-source -d /tmp/connector-config.json
虽然这并不能解决使用连接器 REST 端点时的问题。
如果你想 运行 Kafka Connect worker 处于 standalone 模式,那么你必须启动 worker 并提供 worker 配置文件 and 一个或多个连接器配置文件。所有这些配置文件都是 Java 属性格式,因此您提供的第一个配置示例是正确的格式:
name=rabbitmq-source
connect.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events
如果你想运行 分布式模式下的Kafka Connect worker,那么你必须首先启动分布式worker,然后创建连接器作为第二个使用 the REST API 和带有 JSON 文档的 PUT
请求到 /connectors
端点的步骤。该 JSON 文档将与您的第二个 JSON 文档的格式相匹配:
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"transforms.extractFields.fields": "body,envelope.routingKey",
"transforms.extractFields.structName": "net.gutefrage.events"
}
}
Confluent CLI 包含在包含 Kafka 的 Confluent 开源平台中,是一种开发工具,可通过 运行Ning Zookeeper 实例、Kafka 代理、Confluent Schema Registry、 REST 代理和分布式模式下的 Connect worker。加载连接器时,将连接器配置指定为 JSON 文件或 属性 文件,使用 jq
.[=26= 将后者转换为 JSON 格式]
但是,您报错是:
{
"error_code":400,
"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
此错误消息的重要部分是 "Error getting config definition from Transformation: null"。虽然这有点太神秘了,但它意味着 net.gutefrage.connector.transforms.ExtractFields
Java class 的 config()
方法是 returning null。
确保您指定的 net.gutefrage.connector.transforms.ExtractFields$Value
字符串是嵌套静态 class Value
的正确完全限定名称,并且 Value
class 完全正确地实现了 org.apache.kafka.connect.transforms.Transformation<? extends ConnectRecord<R>>
接口。请注意,config()
方法必须 return 一个非空 ConfigDef
对象。
查看 this example of a Single Message Transform (SMT) that ships with Apache Kafka, or Robin's blog post 了解其他示例。
使用 fast-data-dev
您可以为任何连接器构建 JAR 文件,然后只需按照
https://github.com/Landoop/fast-data-dev#enable-additional-connectors
UI 将自动检测新连接器 - 并在您点击新连接器时为您提供说明:
http://localhost:3030/kafka-connect-ui
什么也值得尝试 - 因为 fast-data-dev
已经带有通用 MQTT 接收器连接器,正在尝试。请参阅 http://docs.datamountaineer.com/en/latest/mqtt-sink.html
你实际上需要做
connect.mqtt.kcql=INSERT INTO /answers SELECT body FROM net.gutefrage.answers
因为这是一个通用的 MQTT 连接器 - 您可能需要使用 enable-additional-connectors
说明
确保 transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value 中的 $Value 未被 bash 命令 cat 解释为变量。它对我有用。