读取字段 'topics' 时出错:Kafka 中的 java.nio.BufferUnderflowException

Error reading field 'topics': java.nio.BufferUnderflowException in Kafka

9.0 客户端消费来自远程 运行ning 的两个代理的消息 system.My 生产者工作正常,能够向代理发送消息,但我的消费者无法消费这些 messages.Consumer 和生产者在我的本地系统上 运行ning 并且两个经纪人在 aws 上。 每当我尝试 运行 消费时。代理日志中出现以下错误。

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
        at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
        at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
        at kafka.api.RequestKeys$$anonfun.apply(RequestKeys.scala:50)
        at kafka.api.RequestKeys$$anonfun.apply(RequestKeys.scala:50)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
        at kafka.network.Processor.read(SocketServer.scala:450)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

我的Consumer代码如下>

package Kafka1.K1;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer 
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1");
Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP1:9092,IP2:9093");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 props.put("heartbeat.interval.ms", "500");
 props.put("session.timeout.ms", "1000");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "10000");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
 consumer.subscribe(Arrays.asList(a));
 while (true) {
        // Poll for ConsumerRecords for a certain amount of time
        ConsumerRecords<String, String> records = consumer.poll(1000);

        // Process the ConsumerRecords, if any, that came back
        for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println(key+":"+value);
                // Do something with message
        }
      }

    }
}

有人可以帮忙吗?

当您机器上的 kafka 集群 运行 是旧版本即 0 时会出现此问题。8.x.x 而用于从集群访问数据的客户端是更高版本即 0。 9.x.x.

根据需求有两种简单的解决方案:

  1. 降级客户端版本。
  2. 升级kafka集群