Avro class cannot be cast to class org.springframework.messaging.Message
I am consuming Avro data from Debezium.
I built a Kafka consumer as follows:
Java POJO
import lombok.Data;
@Data
public class Shop {
Long shopId;
Double latitude, longitude;
String name;
String phoneNumber;
String placeId;
double rating;
String website;
int addressId;
String closingHours;
String email;
int maxAttendance;
String opening_hours;
String businessHours;
String closeDay;
String description;
boolean open;
String setWeekendBusinessHours;
Long userShopId;
}
Avro message format
{
"type": "record",
"name": "ShopMessage",
"namespace": "com.example.kafka.avro",
"fields": [
{
"name": "SHOP_ID",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "LATITUDE",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "LONGITUDE",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "NAME",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "PHONENUMBER",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "PLACEID",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "RATING",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "WEBSITE",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ADDRESSID",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "CLOSINGHOUR",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "EMAIL",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "MAXATTENDANCE",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "OPENINGHOURS",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BUSINESSHOURS",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CLOSEDAY",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "DESCRIPTION",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ISOPEN",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "WEEKENDBUSINESSHOURS",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "USERSHOPID",
"type": [
"null",
"long"
],
"default": null
}
]
}
- ShopConsumer
@Component
public class ShopConsumer {
private final ShopMapper shopMapper;
private final Logger log = LogManager.getLogger(ShopConsumer.class);
public ShopConsumer(ShopMapper shopMapper) {
this.shopMapper = shopMapper;
}
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.consumer.topic}"
)
public void listen(List<Message<ShopMessage>> messages, Acknowledgment ack){
log.info("Received batch of messages with size: {}", messages.size());
List<Shop> shops = messages.stream()
.peek(this::logMessageReceived)
.map(message -> shopMapper.toChange(message.getPayload()))
.collect(Collectors.toList());
//do remove redis cache
ack.acknowledge();
}
private void logMessageReceived(Message<ShopMessage> message) {
log.info("Received shopId {} with a name of '{} and place id {}', partition={}, offset={}",
message.getPayload().getSHOPID(),
message.getPayload().getNAME(),
message.getPayload().getPLACEID(),
message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
message.getHeaders().get(KafkaHeaders.OFFSET));
}
}
- Consumer configuration - ShopsConsumerConfig.java
@EnableKafka
@Configuration
public class ShopsConsumerConfig {
private final KafkaProperties kafkaProperties;
public ShopsConsumerConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ShopMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ShopMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
return factory;
}
@Bean
public ConsumerFactory<String, ShopMessage> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getProperties().get("schema-registry-url"));
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, ShopMessage.class);
return props;
}
}
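For reference, the config above pulls several values from application.properties. Here is a minimal sketch of what those entries could look like; the broker address, group id, and registry URL are placeholder assumptions, not values from the original post:
# assumed application.properties entries (illustrative only)
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=shop-consumer-group
spring.kafka.consumer.topic=bookingdev_sqip_local.booking.shop
spring.kafka.listener.concurrency=3
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.properties.schema-registry-url=http://localhost:8081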
- Schema deserializer
public class SpecificAvroWithSchemaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {
public static final String AVRO_KEY_RECORD_TYPE = "avro.key.record.type";
public static final String AVRO_VALUE_RECORD_TYPE = "avro.value.record.type";
private Schema readerSchema;
public SpecificAvroWithSchemaDeserializer() { }
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.configure(new KafkaAvroDeserializerConfig(configs));
readerSchema = getSchema(getRecordClass(configs, isKey));
}
private Class<?> getRecordClass(Map<String, ?> configs, boolean isKey) {
String configsKey = isKey ? AVRO_KEY_RECORD_TYPE : AVRO_VALUE_RECORD_TYPE;
Object configsValue = configs.get(configsKey);
if (configsValue instanceof Class) {
return (Class<?>) configsValue;
} else if (configsValue instanceof String) {
String recordClassName = (String) configsValue;
try {
return Class.forName(recordClassName);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(String.format("Unable to find the class '%s'", recordClassName));
}
} else {
throw new IllegalArgumentException(
String.format("A class or a string must be informed into ConsumerConfig properties: '%s' and/or '%s'",
AVRO_KEY_RECORD_TYPE, AVRO_VALUE_RECORD_TYPE));
}
}
private Schema getSchema(Class<?> targetType) {
try {
Field field = targetType.getDeclaredField("SCHEMA$");
return (Schema) field.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalArgumentException(
String.format("Unable to get Avro Schema from the class '%s'", targetType.getName()), e);
}
}
@Override
public Object deserialize(String topic, byte[] bytes) {
return super.deserialize(bytes, readerSchema);
}
@Override
public void close() {
}
}
- Mapper class
@Mapper(componentModel = "spring")
public interface ShopMapper {
default Shop toChange(ShopMessage shopMessage){
if(shopMessage == null){
return null;
}
Shop shop = new Shop();
shop.setDescription(shopMessage.getDESCRIPTION().toString());
shop.setMaxAttendance(shopMessage.getMAXATTENDANCE());
shop.setSetWeekendBusinessHours(shopMessage.getWEEKENDBUSINESSHOURS().toString());
shop.setOpen(shopMessage.getISOPEN());
shop.setWebsite(shopMessage.getWEBSITE().toString());
shop.setRating(shopMessage.getRATING());
shop.setLatitude(shopMessage.getLATITUDE());
shop.setLongitude(shopMessage.getLONGITUDE());
shop.setCloseDay(shopMessage.getCLOSEDAY().toString());
shop.setBusinessHours(shopMessage.getBUSINESSHOURS().toString());
shop.setPhoneNumber(shopMessage.getPHONENUMBER().toString());
shop.setEmail(shopMessage.getEMAIL().toString());
shop.setPlaceId(shopMessage.getPLACEID().toString());
return shop;
}
}
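A side note on this mapper (not the cause of the exception below): every field in the Avro schema is a nullable union, so calls like shopMessage.getDESCRIPTION().toString() throw a NullPointerException whenever the field comes through as null. A null-safe helper, sketched here as a hardening suggestion rather than part of the original code:
// hypothetical helper: Avro string fields arrive as CharSequence (often Utf8),
// and may be null because of the ["null", "string"] union
private static String asString(CharSequence value) {
    return value == null ? null : value.toString();
}
// usage: shop.setDescription(asString(shopMessage.getDESCRIPTION()));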
The configuration lives in the application.properties file, but during message consumption Spring throws this error:
Caused by: java.lang.ClassCastException: class com.example.kafka.avro.ShopMessage cannot be cast to class org.springframework.messaging.Message (com.example.kafka.avro.ShopMessage and org.springframework.messaging.Message are in unnamed module of loader 'app')
Can someone point me in the right direction to solve this? It looks like something is wrong in the conversion from Avro to the POJO, but I cannot find the root cause.
Thanks in advance.
Update
After several attempts, the error above seems to be caused by the conversion from a single message to a list of messages. I changed the listener function as follows.
public void listen(ConsumerRecord<Integer, ?> messages, Acknowledgment ack){
//log.info("Received batch of messages with size: {}", messages.size());
log.info(messages.key());
log.info(messages.value());
ack.acknowledge();
}
and got the following value from the Kafka topic:
{"before": {"id": 6, "latitude": 2.921318, "longitude": 101.655938, "name": "XYZ", "phone_number": "+12345678", "place_id": "P007", "rating": 5.0, "type": "Food", "website": "https://xyz.me", "address_id": 5, "closing_hours": null, "email": "info@xyz.me", "max_attendance": 11, "opening_hours": null, "business_hours": "09-18", "close_day": "Saturday", "description": "Some Dummy", "is_open": true, "weekend_business_hours": "08-12", "user_shop_id": 0}, "after": {"id": 6, "latitude": 2.921318, "longitude": 101.655938, "name": "XYZ - edited", "phone_number": "+12345678", "place_id": "P007", "rating": 5.0, "type": "Food 2", "website": "https://xyz.me", "address_id": 5, "closing_hours": null, "email": "info@xyz.me", "max_attendance": 11, "opening_hours": null, "business_hours": "09-18", "close_day": "Saturday", "description": "Some dummy", "is_open": true, "weekend_business_hours": "08-12", "user_shop_id": 0}, "source": {"version": "1.6.0.Final", "connector": "mysql", "name": "bookingdev_sqip_local", "ts_ms": 1629267837000, "snapshot": "false", "db": "booking", "sequence": null, "table": "shop", "server_id": 1, "gtid": null, "file": "mysql-bin.000044", "pos": 26432, "row": 0, "thread": null, "query": null}, "op": "u", "ts_ms": 1629267836453, "transaction": null}
Apart from that, I also removed the custom deserializer and the custom POJO, since the schema is already registered in the Schema Registry.
The question now remains: how do I fetch the Debezium-generated schema from the Schema Registry and convert the message into the proper Java POJO for further processing?
Update 19.08.2021
After a discussion with @OneCricketeer, I adjusted the consumer logic as follows:
public void listen(ConsumerRecord<Integer, GenericRecord> messages, Acknowledgment ack) throws JsonProcessingException {
log.info(messages.key());
log.info(messages.value());
Shop shop = new ObjectMapper().readValue(messages.value().get("after").toString(), Shop.class);
log.info("NEW VALUE #####-> " + shop.getName());
//other logic here.
ack.acknowledge();
}
But then I ran into another error:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.0.jar:2.7.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition bookingdev_sqip_local.booking.shop-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class bookingdev_sqip_local.booking.shop.Key specified in writer's schema whilst finding reader's schema for a SpecificRecord.
I checked the Schema Registry: Debezium created two subjects, one for the key and one for the value.
["bookingdev_sqip_local.booking.shop-value","bookingdev_sqip_local.booking.shop-key"]
The error seems to be caused by the failure to map the key's schema.
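As the stack trace itself suggests, one mitigation (independent of fixing the key schema) is to wrap the deserializers in Spring's ErrorHandlingDeserializer, so a record that fails deserialization is handed to the container's error handler instead of blocking the partition. A sketch, assuming the Confluent Avro deserializer as the delegate:
// in consumerConfigs(): let ErrorHandlingDeserializer delegate to the Avro deserializer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);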
The exception occurs because your Kafka listener method should receive List<ShopMessage> rather than List<Message<ShopMessage>>.
Try changing this line:
public void listen(List<Message<ShopMessage>> messages, Acknowledgment ack){
to:
public void listen(List<ShopMessage> messages, Acknowledgment ack){
and shopMapper.toChange(message.getPayload()) to shopMapper.toChange(message).
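A side note on the same signature (my observation, not part of the original answer): a listener that takes a List only works when batch mode is enabled, and the posted config disables it, so this change would also require:
factory.setBatchListener(true); // required for a List<...> listener parameter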
After struggling with this Spring Boot <-> Kafka Connect <-> Debezium CDC MySQL chain, OK, I got a working application.
Architecture:
MySQL (producer) <-> Debezium CDC Kafka Connect <-> Kafka <-> Spring Boot (consumer)
I am using the Schema Registry to store the schemas.
ShopsConsumerConfig.java
@EnableKafka
@Configuration
public class ShopsConsumerConfig {
private final KafkaProperties kafkaProperties;
public ShopsConsumerConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Shop> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Shop> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
return factory;
}
@Bean
public ConsumerFactory<String, Shop> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getProperties().get("schema-registry-url"));
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); // keep this false here! With true, the deserializer tries to load a generated SpecificRecord class named after the writer's schema (e.g. bookingdev_sqip_local.booking.shop.Key), which does not exist on the classpath
props.put(KafkaAvroDeserializerConfig.USE_LATEST_VERSION, true);
return props;
}
}
Shop.java, the Java POJO:
import lombok.Data;
@Data
public class Shop {
Long id;
Double latitude, longitude;
String name;
String phone_number;
String place_id;
String type;
double rating;
String website;
int address_id;
String closing_hours;
String email;
int max_attendance;
String opening_hours;
String business_hours;
String close_day;
String description;
String is_open;
String weekend_business_hours;
Long user_shop_id;
}
And finally the consumer, ShopConsumer.java:
@Component
public class ShopConsumer {
private final Logger log = LogManager.getLogger(ShopConsumer.class);
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.consumer.topic}"
)
public void listen(ConsumerRecord<?, GenericRecord> messages, Acknowledgment ack) throws JsonProcessingException {
//debugging purposes only TODO remove me
log.info(messages.key());
log.info(messages.value());
log.info(messages.value().getSchema().getField("after"));
//convert the message, obtain the "after" section to get the newly updated value and parse it to Java POJO (in this case Shop.java)
Shop shop = new ObjectMapper().readValue(messages.value().get("after").toString(), Shop.class);
//debugging purposes only.
log.info("NEW VALUE #####-> " + shop.getName());
//other logic goes here...
ack.acknowledge();
}
}
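One optional refinement to the listener above (not from the original answer): Jackson's ObjectMapper is thread-safe and relatively expensive to construct, so a single shared instance avoids creating one per record:
// reuse one mapper instead of new ObjectMapper() on every message
private static final ObjectMapper MAPPER = new ObjectMapper();
// ...then inside listen():
Shop shop = MAPPER.readValue(messages.value().get("after").toString(), Shop.class);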
I hope this helps anyone who is struggling to figure out how to consume Debezium messages.