Spring Cloud Stream 将值生成为包含 JSON 而不仅仅是 JSON 的字符串

Spring Cloud Stream generates value as string containing JSON instead of just JSON

在使用 Spring Cloud Stream 的流处理应用程序中,我正在获取输入流(以整数为键)并对其调用 selectKey 以创建具有相同值的新主题,但是使用不同的键(字符串)。输入主题中包含正确 JSON 格式的记录,例如:

"key": {
  "id": 1
},
"value": {
  "id": 1,
  "public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...

问题是流处理应用程序创建的主题具有 value 作为包含 JSON 的字符串而不是正确的 JSON,即:

"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"

代码如下:

@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
         stream.selectKey { _, value -> value.publicId }

上面的函数所做的是消耗输入流,并生成输出流(发送到 output)。该输出流与输入流具有相同的值,只是键不同。 (在这种情况下,键来自值的 publicId 属性。)

application.yml如下:

spring.cloud.stream:
  bindings:
    input:
      destination: input-topic
    output:
      destination: output-output
  kafka:
    streams:
      binder:
        application-id: test-app-id-1
      bindings:
        input:
          consumer:
            keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        output:
          producer:
            keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde

有什么我想念的吗?这实际上是一个问题,还是 JSON 可以作为字符串存储在 Spring Cloud Stream 生成的消息中?

我尝试过但没有效果的其他方法:

这意味着您将 publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a" 作为字符串而不是 POJO 发送。

如果这就是您发送的内容,您应该使用 StringSerde 而不是 JsonSerde

编辑

我刚刚使用 Java 应用对其进行了测试,它按预期工作...

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {

    public static void main(String[] args) {
        SpringApplication.run(So58538297Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
        return stream.selectKey((key, value) -> value.getBar());
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        ObjectMapper mapper = new ObjectMapper();
        return args -> {
            template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
        };
    }

    @KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
    public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("out:" + in + ", key:" + key);
    }

    @KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
    public void in(String in) {
        System.out.println("in:" + in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

    }

}

spring.application.name=so58538297

spring.kafka.consumer.auto-offset-reset=earliest

in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz