Spring Kafka with Avro Deserializer
I am trying to use Spring Kafka with the Confluent Schema Registry and the Kafka Avro Deserializer. I generated the Avro classes from an .avsc schema with Gradle. Using the generated class, I send a generic record and consume it.
I am getting the following error in my Kafka listener:
Error while processing: ConsumerRecord(topic = topi_name, partition = 2, offset = 149, CreateTime = 1592288763784, serialized key size = 16, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = event_test, value = {"eventType": "test", "EventDataRequest": {"user": "54321", "panId": "1234", "empId": "5"}})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void className(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>) throws com.test.kafka.exception.ConsumerException,org.apache.xmlbeans.XmlException,java.io.IOException,java.lang.ClassNotFoundException' threw exception; nested exception is java.lang.ClassCastException: com.test.MyPojo cannot be cast to com.test.MyPojo; nested exception is java.lang.ClassCastException: com.test.MyPojo cannot be cast to com.test.MyPojo
Consumer configuration
@Bean
@DependsOn("consumerFactory")
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<String, GenericRecord> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean(name = "consumerFactory")
public ConsumerFactory<String, GenericRecord> consumerFactory() {
Map<String, Object> config = new HashMap<>(kafkaProperties.getConsumer().buildProperties());
config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
return new DefaultKafkaConsumerFactory<>(config);
}
Kafka listener
@KafkaListener(topics = "${topic}",groupId = "${group-id}",containerFactory = "kafkaListenerContainerFactory")
public void avroConsumer(ConsumerRecord<String, GenericRecord> record){
System.out.printf("Listener value = %s%n", (GeneratedAvroPojoClass) record.value()); // class cast exception is thrown here
}
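The telling detail in the stack trace is that both sides of the failed cast have the same name (com.test.MyPojo cannot be cast to com.test.MyPojo), which can only happen when the two Class objects come from different class loaders. As a self-contained sketch (ClassLoaderDemo, MyPojo, and ChildFirstLoader are hypothetical names of mine, not from the question), this reproduces the situation:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class ClassLoaderDemo {

    public static class MyPojo { }

    // Child-first loader that re-defines MyPojo from its own bytes instead of
    // delegating to the parent -- roughly what a restart/hot-reload loader does.
    static class ChildFirstLoader extends ClassLoader {
        ChildFirstLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(MyPojo.class.getName())) {
                return super.loadClass(name, resolve); // delegate everything else
            }
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (Exception e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader child = new ChildFirstLoader(ClassLoaderDemo.class.getClassLoader());
        Object pojo = child.loadClass(MyPojo.class.getName())
                           .getDeclaredConstructor().newInstance();
        // Same class name, but two distinct Class objects:
        System.out.println("same Class object: " + (pojo.getClass() == MyPojo.class));
        try {
            MyPojo cast = (MyPojo) pojo; // "MyPojo cannot be cast to MyPojo"
        } catch (ClassCastException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

In the real application the same comparison, record.value().getClass().getClassLoader() versus GeneratedAvroPojoClass.class.getClassLoader(), is a quick way to confirm the diagnosis.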
Producer configuration
@Bean(name = "customProducerFactory")
public ProducerFactory<String, GenericRecord> customProducerFactory() {
Map<String, Object> config = new HashMap<>(kafkaProperties.getProducer().buildProperties());
config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
return new DefaultKafkaProducerFactory<>(config);
}
@Bean(name = "kafkaTemplate")
@DependsOn("customProducerFactory")
public KafkaTemplate<String, GenericRecord> kafkaTemplate(@Qualifier("customProducerFactory") ProducerFactory<String, GenericRecord> customProducerFactory){
return new KafkaTemplate<>(customProducerFactory, true);
}
YML properties
custom:
kafka:
topic: topic_name
bootstrap-servers: ******
producer:
acks: all
client-id: client_id
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: *****
auto.register.schema: true
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
consumer:
enable-auto-commit: true
auto-offset-reset: earliest
group-id: group_id_consumer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: ******
specific.avro.reader: true
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Logged consumer config values
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = ******
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
group.id = ********
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
security.protocol = PLAINTEXT
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
specific.avro.reader = true
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
schema.registry.url = [*****]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
Logged producer config values
ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [*******]
buffer.memory = 33554432
client.id = client_id
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer
KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
schema.registry.url = [*******]
basic.auth.user.info = [hidden]
proxy.host =
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
com.test.MyPojo cannot be cast to com.test.MyPojo
This usually means there is a class loader problem - the deserializer was instantiated with a different class loader than the one used by the @KafkaListener method.
You need to figure out why; it is impossible for anyone to answer that from static information alone.
Edit:
Make sure specific.avro.reader = true is set in the consumer properties.
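For reference, a minimal sketch of what that edit means for the consumer factory config (plain property strings; the deserializer class names match the YML above, the registry URL is a placeholder). With this flag the Confluent deserializer returns instances of the generated SpecificRecord class, so the listener can declare that type directly instead of casting from GenericRecord:

```java
import java.util.HashMap;
import java.util.Map;

public class AvroConsumerProps {
    // Consumer properties for the Confluent Avro deserializer. With
    // specific.avro.reader=true it deserializes into the generated specific
    // class rather than a GenericRecord.
    public static Map<String, Object> avroConsumerProps(String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        config.put("schema.registry.url", schemaRegistryUrl);
        config.put("specific.avro.reader", true);
        return config;
    }
}
```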
A few days ago I ran into the same class loading problem while consuming Avro messages with the @KafkaListener annotation in a Spring Kafka application.
Finding the root cause was not easy, because the problem only appeared when I launched the application in debug mode from my IDE; otherwise it ran successfully.
After some debugging inside Spring's internal messaging framework handlers, I found that my Avro classes were loaded by a class loader called "RestartClassLoader", so of course the original references to the classes were not the same.
So I removed this dependency from my pom.xml file:
<!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
Now everything works fine!
HTH.
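If you would rather keep devtools for hot reload, an alternative worth trying (based on devtools' documented restart-classloader customization; it assumes the generated Avro classes live in their own jar, and the jar name below is hypothetical) is to tell the restart class loader to leave that jar alone via a META-INF/spring-devtools.properties file:

```
# META-INF/spring-devtools.properties
# Load my-avro-types-*.jar with the base class loader instead of the
# RestartClassLoader, so there is only one copy of each generated class.
restart.exclude.avrotypes=/my-avro-types-[\w\d-\.]+\.jar
```

Note this does not help when the Avro classes are compiled inside the project itself, since devtools always puts your own classes on the restart loader; in that case removing the dependency is the simpler fix.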