读取字段 'topic_metadata' 时出错:读取大小为 1139567 的数组时出错,只有 45 个字节可用
Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available
--消费者
Properties props = new Properties();
String groupId = "consumer-tutorial-group";
List<String> topics = Arrays.asList("consumer-tutorial");
props.put("bootstrap.servers", "192.168.1.75:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
} catch (Exception e) {
System.out.println(e.toString());
} finally {
consumer.close();
}
}
我正在尝试编写 运行 上面的代码,它是一个简单的消费者代码,它试图从一个主题中读取,但我遇到了一个奇怪的异常,我无法处理它。
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available
我也引用你我的生产者代码
--制片人
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.7:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));
producer.close();
这是kafka配置
--启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
--启动Kafka服务器
bin/kafka-server-start.sh config/server.properties
-- 创建话题
bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper 192.168.1.75:2181
--卡夫卡0.10.0
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
我通过降级到 kafka 0.9.0 解决了我的问题,但它对我来说仍然不是一个有效的解决方案。如果有人知道如何在 kafka 0.10.0 版本中修复此问题的有效方法,请随时 post 它。在那之前这是我的解决方案
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
我在使用版本为 0.10.0.0 的 kafka_2.11 工件时也遇到了同样的问题。但是一旦我将 kafka 服务器更改为 0.10.0.0,这个问题就解决了。之前我指的是 0.9.0.1。看起来服务器和您的 pom 版本应该是同步的。
我在使用 Kafka 服务器 9.0.0 和 Kafka 客户端 10.0 时遇到了相同的 issue.Client jar 兼容性问题。0.Basically Kafka 0.10.0 引入了一种新的消息格式并且无法读取旧版本的主题元数据。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.0.RELEASE</version> <!-- changed due lower version of the kafka server -->
</dependency>
--消费者
Properties props = new Properties();
String groupId = "consumer-tutorial-group";
List<String> topics = Arrays.asList("consumer-tutorial");
props.put("bootstrap.servers", "192.168.1.75:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
} catch (Exception e) {
System.out.println(e.toString());
} finally {
consumer.close();
}
}
我正在尝试编写 运行 上面的代码,它是一个简单的消费者代码,它试图从一个主题中读取,但我遇到了一个奇怪的异常,我无法处理它。
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available
我也引用你我的生产者代码
--制片人
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.7:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));
producer.close();
这是kafka配置
--启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
--启动Kafka服务器
bin/kafka-server-start.sh config/server.properties
-- 创建话题
bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper 192.168.1.75:2181
--卡夫卡0.10.0
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
我通过降级到 kafka 0.9.0 解决了我的问题,但它对我来说仍然不是一个有效的解决方案。如果有人知道如何在 kafka 0.10.0 版本中修复此问题的有效方法,请随时 post 它。在那之前这是我的解决方案
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
我在使用版本为 0.10.0.0 的 kafka_2.11 工件时也遇到了同样的问题。但是一旦我将 kafka 服务器更改为 0.10.0.0,这个问题就解决了。之前我指的是 0.9.0.1。看起来服务器和您的 pom 版本应该是同步的。
我在使用 Kafka 服务器 9.0.0 和 Kafka 客户端 10.0 时遇到了相同的 issue.Client jar 兼容性问题。0.Basically Kafka 0.10.0 引入了一种新的消息格式并且无法读取旧版本的主题元数据。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.0.RELEASE</version> <!-- changed due lower version of the kafka server -->
</dependency>