"Error registering Avro schema" 尝试将数据流式传输到 Kafka 时
"Error registering Avro schema" when trying to stream data to Kafka
我正在尝试重现 Confluent's official documentation 中的序列化程序示例,并将 avro 格式的数据流式传输到 kafka 主题。
代码如下:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer producer = new KafkaProducer(props);
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(Exception e) {
e.printStackTrace();
}
但是 producer.send(record);
导致以下错误:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register schema operation failed while writing to the Kafka store; error code: 50001
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:775)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:648)
at com.giorgos.currencies.TestFX.main(TestFX.java:134)
注意我改了
ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
到
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
为了处理diamond operator is not supported in -source 1.5
错误。
pom.xml
文件的内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.giorgos.currencies</groupId>
<artifactId>giorgos-fx_currencies</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>george-fx_currencies</name>
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>apache-repo</id>
<name>Apache Repository</name>
<url>https://repository.apache.org/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<properties>
<kafka.version>0.8.2.1</kafka.version>
<kafka.scala.version>2.10</kafka.scala.version>
<confluent.version>4.0.0</confluent.version>
<avro.version>1.7.6</avro.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<finalName>uber-${project.artifactId}-${project.version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
</project>
另请注意,mvn package
会导致构建成功且未报告任何错误。 topic1
也创建成功,但是由于生产者端流数据失败,消费者没有数据出现。
我用
java -cp target/uber-giorgos-fx_currencies-1.0-SNAPSHOT.jar com.giorgos.currencies.TestFX
到运行代码
我猜你的问题是你的消息键是一个字符串而不是一个 avro 对象。尝试以下生产者属性:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
生产者记录到
ProducerRecord<String, Object> record = new ProducerRecord<String, Object>("topic1", key, avroRecord);
希望对您有所帮助
我正在尝试重现 Confluent's official documentation 中的序列化程序示例,并将 avro 格式的数据流式传输到 kafka 主题。
代码如下:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer producer = new KafkaProducer(props);
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(Exception e) {
e.printStackTrace();
}
但是 producer.send(record);
导致以下错误:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register schema operation failed while writing to the Kafka store; error code: 50001
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:775)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:648)
at com.giorgos.currencies.TestFX.main(TestFX.java:134)
注意我改了
ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
到
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
为了处理diamond operator is not supported in -source 1.5
错误。
pom.xml
文件的内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.giorgos.currencies</groupId>
<artifactId>giorgos-fx_currencies</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>george-fx_currencies</name>
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>apache-repo</id>
<name>Apache Repository</name>
<url>https://repository.apache.org/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<properties>
<kafka.version>0.8.2.1</kafka.version>
<kafka.scala.version>2.10</kafka.scala.version>
<confluent.version>4.0.0</confluent.version>
<avro.version>1.7.6</avro.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<finalName>uber-${project.artifactId}-${project.version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
</project>
另请注意,mvn package
会导致构建成功且未报告任何错误。 topic1
也创建成功,但是由于生产者端流数据失败,消费者没有数据出现。
我用
java -cp target/uber-giorgos-fx_currencies-1.0-SNAPSHOT.jar com.giorgos.currencies.TestFX
到运行代码
我猜你的问题是你的消息键是一个字符串而不是一个 avro 对象。尝试以下生产者属性:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
生产者记录到
ProducerRecord<String, Object> record = new ProducerRecord<String, Object>("topic1", key, avroRecord);
希望对您有所帮助