Kafka Consumer Poll 无限期运行并且 return 没有任何结果

Kafka Consumer Poll runs indefinitely and doesn't return anything

我在 KafkaConsumer.poll(持续时间超时)方面遇到困难,它无限期地运行并且永远不会退出该方法。了解这可能与连接有关,有时我发现它有点不一致。如果 poll 停止响应,我该如何处理?下面给出的是 KafkaConsumer.poll()

的片段
public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

我从这里调用上面的内容:

Duration timeout = Duration.ofSeconds(30);
    while (true) {
        final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
        System.out.println("record count is" + records.count());
}

我收到以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition at offset 2. If needed, please seek past the record to continue consumption.

我在尝试解决上述问题时偶然发现了一些有用的信息。我将提供应该能够处理此问题的代码段,但在此之前了解导致此问题的原因很重要。

在向 Apache Kafka 生成或使用消息或数据时,我们需要该消息或数据的模式结构,在我的例子中是 Avro 模式。如果向 Kafka 生成的消息与该消息模式发生冲突,则会对消费产生影响。

在您的消费者主题中使用记录的方法中添加以下代码 --

记得导入以下包:

import org.apache.kafka.common.TopicPartition;
import org.jsoup.SerializationException;

try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = null;
            try {
                records = consumer.poll(10000);
            } catch (SerializationException e) {
                String s = e.getMessage().split("Error deserializing key/value 
for partition ")[1].split(". If needed, please seek past the record to 
continue consumption.")[0];
                String topics = s.split("-")[0];
                int offset = Integer.valueOf(s.split("offset ")[1]);
                int partition = Integer.valueOf(s.split("-")[1].split(" at") . 
   [0]);

                TopicPartition topicPartition = new TopicPartition(topics, 
 partition);
                //log.info("Skipping " + topic + "-" + partition + " offset " 
 + offset);
                consumer.seek(topicPartition, offset + 1);
            }


            for (ConsumerRecord<String, GenericRecord> record : records) {

                System.out.printf("value = %s \n", record.value());


            }

        }


    } finally {
        consumer.close();
    }

我在搭建测试环境的时候运行进入了这个。

运行 代理上的以下命令打印出存储的记录,正如人们所期望的那样:

bin/kafka-console-consumer.sh --bootstrap-server="localhost:9092" --topic="foo" --from-beginning

原来是Kafka服务器配置错误。从外部连接 IP 地址 listeners 必须在 kafka/config/server.properties 中具有有效值,例如

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092