Kafka 连接问题

Kafka-connect issue

我在 centos 7 (confluent) 上安装了 Apache Kafka,正在尝试 运行 filestream kafka 以分布式模式连接,但出现以下错误:

[2017-08-10 05:26:27,355] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "internal.key.converter" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:197)
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:289)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:65)

现在已通过更新 workers.properties 解决,如 http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config

中所述

Command used:

/home/arun/kafka/confluent-3.3.0/bin/connect-distributed.sh ../../../properties/file-stream-demo-distributed.properties

Filestream properties file (workers.properties):

name=file-stream-demo-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
group.id=""

我添加了以下属性,命令顺利通过。

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
group.id=""

但是,现在当我 运行 消费者命令时,我无法看到 /tmp/demo-file.txt 中的消息。请让我知道是否有办法检查消息是否已发布到 kafka 主题和分区?

kafka-console-consumer --zookeeper localhost:2181 --topic demo-2-distributed --from-beginning

我想我在这里遗漏了一些非常基本的东西。有人可以帮忙吗?

您需要为 Kafka connect 框架定义唯一的主题来存储其配置、偏移量和状态。

在您的 workers.properties 文件中将这些参数更改为如下内容:

config.storage.topic=demo-2-distributed-config
offset.storage.topic=demo-2-distributed-offset
status.storage.topic=demo-2-distributed-status

这些主题用于存储连接的状态和配置元数据,用于存储 运行 在连接之上的任何连接器的消息。不要在这三个主题中的任何一个上使用控制台使用者并期望看到消息。

消息存储在连接器配置 json 中配置的主题中,参数名为 "topic"。

示例文件-接收器-config.json 文件

{
  "name": "MyFileSink",
  "config": {
      "topics": "mytopic",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": 1,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "file": "/tmp/demo-file.txt"
    }
}

分布式工作器 运行ning 后,您需要像这样使用 curl 将配置文件应用到它:

curl -X POST -H "Content-Type: application/json" --data @file-sink-config.json http://localhost:8083/connectors

之后,配置将安全地存储在您创建的配置主题中,供所有分布式工作人员使用。确保配置主题(以及状态和偏移量主题)不会使消息过期,否则当它过期时您将丢失连接器配置。