如何使用 Spring Cloud Stream w/o deprecated @Output 只生成一条消息,或关闭轮询?

How do I produce just one message using Spring Cloud Stream w/o deprecated @Output, or turn off polling?

我正在尝试使用 Spring 云向 Kafka 发布 一条 消息,没有任何已弃用的 classes/methods 或注释。我也希望能够轻松更改有效负载。

因此,为了清楚起见,我尽量不使用 已弃用 @Output 注释,也不使用任何 KafkaTemplate.

我的配置:

spring:
  cloud:
    stream:
      bindings:
        message-out-0:
          destination: ${spring.application.name}
          producer:
            key:
              serializer:
                type: string
                format: utf-8
                charset: utf-8
            value:
              serializer:
                type: string
                format: utf-8
                charset: utf-8

我的代码 - 到目前为止我尝试过的代码:

@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
    
    private final MessageService messageService;
    
    @Override
    public void run(String... args) throws Exception {
        messageService.value = "Application started...";
        messageService.message();
    }
}

一次尝试:

@Configuration
public class MessageService {
     public Object value;
     
     @Bean
     public Supplier<Message<?>> message () {
          return () -> MessageBuilder.withPayload(value).build();
     }
}

另一次尝试:

@Configuration
public class MessageService {
     public Object value;
     
     @Bean
     public Supplier<Flux<?>> message () {
          return () -> Flux.fromStream(Stream.generate(() -> {
               try {
                    Thread.sleep(1000);
                    return value;
               } catch (Exception e) {
                    // ignore
               }
               return null;
          })).subscribeOn(Schedulers.elastic()).share();
     }
}

两次尝试在控制台消费者中的输出:

Hello World!
Hello World!
Hello World!
Hello World! // ... Repeated every second

documentation 状态:

The framework provides a default polling mechanism (answering the question of "Who?") that will trigger the invocation of the supplier and by default it will do so every second (answering the question of "How often?").

但是如果我不希望它每秒轮询一次怎么办?

我向 MessageService 提供消息的方式很奇怪... 是配置吗?还是服务?

我还没有找到仅将 ONE CUSTOMIZABLE MESSAGE 推送到 Kafka 的最基本示例。

您可以使用 StreamBridge:

进入云流绑定
@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {

    private final StreamBridge streamBridge;
    
    @Override
    public void run(String... args) throws Exception {
        streamBridge.send("message-out-0", "Application started...");
    }
}

第一个字符串是从提供该功能的 bean 派生的应用程序设置中提供的绑定名称。

您甚至不需要派生 绑定名称 的实际 bean。在这种情况下,任何名称都可以。


您可以找到一些示例 here