Spring cloud stream kafka binder 是否有使用通用 json 消息的好例子

Is there a good example for Spring cloud stream kafka binder for consuming generic json message

我是 Spring 云流和 Kafka 的新手,我正在寻找一个很好的示例来使用来自 kafka 主题的 json 消息。

谢谢

您可以参考examples from spring-cloud-stream team(接收器和源项目)

获取应用程序 运行 有三种情况。

如果消费者和生产者是 Spring-Cloud-Stream (SCS) 应用程序,您只需将 content-type 设置为 application/json

#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.contentType=application/json
#For all channels
spring.cloud.stream.default.contentType=application/json

第二种情况是您的生产者不是 SCS 而您的消费者是 SCS,SCS 默认添加 headers 嵌入到有效负载中,因此您需要禁用该行为 headerMode作为 rawcontentType.

#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.headerMode=raw
#For all channels
spring.cloud.stream.default.consumer.headerMode=raw

在第三种情况下是 SCS 生产者而不是 SCS 消费者,在这种情况下,您需要使用 application/octet-stream 作为 contentType 因为 SCS 不支持原始 headers for String (there is a issue for that),因此您需要以字节

的形式发送有效负载
#Properties
spring.cloud.stream.default.contentType=application/octet-stream
spring.cloud.stream.default.producer.headerMode=raw

//Java
byte[] payload = jacksonObjectMapper.writeValueAsBytes(entity);
return channel.send(MessageBuilder.withPayload(payload).build());