如何创建 json 以创建带有转换的分布式 Kafka Connect 实例?

How do I create the json for creating a distributed Kafka Connect Instance with a transformation?

使用独立模式我创建了一个连接器和我的自定义转换:

name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events

但是对于分布式连接器,向 Connect REST API 发出 PUT 请求的语法是什么?我在文档中找不到任何示例。

已经尝试过一些方法,例如:

cat <<EOF >/tmp/connector
{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields": {
      "type": "net.gutefrage.connector.transforms.ExtractFields$Value",
      "fields": "body,envelope.routingKey",
      "structName": "net.gutefrage.events"
    }
  }
}
EOF

curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
rm /tmp/connector

或者这也不起作用:

{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
    "transforms.extractFields.fields": "body,envelope.routingKey",
    "transforms.extractFields.structName": "net.gutefrage.events"
  }
}

对于最后一个变体,我收到以下错误:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

请注意,使用属性格式它可以很好地工作(在 fast-data-dev 中使用 Landoops 创建新连接器 UI。有趣的是 Landoop 的 Ui 功能 'translate to curl' 产生与我的第二个示例非常相同 json)

更新

为了确保这不是 Landoop 的问题,docker 以及我的自定义转换,我已经使用 COP 的标准分布式属性以分布式模式启动了 zookeeper、broker、schema registry 和 Kafka Connect 3.3.0

bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

哪个日志 [2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176) [2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199) [2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) 到目前为止一切都很好。然后我创建了一个连接器配置:

猫 <<EOF >/tmp/connector<br> { "name": "rabbitmq-source", "config":{ "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", "tasks.max": "1", "rabbitmq.host": "rabbitmq-server", "rabbitmq.queue": "answers", "kafka.topic": "net.gutefrage.answers", "transforms": "extractFields", "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.extractFields.field": "body" } } 结束符 请注意,我现在使用标准(捆绑)提取字段转换。 当我 post 与 curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors" 我也一样

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}*

要使用 json 格式的连接器配置和 CP 连接 CLI,必须在 Kafka-Connect 集群所在的机器上安装 jq 工具 运行。

例如对于 Landoops 快速数据开发环境,您必须

docker exec rabbitmqconnect_fast-data-dev_1 apk add --no-cache jq

那么这将起作用:

docker exec rabbitmqconnect_fast-data-dev_1 /opt/confluent-3.3.0/bin/confluent config rabbitmq-source -d /tmp/connector-config.json

虽然这并不能解决使用连接器 REST 端点时的问题。

如果你想 运行 Kafka Connect worker 处于 standalone 模式,那么你必须启动 worker 并提供 worker 配置文件 and 一个或多个连接器配置文件。所有这些配置文件都是 Java 属性格式,因此您提供的第一个配置示例是正确的格式:

name=rabbitmq-source
connect.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events

如果你想运行 分布式模式下的Kafka Connect worker,那么你必须首先启动分布式worker,然后创建连接器作为第二个使用 the REST API 和带有 JSON 文档的 PUT 请求到 /connectors 端点的步骤。该 JSON 文档将与您的第二个 JSON 文档的格式相匹配:

{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
    "transforms.extractFields.fields": "body,envelope.routingKey",
    "transforms.extractFields.structName": "net.gutefrage.events"
  }
}

Confluent CLI 包含在包含 Kafka 的 Confluent 开源平台中,是一种开发工具,可通过 运行Ning Zookeeper 实例、Kafka 代理、Confluent Schema Registry、 REST 代理和分布式模式下的 Connect worker。加载连接器时,将连接器配置指定为 JSON 文件或 属性 文件,使用 jq.[=26= 将后者转换为 JSON 格式]

但是,您报错是:

{
  "error_code":400,
  "message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

此错误消息的重要部分是 "Error getting config definition from Transformation: null"。虽然这有点太神秘了,但它意味着 net.gutefrage.connector.transforms.ExtractFields Java class 的 config() 方法是 returning null。

确保您指定的 net.gutefrage.connector.transforms.ExtractFields$Value 字符串是嵌套静态 class Value 的正确完全限定名称,并且 Value class 完全正确地实现了 org.apache.kafka.connect.transforms.Transformation<? extends ConnectRecord<R>> 接口。请注意,config() 方法必须 return 一个非空 ConfigDef 对象。

查看 this example of a Single Message Transform (SMT) that ships with Apache Kafka, or Robin's blog post 了解其他示例。

使用 fast-data-dev 您可以为任何连接器构建 JAR 文件,然后只需按照

处的说明将其添加到类路径中

https://github.com/Landoop/fast-data-dev#enable-additional-connectors

UI 将自动检测新连接器 - 并在您点击新连接器时为您提供说明:

http://localhost:3030/kafka-connect-ui

什么也值得尝试 - 因为 fast-data-dev 已经带有通用 MQTT 接收器连接器,正在尝试。请参阅 http://docs.datamountaineer.com/en/latest/mqtt-sink.html

中的说明

你实际上需要做 connect.mqtt.kcql=INSERT INTO /answers SELECT body FROM net.gutefrage.answers

因为这是一个通用的 MQTT 连接器 - 您可能需要使用 enable-additional-connectors 说明

添加 rabbitmq 客户端库

确保 transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value 中的 $Value 未被 bash 命令 cat 解释为变量。它对我有用。