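How to deserialize a JSON list of objects in my Kafka consumer?
My Kafka producer sends a list of objects as JSON.
I am trying to work out how to make my consumer deserialize the list. I can receive and read a single object, but when I change the code to use the type List, I get the following error: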
Error:(32, 47) java: incompatible types: cannot infer type arguments for org.springframework.kafka.core.DefaultKafkaConsumerFactory<>
reason: inference variable V has incompatible equality constraints java.util.List<nl.domain.X>,nl.domain.X
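Edit
This error was resolved by passing a TypeReference to the JsonDeserializer.
Current problem:
When the message is consumed, it does not arrive as the type I declared (List<X>); a LinkedHashMap is returned instead.
Here is the consumer configuration: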
@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Bean
    public ConsumerFactory<String, List<X>> xConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(new TypeReference<List<X>>() {}));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, List<X>> xKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, List<X>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xConsumerFactory());
        return factory;
    }
}
Here is the consumer:
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test", groupId = "group_json", containerFactory = "xKafkaListenerFactory")
    public void consume(List<X> x) {
        // In reality the elements are LinkedHashMap instances, which is not what I want
        x.forEach(i ->
                System.out.println("Consumed message: " + i.getName()));
    }
}
Here is the producer configuration:
@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, List<X>> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, List<X>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Here is the producer:
@Component
public class KafkaProducer {

    private static final String TOPIC = "test";
    private final KafkaTemplate<String, List<X>> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, List<X>> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void publishMessage(List<X> x) {
        // Created 3 instances of X for the current example
        List<X> list = new ArrayList<>();
        list.add(new X("Apple"));
        list.add(new X("Beach"));
        list.add(new X("Money"));
        ListenableFuture<SendResult<String, List<X>>> listenableFuture = kafkaTemplate.send(TOPIC, list);
    }
}
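Summary:
The producer seems to work fine. When such a message is sent, my consumer gets the following error. It cannot cast the LinkedHashMap to the type I expect.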
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void nl.infrastructure.input.message.kafka.consumer.KafkaConsumer.consume(java.util.List<nl.domain.X>)' threw exception; nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class nl.domain.X (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; nl.domain.X is in unnamed module of loader 'app'); nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class nl.domain.X (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; nl.domain.X is in unnamed module of loader 'app')
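The exception is thrown inside the listener rather than during deserialization, which is consistent with Java's type erasure: a List<X> can hold LinkedHashMap elements at runtime, and the cast only fails when an element is read as X. The following is a minimal JDK-only sketch of that failure mode. It does not use Kafka or Jackson; the class ErasureDemo, its nested X, and the method names are hypothetical stand-ins, and untypedParse only imitates what a JSON parser produces when it has no element type to bind to.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ErasureDemo {

    // Hypothetical stand-in for nl.domain.X from the question.
    static class X {
        private final String name;

        X(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Imitates an untyped JSON parse: each JSON object becomes a map, not an X.
    @SuppressWarnings("unchecked")
    static List<X> untypedParse() {
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("name", "Apple");
        List<Object> raw = new ArrayList<>();
        raw.add(element);
        // The unchecked cast succeeds at runtime because the element type is erased.
        return (List<X>) (List<?>) raw;
    }

    // Returns true if reading an element as X throws ClassCastException.
    static boolean failsOnElementAccess() {
        List<X> list = untypedParse();
        try {
            X first = list.get(0); // the compiler-inserted cast to X fails here
            first.getName();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Fails on element access: " + failsOnElementAccess());
    }
}
```

The list itself is handed to the listener without complaint; the ClassCastException only surfaces at the first `i.getName()` call, which matches the stack trace above.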
The cause may be that the wrong JsonDeserializer type is imported.
Use import org.springframework.kafka.support.serializer.JsonDeserializer;
You have to configure the JsonDeserializer as follows:
protected Deserializer<List<X>> kafkaDeserializer() {
    ObjectMapper om = new ObjectMapper();
    om.getTypeFactory().constructParametricType(List.class, X.class);
    return new JsonDeserializer<>(om);
}
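I used @AmanGarg's answer with a slight tweak. (I am not sure why it did not work for me as written.) Converting to a single POJO class worked, but converting to List<X> did not.
I am adding this in case anyone else runs into the problem.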
protected JsonDeserializer<List<X>> kafkaDeserializer() {
    ObjectMapper om = new ObjectMapper();
    JavaType type = om.getTypeFactory().constructParametricType(List.class, X.class);
    return new JsonDeserializer<List<X>>(type, om, false);
}
This uses a different JsonDeserializer constructor, one that takes the target JavaType explicitly.
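Why the element type has to be handed over explicitly: List.class alone says nothing about X, so the full List<X> must reach the deserializer as a runtime object. The sketch below shows, with the JDK only, how an anonymous subclass can carry List<X> as reflective type information. This is an illustration of the general "type token" idea, not Jackson's TypeReference or JavaType implementation; TypeTokenDemo, TypeToken, and the nested X are hypothetical names introduced for the example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenDemo {

    // Hypothetical stand-in for nl.domain.X from the question.
    static class X {
    }

    // Minimal type token: an anonymous subclass records its type argument
    // in its generic superclass, where reflection can still read it.
    abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        Type getType() {
            return type;
        }
    }

    // Captures List<X>, including the element type, as a runtime Type.
    static Type listOfX() {
        return new TypeToken<List<X>>() { }.getType();
    }

    public static void main(String[] args) {
        ParameterizedType captured = (ParameterizedType) listOfX();
        System.out.println("Raw type: " + captured.getRawType());
        System.out.println("Element type: " + captured.getActualTypeArguments()[0]);
    }
}
```

With the element type available at runtime, a deserializer has what it needs to build X instances instead of falling back to maps, which is the information the JavaType in the snippet above supplies.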