Spring Cloud Stream Kafka Producer 消息
Spring Cloud Stream Kafka Producer messages
我想用 spring 引导设置一个 spring-cloud-stream-kafka 生产者。
生产者正在工作,我可以使用来自 kafka 代理的消息,但消息还包含一些 header 信息,如下所示:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"message":"hello"}
我的 POJO 包含一个字段(字符串消息),所以我希望只有 JSON 字符串会发送到 kafka。
我的 RestController 中的方法 test() 触发了生产者:
@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {
private MessageChannel consumer;
public KafkaStreamProducerApplication(ProducerChannels channels) {
this.consumer = channels.consumer();
}
@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
this.consumer.send(msg);
}
interface ProducerChannels {
@Output
MessageChannel consumer();
}
我的application.properties
spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json
如果您能推荐有关此主题的任何文档或示例,我将不胜感激。 github 上的示例通常非常薄,它们使用大量自动配置并且没有解释。我使用的示例是针对 RabbitMQ 的。
contentType
和 originalContentType
header 被 Spring Cloud Stream 在消费者应用程序反序列化消息并根据 content-type设置.
contentType
header 只有在您配置绑定的 content-type 时才明确设置,就像您在此处所做的那样 spring.cloud.stream.bindings.consumer.content-type=application/json
。
当设置 contentType
header 时,Spring Cloud Stream 在 producing/consuming 的 serialization/de-serialization 过程中使用 originalContentType
标志保留此 header消息 to/from 经纪人(通过活页夹)。
在您的情况下,我猜您可能根本不需要设置 contentType
。
除了 spring-cloud-stream-samples github 存储库中的示例之外,您还可以参考 out of the box app starters,它涵盖了广泛的应用程序,可以 运行 针对任何支持的活页夹(包括 Kafka)。
如果您想避免嵌入 headers(这样您就可以在某些 non-Spring Cloud Stream 应用程序中接收消息),请将生产者的 headerMode
设置为 raw
。
headerMode
When set to raw, disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.
Default: embeddedHeaders.
我想用 spring 引导设置一个 spring-cloud-stream-kafka 生产者。
生产者正在工作,我可以使用来自 kafka 代理的消息,但消息还包含一些 header 信息,如下所示:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"message":"hello"}
我的 POJO 包含一个字段(字符串消息),所以我希望只有 JSON 字符串会发送到 kafka。
我的 RestController 中的方法 test() 触发了生产者:
@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {
private MessageChannel consumer;
public KafkaStreamProducerApplication(ProducerChannels channels) {
this.consumer = channels.consumer();
}
@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
this.consumer.send(msg);
}
interface ProducerChannels {
@Output
MessageChannel consumer();
}
我的application.properties
spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json
如果您能推荐有关此主题的任何文档或示例,我将不胜感激。 github 上的示例通常非常薄,它们使用大量自动配置并且没有解释。我使用的示例是针对 RabbitMQ 的。
contentType
和 originalContentType
header 被 Spring Cloud Stream 在消费者应用程序反序列化消息并根据 content-type设置.
contentType
header 只有在您配置绑定的 content-type 时才明确设置,就像您在此处所做的那样 spring.cloud.stream.bindings.consumer.content-type=application/json
。
当设置 contentType
header 时,Spring Cloud Stream 在 producing/consuming 的 serialization/de-serialization 过程中使用 originalContentType
标志保留此 header消息 to/from 经纪人(通过活页夹)。
在您的情况下,我猜您可能根本不需要设置 contentType
。
除了 spring-cloud-stream-samples github 存储库中的示例之外,您还可以参考 out of the box app starters,它涵盖了广泛的应用程序,可以 运行 针对任何支持的活页夹(包括 Kafka)。
如果您想避免嵌入 headers(这样您就可以在某些 non-Spring Cloud Stream 应用程序中接收消息),请将生产者的 headerMode
设置为 raw
。
headerMode
When set to raw, disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.
Default: embeddedHeaders.