Spring Boot kafka 值序列化器
SprintBoot kafka value-serializer
我有一个带有 apache kafka(开源流处理软件)的 SpringBoot 项目
我有这个听众
@KafkaListener(topics = "test")
public String consume(Hostel hostel) throws IOException {
}
这个序列化器
public class HostelSerializer implements Deserializer<Hostel> {
private final ObjectMapper objectMapper;
public InputRequestMessageSerializer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void configure(Map<String, ?> map, boolean b) {
this.configure(map,b);
}
@SneakyThrows
@Override
public Hostel deserialize(String s, byte[] bytes) {
return objectMapper.readValue(bytes, Hostel.class);
}
@Override
public void close() {
this.close();
}
}
并在属性中:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
topics: test
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.kafka.config.HostelSerializer
当我收到一条消息时,我仍然很紧张,我有这个错误
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String]
to [com.message.Hostel] for GenericMessage [.....}]
查看日志,它没有从属性中获取值:
2020-04-09 16:41:30,840 INFO : gid: trace= span= [main] o.a.k.c.consumer.ConsumerConfig ConsumerConfig values:
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
你混de/serialization。由于您配置了 consumer,因此您只需要使用适当的 反序列化 接口和实现:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
topics: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.kafka.config.HostelDeserializer
...和...
public class HostelDeserializer implements Deserializer<Hostel> { .. }
我有一个带有 apache kafka(开源流处理软件)的 SpringBoot 项目 我有这个听众
@KafkaListener(topics = "test")
public String consume(Hostel hostel) throws IOException {
}
这个序列化器
public class HostelSerializer implements Deserializer<Hostel> {
private final ObjectMapper objectMapper;
public InputRequestMessageSerializer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void configure(Map<String, ?> map, boolean b) {
this.configure(map,b);
}
@SneakyThrows
@Override
public Hostel deserialize(String s, byte[] bytes) {
return objectMapper.readValue(bytes, Hostel.class);
}
@Override
public void close() {
this.close();
}
}
并在属性中:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
topics: test
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.kafka.config.HostelSerializer
当我收到一条消息时,我仍然很紧张,我有这个错误
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String]
to [com.message.Hostel] for GenericMessage [.....}]
查看日志,它没有从属性中获取值:
2020-04-09 16:41:30,840 INFO : gid: trace= span= [main] o.a.k.c.consumer.ConsumerConfig ConsumerConfig values:
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
你混de/serialization。由于您配置了 consumer,因此您只需要使用适当的 反序列化 接口和实现:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
topics: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.kafka.config.HostelDeserializer
...和...
public class HostelDeserializer implements Deserializer<Hostel> { .. }