我在某些计算机上收到 "Topic not present in metadata after 60000 ms" 消息
I'm getting "Topic not present in metadata after 60000 ms" message on some computers
这是我的程序
package kafkaConsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.Future;
/**
 * Command-line bridge that reads one key/value pair at a time from a Kafka topic
 * (or from standard input) and forwards it to another Kafka topic (or to standard
 * output).
 *
 * <p>Passing {@code stdin} as the input topic or {@code stdout} as the output topic
 * replaces the corresponding Kafka client with the console. Forwarding stops after a
 * value equal to {@code STOP} has been processed.
 */
public class KafkaConsumerExample {
    private static final String INTOPIC = "my-intopic";
    private static final String OUTTOPIC = "my-outtopic";
    private static final String BOOTSTRAP_SERVERS = "192.168.10.10:9092";

    /** Value that ends the forwarding loop once it has been processed. */
    private static final String STOP_VALUE = "STOP";

    /**
     * Single console reader shared by every call to {@link #run}. {@link Scanner} reads
     * ahead into a private buffer, so creating a new instance per call can lose input
     * that an earlier instance had already buffered (noticeable with piped input).
     */
    private static final Scanner STDIN = new Scanner(System.in);

    /**
     * Creates a producer with {@code Long} keys and {@code String} values.
     *
     * @param bootstrapServers value for the producer's {@code bootstrap.servers} setting
     * @return a new producer; the caller is responsible for closing it
     */
    private static Producer<Long, String> createProducer(String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Creates a consumer with {@code Long} keys and {@code String} values and subscribes
     * it to a single topic.
     *
     * @param intopic topic to subscribe to
     * @param bootstrapServers value for the consumer's {@code bootstrap.servers} setting
     * @return a new, subscribed consumer; the caller is responsible for closing it
     */
    private static Consumer<Long, String> createConsumer(String intopic, String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerExample");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // run() forwards exactly one record per call, so ask for one record per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(intopic));
        return consumer;
    }

    /**
     * Polls in one-second steps until at least one record arrives and returns the first.
     *
     * @param consumer subscribed consumer to poll
     * @return the first record of the first non-empty poll result
     */
    private static ConsumerRecord<Long, String> pollOne(Consumer<Long, String> consumer) {
        final Duration delta = Duration.ofSeconds(1);
        ConsumerRecords<Long, String> batch = consumer.poll(delta);
        while (batch.count() == 0) {
            batch = consumer.poll(delta);
        }
        return batch.iterator().next();
    }

    /**
     * Transfers a single key/value pair from the input side to the output side.
     *
     * @param consumer source of records, or {@code null} to prompt on standard input
     * @param producer destination for records, or {@code null} to print to standard output
     * @param inTopic name of the input topic (not used by this method)
     * @param outTopic topic the record is sent to when {@code producer} is not {@code null}
     * @return {@code false} when the transferred value equals {@code STOP}, {@code true} otherwise
     * @throws InterruptedException if the thread is interrupted while waiting for the send result
     */
    static boolean run(
            Consumer<Long, String> consumer, Producer<Long, String> producer,
            String inTopic, String outTopic) throws InterruptedException {
        final String valueToSend;
        // Boxed on purpose: a consumed record may carry a null key, and unboxing it
        // into a primitive long would throw a NullPointerException.
        final Long keyToUse;
        if (consumer == null) {
            System.out.print("Enter key> ");
            keyToUse = STDIN.nextLong();
            STDIN.nextLine(); // discard the remainder of the key line
            System.out.print("Enter value> ");
            valueToSend = STDIN.nextLine();
        } else {
            final ConsumerRecord<Long, String> received = pollOne(consumer);
            keyToUse = received.key();
            valueToSend = received.value();
            if (producer != null) {
                System.out.println("Got key = " + keyToUse + " and value = " + valueToSend);
            }
        }

        if (producer == null) {
            System.out.println("key = " + keyToUse + " and value = " + valueToSend);
        } else {
            try {
                System.out.println("Creating ProducerRecord");
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(outTopic, keyToUse, valueToSend);
                System.out.println("Calling producer.send");
                Future<RecordMetadata> sent = producer.send(record);
                System.out.println("Calling sent.get");
                // Waits for the outcome of the send; a failed send surfaces here.
                RecordMetadata metadata = sent.get();
                System.out.println("Calling flush");
                producer.flush();
                System.out.println("After flush");
            } catch (java.util.concurrent.ExecutionException | RuntimeException e) {
                // A failed send is reported and the loop continues. InterruptedException
                // is deliberately not caught here so that an interrupt reaches the caller.
                System.out.println("Exception sending message: " + e.getMessage());
            }
        }
        // Constant-first comparison: a consumed record may have a null value.
        return !STOP_VALUE.equals(valueToSend);
    }

    /** Prints the launch command and the accepted parameters, then exits with status 1. */
    public static void usage() {
        System.out.println(System.getProperty("sun.java.command"));
        System.out.println();
        System.out.println("Usage parameters: [--intopic name] [--outtopic name] [--bootstrap-servers servers]");
        System.exit(1);
    }

    /**
     * Parses the command line, creates the requested Kafka clients and forwards records
     * until {@link #run} reports that the stop value was seen.
     *
     * @param args optional {@code --intopic}, {@code --outtopic} and
     *     {@code --bootstrap-servers} options, each followed by its value
     * @throws Exception if client creation or forwarding fails
     */
    public static void main(String... args) throws Exception {
        String inTopic = INTOPIC;
        String outTopic = OUTTOPIC;
        String bootstrapServers = BOOTSTRAP_SERVERS;
        for (int i = 0; i < args.length; ++i) {
            final String option = args[i];
            final boolean known = option.equals("--intopic")
                    || option.equals("--outtopic")
                    || option.equals("--bootstrap-servers");
            // Every known option needs a value after it; usage() terminates the JVM.
            if (!known || i == args.length - 1) {
                usage();
            }
            final String value = args[++i];
            switch (option) {
                case "--intopic":
                    inTopic = value;
                    break;
                case "--outtopic":
                    outTopic = value;
                    break;
                default:
                    bootstrapServers = value;
                    break;
            }
        }

        Consumer<Long, String> consumer = null;
        Producer<Long, String> producer = null;
        try {
            if (!inTopic.equals("stdin")) {
                consumer = createConsumer(inTopic, bootstrapServers);
            }
            if (!outTopic.equals("stdout")) {
                producer = createProducer(bootstrapServers);
            }
            boolean keepGoing = true;
            while (keepGoing) {
                keepGoing = run(consumer, producer, inTopic, outTopic);
            }
        } finally {
            // Close both clients even when creation or forwarding failed part-way.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }
}
我在 Windows 和 Linux 上都运行过它。在某些计算机上它运行正常,但在其他计算机上(特别是那些不是 Kafka 服务器本身的 Linux 机器),它总是报这个错误:
Exception sending message: org.apache.kafka.common.errors.TimeoutException: Topic outtopic not present in metadata after 60000 ms.
当然,这种情况发生在 run() 函数中尝试发送消息时,具体是在语句 RecordMetadata metadata = sent.get() 处。
这个 kafka 安装允许自动创建新主题。事实上,如果我在 --outtopic 参数中输入一个新名称,即使发送消息失败,也会创建主题。
有什么线索吗?我在配置中缺少什么?
谢谢
西蒙
192.168.10.10:9092
这似乎是一个内网 IP。请检查出错的客户端是否处于该网段内,即能否访问该 IP。
尝试从您的客户端计算机用 telnet 连接:
telnet 192.168.10.10 9092
如果无法 telnet 连通,请改用您的客户端能够访问的 IP,并确保 advertised.listeners 中配置的地址与之相同。
同时检查您的 advertised.listeners 配置。当我们连接到 bootstrap.servers 中给定的 URL 时,该地址通常应该与 advertised.listeners 配置中的地址一致。
"主题不在元数据中"意味着您的客户端无法获取有关给定主题的任何信息,即它无法通过给定的 bootstrap.servers 属性获取元数据。
这是我的程序
package kafkaConsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.Future;
/**
 * Command-line bridge that reads one key/value pair at a time from a Kafka topic
 * (or from standard input) and forwards it to another Kafka topic (or to standard
 * output).
 *
 * <p>Passing {@code stdin} as the input topic or {@code stdout} as the output topic
 * replaces the corresponding Kafka client with the console. Forwarding stops after a
 * value equal to {@code STOP} has been processed.
 */
public class KafkaConsumerExample {
    private static final String INTOPIC = "my-intopic";
    private static final String OUTTOPIC = "my-outtopic";
    private static final String BOOTSTRAP_SERVERS = "192.168.10.10:9092";

    /** Value that ends the forwarding loop once it has been processed. */
    private static final String STOP_VALUE = "STOP";

    /**
     * Single console reader shared by every call to {@link #run}. {@link Scanner} reads
     * ahead into a private buffer, so creating a new instance per call can lose input
     * that an earlier instance had already buffered (noticeable with piped input).
     */
    private static final Scanner STDIN = new Scanner(System.in);

    /**
     * Creates a producer with {@code Long} keys and {@code String} values.
     *
     * @param bootstrapServers value for the producer's {@code bootstrap.servers} setting
     * @return a new producer; the caller is responsible for closing it
     */
    private static Producer<Long, String> createProducer(String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Creates a consumer with {@code Long} keys and {@code String} values and subscribes
     * it to a single topic.
     *
     * @param intopic topic to subscribe to
     * @param bootstrapServers value for the consumer's {@code bootstrap.servers} setting
     * @return a new, subscribed consumer; the caller is responsible for closing it
     */
    private static Consumer<Long, String> createConsumer(String intopic, String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerExample");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // run() forwards exactly one record per call, so ask for one record per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(intopic));
        return consumer;
    }

    /**
     * Polls in one-second steps until at least one record arrives and returns the first.
     *
     * @param consumer subscribed consumer to poll
     * @return the first record of the first non-empty poll result
     */
    private static ConsumerRecord<Long, String> pollOne(Consumer<Long, String> consumer) {
        final Duration delta = Duration.ofSeconds(1);
        ConsumerRecords<Long, String> batch = consumer.poll(delta);
        while (batch.count() == 0) {
            batch = consumer.poll(delta);
        }
        return batch.iterator().next();
    }

    /**
     * Transfers a single key/value pair from the input side to the output side.
     *
     * @param consumer source of records, or {@code null} to prompt on standard input
     * @param producer destination for records, or {@code null} to print to standard output
     * @param inTopic name of the input topic (not used by this method)
     * @param outTopic topic the record is sent to when {@code producer} is not {@code null}
     * @return {@code false} when the transferred value equals {@code STOP}, {@code true} otherwise
     * @throws InterruptedException if the thread is interrupted while waiting for the send result
     */
    static boolean run(
            Consumer<Long, String> consumer, Producer<Long, String> producer,
            String inTopic, String outTopic) throws InterruptedException {
        final String valueToSend;
        // Boxed on purpose: a consumed record may carry a null key, and unboxing it
        // into a primitive long would throw a NullPointerException.
        final Long keyToUse;
        if (consumer == null) {
            System.out.print("Enter key> ");
            keyToUse = STDIN.nextLong();
            STDIN.nextLine(); // discard the remainder of the key line
            System.out.print("Enter value> ");
            valueToSend = STDIN.nextLine();
        } else {
            final ConsumerRecord<Long, String> received = pollOne(consumer);
            keyToUse = received.key();
            valueToSend = received.value();
            if (producer != null) {
                System.out.println("Got key = " + keyToUse + " and value = " + valueToSend);
            }
        }

        if (producer == null) {
            System.out.println("key = " + keyToUse + " and value = " + valueToSend);
        } else {
            try {
                System.out.println("Creating ProducerRecord");
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(outTopic, keyToUse, valueToSend);
                System.out.println("Calling producer.send");
                Future<RecordMetadata> sent = producer.send(record);
                System.out.println("Calling sent.get");
                // Waits for the outcome of the send; a failed send surfaces here.
                RecordMetadata metadata = sent.get();
                System.out.println("Calling flush");
                producer.flush();
                System.out.println("After flush");
            } catch (java.util.concurrent.ExecutionException | RuntimeException e) {
                // A failed send is reported and the loop continues. InterruptedException
                // is deliberately not caught here so that an interrupt reaches the caller.
                System.out.println("Exception sending message: " + e.getMessage());
            }
        }
        // Constant-first comparison: a consumed record may have a null value.
        return !STOP_VALUE.equals(valueToSend);
    }

    /** Prints the launch command and the accepted parameters, then exits with status 1. */
    public static void usage() {
        System.out.println(System.getProperty("sun.java.command"));
        System.out.println();
        System.out.println("Usage parameters: [--intopic name] [--outtopic name] [--bootstrap-servers servers]");
        System.exit(1);
    }

    /**
     * Parses the command line, creates the requested Kafka clients and forwards records
     * until {@link #run} reports that the stop value was seen.
     *
     * @param args optional {@code --intopic}, {@code --outtopic} and
     *     {@code --bootstrap-servers} options, each followed by its value
     * @throws Exception if client creation or forwarding fails
     */
    public static void main(String... args) throws Exception {
        String inTopic = INTOPIC;
        String outTopic = OUTTOPIC;
        String bootstrapServers = BOOTSTRAP_SERVERS;
        for (int i = 0; i < args.length; ++i) {
            final String option = args[i];
            final boolean known = option.equals("--intopic")
                    || option.equals("--outtopic")
                    || option.equals("--bootstrap-servers");
            // Every known option needs a value after it; usage() terminates the JVM.
            if (!known || i == args.length - 1) {
                usage();
            }
            final String value = args[++i];
            switch (option) {
                case "--intopic":
                    inTopic = value;
                    break;
                case "--outtopic":
                    outTopic = value;
                    break;
                default:
                    bootstrapServers = value;
                    break;
            }
        }

        Consumer<Long, String> consumer = null;
        Producer<Long, String> producer = null;
        try {
            if (!inTopic.equals("stdin")) {
                consumer = createConsumer(inTopic, bootstrapServers);
            }
            if (!outTopic.equals("stdout")) {
                producer = createProducer(bootstrapServers);
            }
            boolean keepGoing = true;
            while (keepGoing) {
                keepGoing = run(consumer, producer, inTopic, outTopic);
            }
        } finally {
            // Close both clients even when creation or forwarding failed part-way.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }
}
我在 Windows 和 Linux 上都运行过它。在某些计算机上它运行正常,但在其他计算机上(特别是那些不是 Kafka 服务器本身的 Linux 机器),它总是报这个错误:
Exception sending message: org.apache.kafka.common.errors.TimeoutException: Topic outtopic not present in metadata after 60000 ms.
当然,这种情况发生在 run() 函数中尝试发送消息时,具体是在语句 RecordMetadata metadata = sent.get() 处。
这个 kafka 安装允许自动创建新主题。事实上,如果我在 --outtopic 参数中输入一个新名称,即使发送消息失败,也会创建主题。
有什么线索吗?我在配置中缺少什么?
谢谢
西蒙
192.168.10.10:9092
这似乎是一个内网 IP。请检查出错的客户端是否处于该网段内,即能否访问该 IP。
尝试从您的客户端计算机用 telnet 连接:
telnet 192.168.10.10 9092
如果无法 telnet 连通,请改用您的客户端能够访问的 IP,并确保 advertised.listeners 中配置的地址与之相同。
同时检查您的 advertised.listeners 配置。当我们连接到 bootstrap.servers 中给定的 URL 时,该地址通常应该与 advertised.listeners 配置中的地址一致。
"主题不在元数据中"意味着您的客户端无法获取有关给定主题的任何信息,即它无法通过给定的 bootstrap.servers 属性获取元数据。