How do I add an HDFS connector to the Kafka Connect API?

I am new to the Kafka Connect feature, and I am having trouble configuring Kafka and HDFS with Kafka Connect.

I have been following the tutorial on the Debezium website, where I could test new events and see how the system works. In the tutorial, they explain how to create a connector between MySQL and Kafka, and I tried to do the same thing, but for HDFS.

I did some research online and ran the following command:

```
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": 1,
    "topics": "dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customers,dbserver1.inventory.orders",
    "hdfs.url": "hdfs://172.18.0.2:9870",
    "flush.size": 3,
    "logs.dir": "logs",
    "topics.dir": "kafka",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DefaultPartitioner",
    "partition.field.name": "day"
  }
}'
```

In this command I added the topics that were auto-generated by Kafka; for the URL I tried using the IP address of the namenode container (I am not sure whether that is correct). In short, I am just testing, but the end goal here is to get every event into HDFS.

```
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.hdfs.HdfsSinkConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.activemq.ActiveMQSourceConnector, name='io.confluent.connect.activemq.ActiveMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-elasticsearch/'}, PluginDesc{klass=class io.confluent.connect.gcs.GcsSinkConnector, name='io.confluent.connect.gcs.GcsSinkConnector', version='5.0.3', encodedVersion=5.0.3, type=sink, typeName='sink', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/'}, PluginDesc{klass=class io.confluent.connect.ibm.mq.IbmMQSourceConnector, name='io.confluent.connect.ibm.mq.IbmMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-ibmmq/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jms.JmsSourceConnector, name='io.confluent.connect.jms.JmsSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.kafka.connect.datagen.DatagenConnector, name='io.confluent.kafka.connect.datagen.DatagenConnector', version='null', encodedVersion=null, type=source, typeName='source', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=connector, typeName='connector', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}"}
```

This is the error from the terminal. I think the HDFS plugin is not installed correctly (I have looked at many examples online, but I am still not sure whether it is installed properly).

I am not sure whether it is really necessary to use this plugin from Confluent?

I also don't know whether installing HDFS from Docker is a good idea?

I hope you can share some knowledge about this problem. Thanks in advance.

Tutorial link: https://debezium.io/documentation/reference/1.0/tutorial.html

Welcome to Stack Overflow!

There is a problem with the plugin installation. So first, please check the installed plugins using the Kafka Connect REST interface (see details here). Then you can install the connector manually.
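For example, the Connect REST API exposes a `/connector-plugins` endpoint that lists every plugin the worker has loaded. The host and port below assume the default worker address `localhost:8083`; adjust them for your environment:

```shell
# List the connector plugins the Connect worker has actually loaded.
# io.confluent.connect.hdfs.HdfsSinkConnector must appear in this list
# before the POST to /connectors can succeed.
curl -s localhost:8083/connector-plugins | python3 -m json.tool
```

If the HDFS sink is missing from the output, the plugin is not on the worker's `plugin.path`, which matches the 500 error you are seeing.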

The HDFS 2 sink connector is deprecated and has been removed from Confluent Platform installations.

You can still find it on Confluent Hub and install it from there. I recommend using the official Apache Kafka site to learn the core of Kafka Connect, rather than other places.
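One way to install it manually is with the `confluent-hub` CLI that ships with Confluent Platform (the `latest` tag is an assumption; pick a version that matches your platform), then restart the Connect worker so the new plugin is scanned:

```shell
# Install the HDFS 2 sink connector from Confluent Hub into the
# worker's plugin path; restart the Connect worker afterwards.
confluent-hub install confluentinc/kafka-connect-hdfs:latest
```

In a Docker setup, the equivalent is usually running this command in the Connect container's startup script and then restarting the container.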

See this link for Confluent's HDFS 2 Sink Connector documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html

It covers almost everything, and it will also help you troubleshoot if you are using the Confluent Platform.
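Once the plugin is installed, a minimal registration could look like the sketch below. Note that `hdfs.url` should point at the NameNode RPC port (commonly 8020 or 9000), not the web UI port 9870 used in your command; the hostname `namenode` and the single topic here are assumptions for illustration:

```shell
# Minimal HDFS sink registration sketch; adjust hdfs.url, topics,
# and the Connect worker address for your environment.
curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d '{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "hdfs.url": "hdfs://namenode:8020",
    "flush.size": "3",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat"
  }
}'
```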

A Docker image is a reliable way to run HDFS. You can use this image: https://hub.docker.com/r/sequenceiq/hadoop-docker/