Error reading field 'topics': java.nio.BufferUnderflowException in Kafka
I'm using a 0.9.0 client to consume messages from two brokers running on a remote system. My producer works fine and is able to send messages to the brokers, but my consumer cannot consume those messages. The consumer and producer are running on my local system, and the two brokers are on AWS.
Whenever I try to run the consumer, the following error appears in the broker logs.
ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
My consumer code is as follows:
package Kafka1.K1;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer {
    public static void main(String[] args) {
        String[] topics = new String[]{"loader1"};

        Properties props = new Properties();
        // Consumer properties belong to ConsumerConfig, not ProducerConfig
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP1:9092,IP2:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put("heartbeat.interval.ms", "500");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topics));

        while (true) {
            // Poll for records for up to 1000 ms
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // Process any records that came back
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
            }
        }
    }
}
Can anyone help?
This problem occurs when the Kafka cluster on your machine is running an older version, i.e. 0.8.x.x, while the client used to access data from the cluster is a newer version, i.e. 0.9.x.x.
Depending on your requirements, there are two simple solutions:
- Downgrade the client version.
- Upgrade the Kafka cluster.
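If you keep the 0.9.x client and upgrade the brokers instead, the client dependency would typically be declared like this (a sketch; pick the release that matches your actual broker version, 0.9.0.1 here is just an example):

```xml
<!-- Match the kafka-clients version to the broker version -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
```

Note that the new `KafkaConsumer` API (`subscribe`/`poll`) used in the code above was only introduced in 0.9.0, so downgrading the client would also mean rewriting the consumer against the old 0.8.x high-level consumer API.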