org.apache.kafka.common.errors.TimeoutException:主题在 60000 毫秒后不存在于元数据中
org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms
我遇到错误:
org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.
当尝试使用 Java 在 windows 上我的本地 kafka 实例中生成主题时。请注意,主题 testtopic2 存在,我可以使用 windows 控制台生成器向它生成消息。
在我使用的代码下方:
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Kafka_Producer {
public static void main(String[] args){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
TestCallback callback = new TestCallback();
for (long i = 0; i < 100 ; i++) {
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
"testtopic2", "key-" + i, "message-"+i );
producer.send(data, callback);
}
producer.close();
}
private static class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error while producing message to topic :" + recordMetadata);
e.printStackTrace();
} else {
String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
}
}
}
}
Pom 依赖项:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
列表和描述的输出:
我今天遇到了同样的问题。我是 Kafka 的新手,只是想获取样本 Java 生产者和消费者 运行。我能够让消费者工作,但不断收到与生产者相同的“元数据中不存在主题”错误。
最后,迫于无奈,我给我的制作人加了一些代码来转储话题。当我这样做时,由于 jackson-databind 和 jackson-core 包中缺少 类,我得到了运行时错误。添加它们后,我不再收到“主题不存在”错误。我删除了我临时添加的 topic-dumping 代码,它仍然有效。
首先,我想感谢 Bobb Dobbs 的回答,我今天也为此苦苦挣扎了一段时间。我只想补充一点,我必须添加的唯一依赖项是 jackson-databind
。除了 kafka-clients
.
之外,这是我项目中唯一的依赖项
更新:我对正在发生的事情有了更多的了解。 kafka-clients
将其 jackson-databind
依赖项的范围设置为“已提供”,这意味着它希望它在运行时由 JDK 或容器提供。有关提供的 Maven 范围的更多详细信息,请参阅 this article。
This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name.
A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.
我不确定将其范围设置为 provided 的确切原因,除了也许这个库是人们通常希望自己提供的,以使其保持最新版本以进行安全修复等。
我创建了一个只有一个分区的主题,并试图将该主题填充到 10 个分区中。我遇到了这个问题。
我使用 kafka-topics.sh 脚本删除了主题,但没等多久就完成了清理工作。我开始填充这个话题。当我查看主题元数据时,它有一个分区,我遇到的问题与本答案第一部分中提到的完全相同。
您可能需要检查 metadata.max.idle.ms
的生产商属性
生产者缓存的元数据与上述配置值一样长。代理端对元数据的任何更改都不会立即在客户端(生产者)上可用。但是,重启生产者应该在启动时读取元数据。
更新:在此处检查默认值.. https://kafka.apache.org/documentation.html#metadata.max.idle.ms
我也有类似的问题,我在我的 macbook 上的本地环境中尝试这个。这很令人沮丧,我尝试了一些方法
- 停止 Zookeeper,停止 Kafka,重新启动 ZK 和 Kafka。 (没有帮助)
- 停止 ZK。删除了 ZK 数据目录。删除了 Kafka logs.dirs 并重新启动了 Kafka(没有帮助)
- 重新启动我的 macbook - 成功了。
我在生产中使用 Kafka 已经 3 年多了,但是在集群上没有遇到这个问题,只在我的本地环境中发生过。但是,重新启动为我修复了它。
此错误也可能由于目标 Kafka 实例“死亡”或 URL 错误而出现。
在这种情况下,向 Kafka 发送消息的线程将在 max.block.ms
时间被阻塞,该时间默认为 60000 毫秒。
您可以通过传递更改的值来检查是否是因为上述 属性:
Properties props = new Properties();
...(among others)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); # 30 sec or any other value of your choice
如果 TimeoutException
在您指定的时间后抛出,那么您应该检查您的 URL 到 Kafka 是否正确或 Kafka 实例是否存活。
请注意,这也可能发生,因为 kafka-client 和 Spring 的版本不兼容
https://spring.io/projects/spring-kafka“Kafka 客户端兼容性”矩阵中的更多信息
当我的团队中有人更改了 spring.kafka.security.protocol 配置的值时,我看到了这个问题(我们在我的项目中使用 Spring)。以前它在我们的配置中是“SSL”,但已更新为 PLAINTEXT
。在我们连接到使用 SSL 的集群的更高环境中,我们看到错误 OP 运行 into.
为什么我们看到这个错误而不是 SSL 错误或身份验证错误我不明白,但是如果你 运行 遇到这个错误,可能值得仔细检查你的 Kafka 集群的客户端身份验证配置。
kafka-topic --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3
首先尝试使用上述命令在Kafka流中插入主题
这里my_first是题目名称
这个错误是一个明显的错误,它可能是由以下深层条件触发的。
- 首先也是最常见的情况是你的kafka生产者配置错误,检查你的kafka属性
BOOTSTRAP_SERVERS_CONFIG
天气是正确的服务器地址。
- 在docker环境中,您可能会检查您的端口映射。
- 检查防火墙是否开放了broker所在服务器的9092端口
- 如果您的经纪人 运行 在 ssl 中,请检查有关
SSL_TRUSTSTROE_LOCATION_CONFIG
、SECURITY_PROTOCOL_CONFIG
、SSL_TRUSTSTORE_TYPE_CONFIG
的生产者配置。
并且,一些代理在 ssl 和 PLAINTEXT 中都配置 运行,请确保您需要哪个端口。
也可能是不存在的分区造成的。
例如如果您只有一个分区 [0]
,而您的生产者尝试发送到分区 [1]
,您将得到同样的错误。本例中的主题存在,但分区不存在。
我遇到错误:
org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.
当尝试使用 Java 在 windows 上我的本地 kafka 实例中生成主题时。请注意,主题 testtopic2 存在,我可以使用 windows 控制台生成器向它生成消息。
在我使用的代码下方:
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Kafka_Producer {
public static void main(String[] args){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
TestCallback callback = new TestCallback();
for (long i = 0; i < 100 ; i++) {
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
"testtopic2", "key-" + i, "message-"+i );
producer.send(data, callback);
}
producer.close();
}
private static class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error while producing message to topic :" + recordMetadata);
e.printStackTrace();
} else {
String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
}
}
}
}
Pom 依赖项:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
列表和描述的输出:
我今天遇到了同样的问题。我是 Kafka 的新手,只是想获取样本 Java 生产者和消费者 运行。我能够让消费者工作,但不断收到与生产者相同的“元数据中不存在主题”错误。
最后,迫于无奈,我给我的制作人加了一些代码来转储话题。当我这样做时,由于 jackson-databind 和 jackson-core 包中缺少 类,我得到了运行时错误。添加它们后,我不再收到“主题不存在”错误。我删除了我临时添加的 topic-dumping 代码,它仍然有效。
首先,我想感谢 Bobb Dobbs 的回答,我今天也为此苦苦挣扎了一段时间。我只想补充一点,我必须添加的唯一依赖项是 jackson-databind
。除了 kafka-clients
.
更新:我对正在发生的事情有了更多的了解。 kafka-clients
将其 jackson-databind
依赖项的范围设置为“已提供”,这意味着它希望它在运行时由 JDK 或容器提供。有关提供的 Maven 范围的更多详细信息,请参阅 this article。
This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name. A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.
我不确定将其范围设置为 provided 的确切原因,除了也许这个库是人们通常希望自己提供的,以使其保持最新版本以进行安全修复等。
我创建了一个只有一个分区的主题,并试图将该主题填充到 10 个分区中。我遇到了这个问题。
我使用 kafka-topics.sh 脚本删除了主题,但没等多久就完成了清理工作。我开始填充这个话题。当我查看主题元数据时,它有一个分区,我遇到的问题与本答案第一部分中提到的完全相同。
您可能需要检查 metadata.max.idle.ms
的生产商属性生产者缓存的元数据与上述配置值一样长。代理端对元数据的任何更改都不会立即在客户端(生产者)上可用。但是,重启生产者应该在启动时读取元数据。
更新:在此处检查默认值.. https://kafka.apache.org/documentation.html#metadata.max.idle.ms
我也有类似的问题,我在我的 macbook 上的本地环境中尝试这个。这很令人沮丧,我尝试了一些方法
- 停止 Zookeeper,停止 Kafka,重新启动 ZK 和 Kafka。 (没有帮助)
- 停止 ZK。删除了 ZK 数据目录。删除了 Kafka logs.dirs 并重新启动了 Kafka(没有帮助)
- 重新启动我的 macbook - 成功了。
我在生产中使用 Kafka 已经 3 年多了,但是在集群上没有遇到这个问题,只在我的本地环境中发生过。但是,重新启动为我修复了它。
此错误也可能由于目标 Kafka 实例“死亡”或 URL 错误而出现。
在这种情况下,向 Kafka 发送消息的线程将在 max.block.ms
时间被阻塞,该时间默认为 60000 毫秒。
您可以通过传递更改的值来检查是否是因为上述 属性:
Properties props = new Properties();
...(among others)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); # 30 sec or any other value of your choice
如果 TimeoutException
在您指定的时间后抛出,那么您应该检查您的 URL 到 Kafka 是否正确或 Kafka 实例是否存活。
请注意,这也可能发生,因为 kafka-client 和 Spring 的版本不兼容
https://spring.io/projects/spring-kafka“Kafka 客户端兼容性”矩阵中的更多信息
当我的团队中有人更改了 spring.kafka.security.protocol 配置的值时,我看到了这个问题(我们在我的项目中使用 Spring)。以前它在我们的配置中是“SSL”,但已更新为 PLAINTEXT
。在我们连接到使用 SSL 的集群的更高环境中,我们看到错误 OP 运行 into.
为什么我们看到这个错误而不是 SSL 错误或身份验证错误我不明白,但是如果你 运行 遇到这个错误,可能值得仔细检查你的 Kafka 集群的客户端身份验证配置。
kafka-topic --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3
首先尝试使用上述命令在Kafka流中插入主题
这里my_first是题目名称
这个错误是一个明显的错误,它可能是由以下深层条件触发的。
- 首先也是最常见的情况是你的kafka生产者配置错误,检查你的kafka属性
BOOTSTRAP_SERVERS_CONFIG
天气是正确的服务器地址。 - 在docker环境中,您可能会检查您的端口映射。
- 检查防火墙是否开放了broker所在服务器的9092端口
- 如果您的经纪人 运行 在 ssl 中,请检查有关
SSL_TRUSTSTROE_LOCATION_CONFIG
、SECURITY_PROTOCOL_CONFIG
、SSL_TRUSTSTORE_TYPE_CONFIG
的生产者配置。 并且,一些代理在 ssl 和 PLAINTEXT 中都配置 运行,请确保您需要哪个端口。
也可能是不存在的分区造成的。
例如如果您只有一个分区 [0]
,而您的生产者尝试发送到分区 [1]
,您将得到同样的错误。本例中的主题存在,但分区不存在。