在 Kafka 中读取字段 'topic_metadata' 时出错
Error reading field 'topic_metadata' in Kafka
我正在尝试在我的 server.properties 文件中使用 auto.create.topics.enable=true 连接到我在 aws 上的经纪人。但是当我尝试使用 Java 客户端生产者连接到代理时,我得到以下 error
.
1197 [kafka-producer-network-thread | producer-1] ERROR
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading
field 'topic_metadata': Error reading array of size 619631, only 37
bytes available at
org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73) at
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Unknown Source)
以下是我的客户端生产者代码。
public static void main(String[] argv){
Properties props = new Properties();
props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("block.on.buffer.full",true);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
try{ for(int i = 0; i < 10; i++)
{ producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
System.out.println("Tried sending:"+i);}
}
catch (Exception e){
e.printStackTrace();
}
producer.close();
}
谁能帮我解决这个问题?
看起来我在客户端设置了错误的属性,我的 server.properties 文件也有一些不适用于客户端的属性我是 using.So 我决定更改 java客户端使用 maven 到 0.9.0 版。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
我的server.properties文件如下。
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=9000
delete.topic.enable=true
advertised.host.name=<aws public Ip>
advertised.port=9092
我的生产者代码看起来像
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class HelloKafkaProducer
{
public static void main(String args[]) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
boolean sync = false;
String topic="loader1";
String key = "mykey";
for(int i=0;i<1000;i++)
{
String value = "myvaluehasbeensent"+i+i;
ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value);
if (sync) {
producer.send(producerRecord).get();
} else {
producer.send(producerRecord);
}
}
producer.close();
}
}
我通过编辑解决了这个问题
/etc/hosts file
检查你的主机文件,如果zookeeper或其他代理的ip不在这个文件中。
我遇到过类似的问题。这里的问题是,当 pom 文件中的 kafka 客户端版本与 kafka 服务器不同时不匹配。
我使用的是 kafka 客户端 0.10.0.0_1 但 kafka 服务器仍然是 0.9.0.0。所以我将kafka服务器版本升级到10,问题得到解决。
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.10.0.0_1</version>
</dependency>
确保您使用正确的版本。假设您使用以下 Maven 依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
所以工件等于:flink-connector-kafka-0.8_2.10
现在检查您是否使用了正确的 Kafka 版本:
cd /KAFKA_HOME/libs
现在找到kafka_YOUR-VERSION-sources.jar.
在我的例子中,我有 kafka_2.10-0.8.2.1-sources.jar。所以它工作正常! :)
如果您使用不同的版本,只需更改 maven 依赖项或下载正确的 kafka 版本。
我正在尝试在我的 server.properties 文件中使用 auto.create.topics.enable=true 连接到我在 aws 上的经纪人。但是当我尝试使用 Java 客户端生产者连接到代理时,我得到以下 error
.
1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 619631, only 37 bytes available at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73) at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) at java.lang.Thread.run(Unknown Source)
以下是我的客户端生产者代码。
public static void main(String[] argv){
Properties props = new Properties();
props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("block.on.buffer.full",true);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
try{ for(int i = 0; i < 10; i++)
{ producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
System.out.println("Tried sending:"+i);}
}
catch (Exception e){
e.printStackTrace();
}
producer.close();
}
谁能帮我解决这个问题?
看起来我在客户端设置了错误的属性,我的 server.properties 文件也有一些不适用于客户端的属性我是 using.So 我决定更改 java客户端使用 maven 到 0.9.0 版。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
我的server.properties文件如下。
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=9000
delete.topic.enable=true
advertised.host.name=<aws public Ip>
advertised.port=9092
我的生产者代码看起来像
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class HelloKafkaProducer
{
public static void main(String args[]) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
boolean sync = false;
String topic="loader1";
String key = "mykey";
for(int i=0;i<1000;i++)
{
String value = "myvaluehasbeensent"+i+i;
ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value);
if (sync) {
producer.send(producerRecord).get();
} else {
producer.send(producerRecord);
}
}
producer.close();
}
}
我通过编辑解决了这个问题
/etc/hosts file
检查你的主机文件,如果zookeeper或其他代理的ip不在这个文件中。
我遇到过类似的问题。这里的问题是,当 pom 文件中的 kafka 客户端版本与 kafka 服务器不同时不匹配。 我使用的是 kafka 客户端 0.10.0.0_1 但 kafka 服务器仍然是 0.9.0.0。所以我将kafka服务器版本升级到10,问题得到解决。
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.10.0.0_1</version>
</dependency>
确保您使用正确的版本。假设您使用以下 Maven 依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
所以工件等于:flink-connector-kafka-0.8_2.10
现在检查您是否使用了正确的 Kafka 版本:
cd /KAFKA_HOME/libs
现在找到kafka_YOUR-VERSION-sources.jar.
在我的例子中,我有 kafka_2.10-0.8.2.1-sources.jar。所以它工作正常! :) 如果您使用不同的版本,只需更改 maven 依赖项或下载正确的 kafka 版本。