WARN 获取相关 ID 为 1 的元数据时出错:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

当我 运行 使用 kafka 0.9.0.1 执行以下命令时,我收到此警告[1]。你能告诉我我的主题有什么问题吗? (我正在与 运行 在 ec2 中的 kafka 经纪人交谈)

#./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC?

[1]

[2016-04-06 10:57:45,839] WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,066] WARN Error while fetching metadata with correlation id 3 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,188] WARN Error while fetching metadata with correlation id 5 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,311] WARN Error while fetching metadata with correlation id 7 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

您的主题名称无效,因为它包含字符“?”这不是主题名称的合法字符。

当我们的生产者无法生产到相应的地址时,请检查 /kafka/config/server.properties 广告听众的价值, 如果它被注释掉,还有其他问题。 但如果不是,请将您的 ip 地址替换为 localhost,然后重新启动 zookeeper 和 kafka 尝试启动控制台制作者,希望它能正常工作。

以防万一有人遇到与逗号“,”和 logstash 输出到 kafka 或计算的主题名称相关的问题:

在 logstash 输出到 kafka 的 topic_id 中,我们尝试创建 topic_id 附加我们在过滤器中计算的变量。

问题是这个字段已经存在于源文档中,我们稍后将其“再次”添加到 logstash 过滤器中,将字符串字段转换为散列 (array/list)。

正如我们在 logstash 输出中使用的那样

topic_id => ["topicName_%{field}"]

我们最终得到:

topic_id : "topicName_fieldItem1,FieldItem2"

导致logstash日志异常的原因

[WARN ][org.apache.kafka.clients.NetworkClient] [Producer clientId=logstash] Error while fetching metadata with correlation id 3605264 : {topicName_fieldItem1,FieldItem2=INVALID_TOPIC_EXCEPTION}

我遇到了同样的错误。在我的例子中,问题是 space 在我的代码中逗号分隔的主题之间:

@source(type='kafka',
    topic.list="p1, p2, p3",
    partition.no.list='0',
    threading.option='single.thread',
    group.id="group",
    bootstrap.servers='kafka:9092',
    @map(type='json')
)

终于找到解决方案:

@source(type='kafka',
    topic.list="p1,p2,p3",
    partition.no.list='0',
    threading.option='single.thread',
    group.id="group",
    bootstrap.servers='kafka:9092',
    @map(type='json')
)