Spring Cloud Stream Kafka 发送消息

Spring Cloud Stream Kafka send message

如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?

弃用的方式如下所示。

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}

但是我怎样才能以功能样式发送消息呢?

application.yml

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }

我会自动装配 MessageChannel,但是没有用于进程、process-out-0、输出或类似内容的 MessageChannel-Bean。或者我可以用 Supplier-Bean 发送消息吗? 有人可以给我举个例子吗? 非常感谢!

您可以使用 StreamBridge 或反应器 API - 参见 Sending arbitrary data to an output (e.g. Foreign event-driven sources)