Deserialize Kafka messages in a KafkaConsumer using Spring Boot
I have a Spring Boot application that listens for Kafka messages and converts them to objects:
@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();
    Hostel hostel = objectMapper.readValue(message, Hostel.class);
}
I would like to know whether I can receive the object directly, like this:
@KafkaListener(topics = "test", groupId = "group_id")
public void consume(Hostel hostel) throws IOException {
}
You can do this with spring-kafka, but you need to use a custom deserializer (or a JsonDeserializer) in the container factory:
@KafkaListener(topics = "test", groupId = "my.group", containerFactory = "myKafkaFactory")
fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) {
    // do something with myRequest
    ack.acknowledge()
}
Your container factory would look like this:
@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
    factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
    factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
    return factory
}
Your deserializer would look like this:
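import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyRequestDeserializer implements Deserializer<MyRequest> {
    // The original snippet did not show how `log` was declared; SLF4J is assumed here.
    private static final Logger log = LoggerFactory.getLogger(MyRequestDeserializer.class);
    // One shared ObjectMapper is enough; it is thread-safe once configured.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MyRequest deserialize(String topic, byte[] msgBytes) {
        if (msgBytes == null) {
            return null; // records with a null value (e.g. tombstones) carry no payload
        }
        try {
            return objectMapper.readValue(msgBytes, MyRequest.class);
        } catch (IOException ex) {
            log.warn("JSON parse/mapping exception occurred.", ex);
            // Fall back to an empty request instead of failing the consumer.
            return new MyRequest();
        }
    }

    @Override
    public void close() {
        log.debug("MyRequestDeserializer closed");
    }
}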
Alternatively, you can use the default JsonDeserializer described in the Spring docs.
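For the JsonDeserializer route, a minimal Java configuration sketch might look like the following. It is not taken from the original answer: the class name HostelKafkaConfig, the bean names, and the broker address localhost:9092 are assumptions for illustration, and Hostel is taken to be the question's class, living in the same package as this configuration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class HostelKafkaConfig {

    @Bean
    public ConsumerFactory<String, Hostel> hostelConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with your own.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");

        // JsonDeserializer maps the record value to Hostel using Jackson.
        JsonDeserializer<Hostel> valueDeserializer = new JsonDeserializer<>(Hostel.class);

        // Passing deserializer instances means no deserializer class
        // entries are needed in the properties map.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Hostel> hostelKafkaFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Hostel> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(hostelConsumerFactory());
        return factory;
    }
}
```

With a factory like this in place, the listener from the question can take the object directly by adding containerFactory = "hostelKafkaFactory" to its @KafkaListener annotation and declaring the parameter as Hostel.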