org.apache.kafka.common.errors.TimeoutException:主题在 60000 毫秒后不存在于元数据中

org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms

我遇到错误:

 org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.

当尝试使用 Java 在 windows 上我的本地 kafka 实例中生成主题时。请注意,主题 testtopic2 存在,我可以使用 windows 控制台生成器向它生成消息。

在我使用的代码下方:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kafka_Producer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        TestCallback callback = new TestCallback();
        for (long i = 0; i < 100 ; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                    "testtopic2", "key-" + i, "message-"+i );
            producer.send(data, callback);
        }

        producer.close();
    }


    private static class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Error while producing message to topic :" + recordMetadata);
                e.printStackTrace();
            } else {
                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }

}

Pom 依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

列表和描述的输出:

我今天遇到了同样的问题。我是 Kafka 的新手,只是想获取样本 Java 生产者和消费者 运行。我能够让消费者工作,但不断收到与生产者相同的“元数据中不存在主题”错误。

最后,迫于无奈,我给我的制作人加了一些代码来转储话题。当我这样做时,由于 jackson-databind 和 jackson-core 包中缺少 类,我得到了运行时错误。添加它们后,我不再收到“主题不存在”错误。我删除了我临时添加的 topic-dumping 代码,它仍然有效。

首先,我想感谢 Bobb Dobbs 的回答,我今天也为此苦苦挣扎了一段时间。我只想补充一点,我必须添加的唯一依赖项是 jackson-databind。除了 kafka-clients.

之外,这是我项目中唯一的依赖项

更新:我对正在发生的事情有了更多的了解。 kafka-clients 将其 jackson-databind 依赖项的范围设置为“已提供”,这意味着它希望它在运行时由 JDK 或容器提供。有关提供的 Maven 范围的更多详细信息,请参阅 this article

This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name. A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.

我不确定将其范围设置为 provided 的确切原因,除了也许这个库是人们通常希望自己提供的,以使其保持最新版本以进行安全修复等。

  1. 我创建了一个只有一个分区的主题,并试图将该主题填充到 10 个分区中。我遇到了这个问题。

  2. 我使用 kafka-topics.sh 脚本删除了主题,但没等多久就完成了清理工作。我开始填充这个话题。当我查看主题元数据时,它有一个分区,我遇到的问题与本答案第一部分中提到的完全相同。

您可能需要检查 metadata.max.idle.ms

的生产商属性

生产者缓存的元数据与上述配置值一样长。代理端对元数据的任何更改都不会立即在客户端(生产者)上可用。但是,重启生产者应该在启动时读取元数据。

更新:在此处检查默认值.. https://kafka.apache.org/documentation.html#metadata.max.idle.ms

我也有类似的问题,我在我的 macbook 上的本地环境中尝试这个。这很令人沮丧,我尝试了一些方法

  1. 停止 Zookeeper,停止 Kafka,重新启动 ZK 和 Kafka。 (没有帮助)
  2. 停止 ZK。删除了 ZK 数据目录。删除了 Kafka logs.dirs 并重新启动了 Kafka(没有帮助)
  3. 重新启动我的 macbook - 成功了。

我在生产中使用 Kafka 已经 3 年多了,但是在集群上没有遇到这个问题,只在我的本地环境中发生过。但是,重新启动为我修复了它。

此错误也可能由于目标 Kafka 实例“死亡”或 URL 错误而出现。

在这种情况下,向 Kafka 发送消息的线程将在 max.block.ms 时间被阻塞,该时间默认为 60000 毫秒。

您可以通过传递更改的值来检查是否是因为上述 属性:

Properties props = new Properties();
...(among others)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); # 30 sec or any other value of your choice 

如果 TimeoutException 在您指定的时间后抛出,那么您应该检查您的 URL 到 Kafka 是否正确或 Kafka 实例是否存活。

请注意,这也可能发生,因为 kafka-client 和 Spring 的版本不兼容

https://spring.io/projects/spring-kafka“Kafka 客户端兼容性”矩阵中的更多信息

当我的团队中有人更改了 spring.kafka.security.protocol 配置的值时,我看到了这个问题(我们在我的项目中使用 Spring)。以前它在我们的配置中是“SSL”,但已更新为 PLAINTEXT。在我们连接到使用 SSL 的集群的更高环境中,我们看到错误 OP 运行 into.

为什么我们看到这个错误而不是 SSL 错误或身份验证错误我不明白,但是如果你 运行 遇到这个错误,可能值得仔细检查你的 Kafka 集群的客户端身份验证配置。

kafka-topic --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3

首先尝试使用上述命令在Kafka流中插入主题

这里my_first是题目名称

这个错误是一个明显的错误,它可能是由以下深层条件触发的。

  1. 首先也是最常见的情况是你的kafka生产者配置错误,检查你的kafka属性BOOTSTRAP_SERVERS_CONFIG天气是正确的服务器地址。
  2. 在docker环境中,您可能会检查您的端口映射。
  3. 检查防火墙是否开放了broker所在服务器的9092端口
  4. 如果您的经纪人 运行 在 ssl 中,请检查有关 SSL_TRUSTSTROE_LOCATION_CONFIGSECURITY_PROTOCOL_CONFIGSSL_TRUSTSTORE_TYPE_CONFIG 的生产者配置。 并且,一些代理在 ssl 和 PLAINTEXT 中都配置 运行,请确保您需要哪个端口。

也可能是不存在的分区造成的。

例如如果您只有一个分区 [0],而您的生产者尝试发送到分区 [1],您将得到同样的错误。本例中的主题存在,但分区不存在。