Spring kafka 消息反序列化失败,因为 content-type 设置为 application/json
Spring kafka message deserialization fails because the content-type is set to application/json
我正在向Kafka发送消息如下:
private KafkaTemplate<String, MyMessage > kafkaTemplate;
public void sendMessage(MyMessage data) {
Message<MyMessage > message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC,topic)
.setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
.build();
kafkaTemplate.send(message);
我有自己的自定义 de/serializer MyMessage,密钥是一个简单的字符串
我正在尝试按如下方式处理消息:
@StreamListener
public void process(@Input("input") KStream<String,MyMessage> myStream){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KStream<?, ?> input();
}
这是由于反序列化错误而失败,调试后我看到原因是消息的 content-type header 设置为 application/json 因此它正在尝试反序列化消息为 JSON。 content-type header 如何设置以及如何覆盖它?我不是以 JSON 形式发送消息,而是像常规 Kafka 消息一样以字节 [] 形式发送消息,因此我希望使用自定义 de/serializer
更奇怪的是,如果我将其更改为 KTable,它就可以正常工作。
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
KStream<String,MyMessage> myStream = myTable.toStream();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KTable<?, ?> input();
}
为什么使用 KTable 可以正确反序列化消息? KStream 和 KTable 之间的行为有何不同?
之所以有效,是因为您在 .group
方法中指定了自定义(反)序列化。
查看文档的 this and this 部分,了解如何指定自定义值 serde。
我通过执行以下操作使它起作用:
在我的application.yml文件中设置输入内容类型header如下:
spring.cloud.stream.bindings.input.content-type: application/mymessage
创建一个 CustomMessageConverter,它扩展 AbstractMessageConverter 以从 byte[] 转换为 MyMessage。它还定义了新的 MimeType
public MyMappingConverter() {
super(new MimeType("application", "mymessage"));
}
- 创建一个 MessageConverter bean
@Bean
@StreamMessageConverter
public MessageConverter myMessageConverter() {
return new MyMessageConverter();
}
我仍然不知道为什么我必须为 KStream 而不是为 KTable 做所有这些,如果有人能解释一下,我将不胜感激。
我正在向Kafka发送消息如下:
private KafkaTemplate<String, MyMessage > kafkaTemplate;
public void sendMessage(MyMessage data) {
Message<MyMessage > message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC,topic)
.setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
.build();
kafkaTemplate.send(message);
我有自己的自定义 de/serializer MyMessage,密钥是一个简单的字符串
我正在尝试按如下方式处理消息:
@StreamListener
public void process(@Input("input") KStream<String,MyMessage> myStream){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KStream<?, ?> input();
}
这是由于反序列化错误而失败,调试后我看到原因是消息的 content-type header 设置为 application/json 因此它正在尝试反序列化消息为 JSON。 content-type header 如何设置以及如何覆盖它?我不是以 JSON 形式发送消息,而是像常规 Kafka 消息一样以字节 [] 形式发送消息,因此我希望使用自定义 de/serializer
更奇怪的是,如果我将其更改为 KTable,它就可以正常工作。
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
KStream<String,MyMessage> myStream = myTable.toStream();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KTable<?, ?> input();
}
为什么使用 KTable 可以正确反序列化消息? KStream 和 KTable 之间的行为有何不同?
之所以有效,是因为您在 .group
方法中指定了自定义(反)序列化。
查看文档的 this and this 部分,了解如何指定自定义值 serde。
我通过执行以下操作使它起作用:
在我的application.yml文件中设置输入内容类型header如下:
spring.cloud.stream.bindings.input.content-type: application/mymessage
创建一个 CustomMessageConverter,它扩展 AbstractMessageConverter 以从 byte[] 转换为 MyMessage。它还定义了新的 MimeType
public MyMappingConverter() {
super(new MimeType("application", "mymessage"));
}
- 创建一个 MessageConverter bean
@Bean
@StreamMessageConverter
public MessageConverter myMessageConverter() {
return new MyMessageConverter();
}
我仍然不知道为什么我必须为 KStream 而不是为 KTable 做所有这些,如果有人能解释一下,我将不胜感激。