Kafka Consumer Poll 无限期运行并且 return 没有任何结果
Kafka Consumer Poll runs indefinitely and doesn't return anything
我在 KafkaConsumer.poll(持续时间超时)方面遇到困难,它无限期地运行并且永远不会退出该方法。了解这可能与连接有关,有时我发现它有点不一致。如果 poll 停止响应,我该如何处理?下面给出的是 KafkaConsumer.poll()
的片段
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
我从这里调用上面的内容:
Duration timeout = Duration.ofSeconds(30);
while (true) {
final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
System.out.println("record count is" + records.count());
}
我收到以下错误:
org.apache.kafka.common.errors.SerializationException: Error
deserializing key/value for partition at offset 2. If
needed, please seek past the record to continue consumption.
我在尝试解决上述问题时偶然发现了一些有用的信息。我将提供应该能够处理此问题的代码段,但在此之前了解导致此问题的原因很重要。
在向 Apache Kafka 生成或使用消息或数据时,我们需要该消息或数据的模式结构,在我的例子中是 Avro 模式。如果向 Kafka 生成的消息与该消息模式发生冲突,则会对消费产生影响。
在您的消费者主题中使用记录的方法中添加以下代码 --
记得导入以下包:
import org.apache.kafka.common.TopicPartition;
import org.jsoup.SerializationException;
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = null;
try {
records = consumer.poll(10000);
} catch (SerializationException e) {
String s = e.getMessage().split("Error deserializing key/value
for partition ")[1].split(". If needed, please seek past the record to
continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at") .
[0]);
TopicPartition topicPartition = new TopicPartition(topics,
partition);
//log.info("Skipping " + topic + "-" + partition + " offset "
+ offset);
consumer.seek(topicPartition, offset + 1);
}
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value());
}
}
} finally {
consumer.close();
}
我在搭建测试环境的时候运行进入了这个。
运行 代理上的以下命令打印出存储的记录,正如人们所期望的那样:
bin/kafka-console-consumer.sh --bootstrap-server="localhost:9092" --topic="foo" --from-beginning
原来是Kafka服务器配置错误。从外部连接
IP 地址 listeners
必须在 kafka/config/server.properties
中具有有效值,例如
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
我在 KafkaConsumer.poll(持续时间超时)方面遇到困难,它无限期地运行并且永远不会退出该方法。了解这可能与连接有关,有时我发现它有点不一致。如果 poll 停止响应,我该如何处理?下面给出的是 KafkaConsumer.poll()
的片段public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
我从这里调用上面的内容:
Duration timeout = Duration.ofSeconds(30);
while (true) {
final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
System.out.println("record count is" + records.count());
}
我收到以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition at offset 2. If needed, please seek past the record to continue consumption.
我在尝试解决上述问题时偶然发现了一些有用的信息。我将提供应该能够处理此问题的代码段,但在此之前了解导致此问题的原因很重要。
在向 Apache Kafka 生成或使用消息或数据时,我们需要该消息或数据的模式结构,在我的例子中是 Avro 模式。如果向 Kafka 生成的消息与该消息模式发生冲突,则会对消费产生影响。
在您的消费者主题中使用记录的方法中添加以下代码 --
记得导入以下包:
import org.apache.kafka.common.TopicPartition;
import org.jsoup.SerializationException;
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = null;
try {
records = consumer.poll(10000);
} catch (SerializationException e) {
String s = e.getMessage().split("Error deserializing key/value
for partition ")[1].split(". If needed, please seek past the record to
continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at") .
[0]);
TopicPartition topicPartition = new TopicPartition(topics,
partition);
//log.info("Skipping " + topic + "-" + partition + " offset "
+ offset);
consumer.seek(topicPartition, offset + 1);
}
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value());
}
}
} finally {
consumer.close();
}
我在搭建测试环境的时候运行进入了这个。
运行 代理上的以下命令打印出存储的记录,正如人们所期望的那样:
bin/kafka-console-consumer.sh --bootstrap-server="localhost:9092" --topic="foo" --from-beginning
原来是Kafka服务器配置错误。从外部连接
IP 地址 listeners
必须在 kafka/config/server.properties
中具有有效值,例如
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092