Spring Cloud Stream 将值生成为包含 JSON 而不仅仅是 JSON 的字符串
Spring Cloud Stream generates value as string containing JSON instead of just JSON
在使用 Spring Cloud Stream 的流处理应用程序中,我正在获取输入流(以整数为键)并对其调用 selectKey
以创建具有相同值的新主题,但是使用不同的键(字符串)。输入主题中包含正确 JSON 格式的记录,例如:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
问题是流处理应用程序创建的主题具有 value
作为包含 JSON 的字符串而不是正确的 JSON,即:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
代码如下:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
stream.selectKey { _, value -> value.publicId }
上面的函数所做的是消耗输入流,并生成输出流(发送到 output
)。该输出流与输入流具有相同的值,只是键不同。 (在这种情况下,键来自值的 publicId
属性。)
application.yml
如下:
spring.cloud.stream:
bindings:
input:
destination: input-topic
output:
destination: output-output
kafka:
streams:
binder:
application-id: test-app-id-1
bindings:
input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
有什么我想念的吗?这实际上是一个问题,还是 JSON 可以作为字符串存储在 Spring Cloud Stream 生成的消息中?
我尝试过但没有效果的其他方法:
- 使用原生 decoding/encoding
- 正在将
spring.cloud.stream.bindings.output.content-type
设置为 application/json
- 使用
map
代替 selectKey
这意味着您将 publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a"
作为字符串而不是 POJO 发送。
如果这就是您发送的内容,您应该使用 StringSerde
而不是 JsonSerde
。
编辑
我刚刚使用 Java 应用对其进行了测试,它按预期工作...
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {
public static void main(String[] args) {
SpringApplication.run(So58538297Application.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
return stream.selectKey((key, value) -> value.getBar());
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
ObjectMapper mapper = new ObjectMapper();
return args -> {
template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
};
}
@KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("out:" + in + ", key:" + key);
}
@KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
public void in(String in) {
System.out.println("in:" + in);
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
}
}
和
spring.application.name=so58538297
spring.kafka.consumer.auto-offset-reset=earliest
和
in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz
在使用 Spring Cloud Stream 的流处理应用程序中,我正在获取输入流(以整数为键)并对其调用 selectKey
以创建具有相同值的新主题,但是使用不同的键(字符串)。输入主题中包含正确 JSON 格式的记录,例如:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
问题是流处理应用程序创建的主题具有 value
作为包含 JSON 的字符串而不是正确的 JSON,即:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
代码如下:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
stream.selectKey { _, value -> value.publicId }
上面的函数所做的是消耗输入流,并生成输出流(发送到 output
)。该输出流与输入流具有相同的值,只是键不同。 (在这种情况下,键来自值的 publicId
属性。)
application.yml
如下:
spring.cloud.stream:
bindings:
input:
destination: input-topic
output:
destination: output-output
kafka:
streams:
binder:
application-id: test-app-id-1
bindings:
input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
有什么我想念的吗?这实际上是一个问题,还是 JSON 可以作为字符串存储在 Spring Cloud Stream 生成的消息中?
我尝试过但没有效果的其他方法:
- 使用原生 decoding/encoding
- 正在将
spring.cloud.stream.bindings.output.content-type
设置为application/json
- 使用
map
代替selectKey
这意味着您将 publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a"
作为字符串而不是 POJO 发送。
如果这就是您发送的内容,您应该使用 StringSerde
而不是 JsonSerde
。
编辑
我刚刚使用 Java 应用对其进行了测试,它按预期工作...
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {
public static void main(String[] args) {
SpringApplication.run(So58538297Application.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
return stream.selectKey((key, value) -> value.getBar());
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
ObjectMapper mapper = new ObjectMapper();
return args -> {
template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
};
}
@KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("out:" + in + ", key:" + key);
}
@KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
public void in(String in) {
System.out.println("in:" + in);
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
}
}
和
spring.application.name=so58538297
spring.kafka.consumer.auto-offset-reset=earliest
和
in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz