停止 Spring Cloud Stream @StreamListener 监听,直到收到一些 Spring 事件
Stop Spring Cloud Stream @StreamListener from listening until receipt of some Spring Event
我正在开发 Camunda BPM Spring 启动应用程序。该应用程序使用 Spring Cloud Stream 从 rabbitmq 队列中读取消息。收到消息后,应用程序将调用 Camunda 中的流程实例。
如果在应用程序启动期间 rabbitmq 队列中已经有消息,云流侦听器甚至在 Camunda 初始化之前就开始读取消息。
是否可以阻止云流侦听器侦听队列,直到触发某个事件 - 在本例中为 PostDeployEvent。
我创建了一个示例应用程序以供参考
https://github.com/kpkurian/spring-cloud-stream-camunda
谢谢!!
根据@OlegZhurakoussky 的建议
问题
RuntimeService 是自动装配的,当应用程序启动时,假设所有服务、bean 等都已完全初始化。如果它仍在经历初始化和启动的过程,那么从 Spring 习语的角度来看,它没有正确实现。
解决方案
使用自定义生命周期实现包装 RuntimeService,在执行其 start() 方法之前不会return确保 RuntmeService 准备就绪。
我已经在示例 github 应用程序中实现了这个
Spring cloud stream (kafka binder) 添加暂停和恢复消费者的方法
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
请查看文档
https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples
但我认为暂停方法存在一些问题:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479
PS/
您可以在示例侦听器中获取分区 ID 和主题名称:
@StreamListener(Sink.INPUT)
public void in(String in,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
TopicPartition p = new TopicPartition(topic, partition);
consumer.pause(Collections.singleton(p));
}
或者在errorChannel global Listener
@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
TopicPartition p = new TopicPartition(topic, partition);
// ?
consumer.pause(Collections.singleton(p));
}
我正在开发 Camunda BPM Spring 启动应用程序。该应用程序使用 Spring Cloud Stream 从 rabbitmq 队列中读取消息。收到消息后,应用程序将调用 Camunda 中的流程实例。
如果在应用程序启动期间 rabbitmq 队列中已经有消息,云流侦听器甚至在 Camunda 初始化之前就开始读取消息。
是否可以阻止云流侦听器侦听队列,直到触发某个事件 - 在本例中为 PostDeployEvent。
我创建了一个示例应用程序以供参考 https://github.com/kpkurian/spring-cloud-stream-camunda
谢谢!!
根据@OlegZhurakoussky 的建议
问题
RuntimeService 是自动装配的,当应用程序启动时,假设所有服务、bean 等都已完全初始化。如果它仍在经历初始化和启动的过程,那么从 Spring 习语的角度来看,它没有正确实现。
解决方案
使用自定义生命周期实现包装 RuntimeService,在执行其 start() 方法之前不会return确保 RuntmeService 准备就绪。
我已经在示例 github 应用程序中实现了这个
Spring cloud stream (kafka binder) 添加暂停和恢复消费者的方法
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
请查看文档 https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples
但我认为暂停方法存在一些问题: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479
PS/ 您可以在示例侦听器中获取分区 ID 和主题名称:
@StreamListener(Sink.INPUT)
public void in(String in,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
TopicPartition p = new TopicPartition(topic, partition);
consumer.pause(Collections.singleton(p));
}
或者在errorChannel global Listener
@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
TopicPartition p = new TopicPartition(topic, partition);
// ?
consumer.pause(Collections.singleton(p));
}