Spring 生成的 kafka AVRO class 未找到
Spring kafka AVRO generated class not found
我正在使用 confluent JDBC 连接器连接到 postgres 数据库以检索更改并将它们放入 kafka 主题中。现在我想使用 spring 引导使用者使用这些消息。这些消息采用 AVRO 格式。我有来自连接器的架构,并使用 avro maven 插件为其生成了一个 POJO class。
但是监听器启动的时候只有下面的错误
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ps_git_repo-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class ps_git_repo specified in writer's schema whilst finding reader's schema for a SpecificRecord.
当我不使用 avro 反序列化数据时,我将收到数据但无法读取。
在 pom.xml 我有以下依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>6.2.0</version>
<exclusions>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
并且在 application.properties 中我添加了反序列化器和模式注册表 url。
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.bootstrap-servers = http://localhost:9092
spring.kafka.consumer.properties.specific.avro.reader = true
spring.kafka.consumer.properties.schema.registry.url = http://localhost:8081
在构建中,我使用 avro maven 插件从连接器创建的模式生成 POJO。
pom.xml
中的插件
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
我已将以下架构放入文件夹并使用 mvn generate-sources
生成到 pojo
Schema.avsc
{
"connect.name": "ps_git_repo",
"fields": [
{
"name": "id",
"type": "long"
},
{
"default": null,
"name": "name",
"type": [
"null",
"string"
]
}
],
"name": "ps_git_repo",
"namespace": "com.company.api.kafkademo",
"type": "record"
}
我得到了正确的 ps_git_repo.java class 然后我有这个侦听器来检索消息。
@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {
@KafkaListener(groupId = "test123", topics = "ps_git_repo_test")
public void handleMessage(ps_git_repo message) {
System.out.println(message);
}
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
找不到架构。
有人知道哪里出了问题吗?
反序列化器使用 connect.name 字段而不是命名空间来找到正确的 class。
我已将以下行添加到 JDBC_connector 的配置中,让连接器生成正确的命名空间
"transforms":"AddNamespace",
"transforms.AddNamespace.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "com.company.api.kafkademo.ps_git_repo"
我正在使用 confluent JDBC 连接器连接到 postgres 数据库以检索更改并将它们放入 kafka 主题中。现在我想使用 spring 引导使用者使用这些消息。这些消息采用 AVRO 格式。我有来自连接器的架构,并使用 avro maven 插件为其生成了一个 POJO class。
但是监听器启动的时候只有下面的错误
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ps_git_repo-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class ps_git_repo specified in writer's schema whilst finding reader's schema for a SpecificRecord.
当我不使用 avro 反序列化数据时,我将收到数据但无法读取。
在 pom.xml 我有以下依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>6.2.0</version>
<exclusions>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
并且在 application.properties 中我添加了反序列化器和模式注册表 url。
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.bootstrap-servers = http://localhost:9092
spring.kafka.consumer.properties.specific.avro.reader = true
spring.kafka.consumer.properties.schema.registry.url = http://localhost:8081
在构建中,我使用 avro maven 插件从连接器创建的模式生成 POJO。
pom.xml
中的插件
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
我已将以下架构放入文件夹并使用 mvn generate-sources
Schema.avsc
{
"connect.name": "ps_git_repo",
"fields": [
{
"name": "id",
"type": "long"
},
{
"default": null,
"name": "name",
"type": [
"null",
"string"
]
}
],
"name": "ps_git_repo",
"namespace": "com.company.api.kafkademo",
"type": "record"
}
我得到了正确的 ps_git_repo.java class 然后我有这个侦听器来检索消息。
@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {
@KafkaListener(groupId = "test123", topics = "ps_git_repo_test")
public void handleMessage(ps_git_repo message) {
System.out.println(message);
}
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
找不到架构。
有人知道哪里出了问题吗?
反序列化器使用 connect.name 字段而不是命名空间来找到正确的 class。
我已将以下行添加到 JDBC_connector 的配置中,让连接器生成正确的命名空间
"transforms":"AddNamespace",
"transforms.AddNamespace.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "com.company.api.kafkademo.ps_git_repo"