Vertica-Kafka vkconfig 连接器

Vertica-Kafka vkconfig connector

我正在尝试定义一个 Vertica-Kafka 调度程序。我 运行 前几个命令成功,但在以下命令上失败:

$ /opt/vertica/packages/kafka/bin/vkconfig source --create --cluster kafka_nms_cluster --source test --partitions 1 --conf /home/vertica/vkconfig/vkconfig.conf

我得到的错误

Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException: ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/qb_workspaces/jenkins2/ReleaseBuilds/Grader/REL-9_2_1-x_grader/build/udx/supported/kafka/KafkaUtil.cpp:163], error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
        at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:248)
        at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:194)
        at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:93)
        at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:44)
        at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:62)
        at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:13)
        at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:59)
        at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:141)
        at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:29)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861) ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/qb_workspaces/jenkins2/ReleaseBuilds/Grader/REL-9_2_1-x_grader/build/udx/supported/kafka/KafkaUtil.cpp:163], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
        at com.vertica.util.ServerErrorData.buildException(Unknown Source)
        at com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
        at com.vertica.dataengine.VResultSet.initialize(Unknown Source)
        at com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
        at com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
        at com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
        at com.vertica.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
        at com.vertica.jdbc.common.SPreparedStatement.executeQuery(Unknown Source)
        at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:227)
        ... 8 more
Caused by: com.vertica.support.exceptions.NonTransientException: [Vertica][VJDBC](5861) ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/qb_workspaces/jenkins2/ReleaseBuilds/Grader/REL-9_2_1-x_grader/build/udx/supported/kafka/KafkaUtil.cpp:163], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
        ... 17 more

但是,当我尝试使用 vsql 运行 KafkaListTopics 时,结果集显示具有 1 个分区的测试主题。

[root@dal_server1 ~]# /opt/vertica/bin/vsql -U vertica -c "SELECT KafkaListTopics(USING PARAMETERS brokers='10.22.2.38:9092') OVER ();"

       topic        | num_partitions
--------------------+----------------
 __consumer_offsets |             50
 test               |              1
 TutorialTopic      |              1
(3 rows)

可能导致此错误的原因是什么?

谢谢 阿维

问题实际上可能与您在尝试创建源之前创建的集群有关。我在测试 Vertica/Kafka 集成时遇到了同样的问题,其中测试 Kafka 集群没有 DNS 条目,但 DNS 名称存储在 stream_clsuters table.

查询<scheduler_config_schema>.stream_clusterstable。如果存储的是 DNS 名称而不是简单的 IP 地址,那么您可以做两件事。

  1. stream_clusters table 进行手动更新,如果只有一个 Kafka 节点,则将其更改为 <ip_address>:<port>,如果有多个,则更改为 <ip_address1>:<port>,...,<ip_addressN>:<port>
  2. 或者,将域名添加到所有 Vertica 节点上的 /etc/hosts

例如,在 stream_clusters table 中,您看到 domain_name_1:9092、运行 这个 UPDATE 语句:

UPDATE <scheduler_config_schema>.stream_clusters
SET hosts = '10.22.2.38:9092'
WHERE id = <some_id>

通常,我建议不要对这些调度程序配置执行任何类型的手动 DML table,但我之前已经完成了此特定更新并且它是安全的(尤其是在测试中)。

当然,在真正的生产环境中,Kafka 集群应该在您的网络中有 DNS 条目,您不必担心这个错误,但是为了使用 VM 或 Docker 容器进行测试,我已经遇到过几次,上面的建议已经解决了。