Spring Cloud Stream Kafka 发送消息
Spring Cloud Stream Kafka send message
如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?
弃用的方式如下所示。
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
但是我怎样才能以功能样式发送消息呢?
application.yml
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
我会自动装配 MessageChannel,但是没有用于进程、process-out-0、输出或类似内容的 MessageChannel-Bean。或者我可以用 Supplier-Bean 发送消息吗?
有人可以给我举个例子吗?
非常感谢!
您可以使用 StreamBridge
或反应器 API - 参见 Sending arbitrary data to an output (e.g. Foreign event-driven sources)
如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?
弃用的方式如下所示。
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
但是我怎样才能以功能样式发送消息呢?
application.yml
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
我会自动装配 MessageChannel,但是没有用于进程、process-out-0、输出或类似内容的 MessageChannel-Bean。或者我可以用 Supplier-Bean 发送消息吗? 有人可以给我举个例子吗? 非常感谢!
您可以使用 StreamBridge
或反应器 API - 参见 Sending arbitrary data to an output (e.g. Foreign event-driven sources)