停止 Spring Cloud Stream @StreamListener 监听,直到收到一些 Spring 事件

Stop Spring Cloud Stream @StreamListener from listening until receipt of some Spring Event

我正在开发 Camunda BPM Spring 启动应用程序。该应用程序使用 Spring Cloud Stream 从 rabbitmq 队列中读取消息。收到消息后,应用程序将调用 Camunda 中的流程实例。

如果在应用程序启动期间 rabbitmq 队列中已经有消息,云流侦听器甚至在 Camunda 初始化之前就开始读取消息。

是否可以阻止云流侦听器侦听队列,直到触发某个事件 - 在本例中为 PostDeployEvent。

我创建了一个示例应用程序以供参考 https://github.com/kpkurian/spring-cloud-stream-camunda

谢谢!!

根据@OlegZhurakoussky 的建议

问题

RuntimeService 是自动装配的,当应用程序启动时,假设所有服务、bean 等都已完全初始化。如果它仍在经历初始化和启动的过程,那么从 Spring 习语的角度来看,它没有正确实现。

解决方案

使用自定义生命周期实现包装 RuntimeService,在执行其 start() 方法之前不会return确保 RuntmeService 准备就绪。

我已经在示例 github 应用程序中实现了这个

Spring cloud stream (kafka binder) 添加暂停和恢复消费者的方法

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }
}

请查看文档 https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples

但我认为暂停方法存在一些问题: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479

PS/ 您可以在示例侦听器中获取分区 ID 和主题名称:

  @StreamListener(Sink.INPUT)
  public void in(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
  }

或者在errorChannel global Listener

   @StreamListener("errorChannel")
  public void errorGlobal(Message<?> message) {
    Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
    Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
    int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
    String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    TopicPartition p = new TopicPartition(topic, partition);
    // ?
    consumer.pause(Collections.singleton(p));
}