Spring kafka 消息反序列化失败,因为 content-type 设置为 application/json

Spring kafka message deserialization fails because the content-type is set to application/json

我正在向Kafka发送消息如下:

private KafkaTemplate<String, MyMessage > kafkaTemplate;

public void sendMessage(MyMessage data) {

    Message<MyMessage > message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC,topic)
            .setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
            .build();
    kafkaTemplate.send(message);

我有自己的自定义 de/serializer MyMessage,密钥是一个简单的字符串

我正在尝试按如下方式处理消息:

@StreamListener
    public void process(@Input("input") KStream<String,MyMessage> myStream){
            final Serde<String> stringSerde = Serdes.String();
            final MyMessageSerde myMessageSerde = new MyMessageSerde();
            myStream
                    .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                    .aggregate(ArrayList::new,
                            (newKey,val,agg) -> {
                                agg.add(val);
                                return agg;
                            },
                            Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                    .withKeySerde(stringSerde)
                                    .withValueSerde(new ArrayListSerde(myMessageSerde)));

    ...
    interface MyStreamProcessor {
        @Input("input")
        KStream<?, ?> input();
    }

这是由于反序列化错误而失败,调试后我看到原因是消息的 content-type header 设置为 application/json 因此它正在尝试反序列化消息为 JSON。 content-type header 如何设置以及如何覆盖它?我不是以 JSON 形式发送消息,而是像常规 Kafka 消息一样以字节 [] 形式发送消息,因此我希望使用自定义 de/serializer

更奇怪的是,如果我将其更改为 KTable,它就可以正常工作。

@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
        final Serde<String> stringSerde = Serdes.String();
        final MyMessageSerde myMessageSerde = new MyMessageSerde();
        KStream<String,MyMessage> myStream = myTable.toStream();
        myStream
                .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                .aggregate(ArrayList::new,
                        (newKey,val,agg) -> {
                            agg.add(val);
                            return agg;
                        },
                        Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                .withKeySerde(stringSerde)
                                .withValueSerde(new ArrayListSerde(myMessageSerde)));

...
interface MyStreamProcessor {
    @Input("input")
    KTable<?, ?> input();
}

为什么使用 KTable 可以正确反序列化消息? KStream 和 KTable 之间的行为有何不同?

之所以有效,是因为您在 .group 方法中指定了自定义(反)序列化。 查看文档的 this and this 部分,了解如何指定自定义值 serde。

我通过执行以下操作使它起作用:

  1. 在我的application.yml文件中设置输入内容类型header如下: spring.cloud.stream.bindings.input.content-type: application/mymessage

  2. 创建一个 CustomMessageConverter,它扩展 AbstractMessageConverter 以从 byte[] 转换为 MyMessage。它还定义了新的 MimeType

public MyMappingConverter() { super(new MimeType("application", "mymessage")); }

  1. 创建一个 MessageConverter bean

@Bean @StreamMessageConverter public MessageConverter myMessageConverter() { return new MyMessageConverter(); }

我仍然不知道为什么我必须为 KStream 而不是为 KTable 做所有这些,如果有人能解释一下,我将不胜感激。