Spring 独立启动 CommandLineRunner 不会 return 使用 spring-starter-amqp
Spring boot standalone CommandLineRunner won't return with spring-starter-amqp
我正在构建一个经典的生产者 -> rabbitmq -> 消费者流程。
所有 3 个节点 运行 在单独的 jvm 甚至单独的主机上
Producer 是一个 spring 引导命令行 运行ner 应用程序,预计在完成生产后停止。
消费者应用程序是一个 spring 启动 Web 应用程序,它监听 3 个 rabbitmq 队列(2 个持久队列绑定到直接交换,1 个非持久队列绑定到扇出交换)
我的启动顺序如下:
- 启动rabbitmq
- 开始消费者
- 开始制作人
生产者和消费者 amqp 依赖关系mvn dependency:tree
[INFO] | +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] | | +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] | | \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] | | +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] | | | \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] | | +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] | | \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile
生产者代码
/**
* @author louis.gueye@gmail.com
*/
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {
private final AmqpTemplate template;
@Override
public void run(String... args) {
final Instant now = Instant.now();
final Instant anHourAgo = now.minus(Duration.ofHours(1));
final String directExchangeName = "careassist_queues";
final String fanoutExchangeName = "careassist_schedules_topics";
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.on) //
.build();
final String routingKey = "care.events";
template.convertAndSend(directExchangeName, routingKey, event);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
});
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.off) //
.build();
final String routingKey = "maintenance.events";
template.convertAndSend(directExchangeName, routingKey, event);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
});
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.off) //
.build();
final ScheduleDto schedule = ScheduleDto.builder().id(UUID.randomUUID().toString()) //
.destination("any.routing.queue") //
.message(event) //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.build();
final String routingKey = "#";
template.convertAndSend(fanoutExchangeName, routingKey, schedule);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), fanoutExchangeName, routingKey);
});
}
}
消费者代码(1个听众)
@Component
@RabbitListener(queues = {PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME})
@Slf4j
public class PlatformBrokerExampleCareEventsQueueConsumer {
public static final String QUEUE_NAME = "care_events";
@RabbitHandler
public void onMessage(SensorEventDto event) {
log.info("<<<<<<<<<<<< Received event [" + event + "] from {}...", PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME);
}
}
我希望生产者生产然后关闭,但相反,java 进程无限期挂起
如果能解释为什么生产者在生产消息后不会停止,我们将不胜感激。我怀疑它与 spring-started-amqp
有关,但我不确定。我当然不需要完整的罐子,只需要包含 AmqpTemplate
的小罐子
注意:消费者收到了所有消息
github project
感谢您的帮助。
PlatformBrokerClientConfiguration 绑定队列。但我看不到任何地方可以关闭队列。所以这可能是暂停您的实例的原因。
请试试这个。
public static void main(String[] args) {
System.exit(SpringApplication.exit(SpringApplication.run(EmployeeDataProduceApp.class, args)));
}
AMQP 客户端有一些后台线程。
您应该更改 main()
方法以在运行器 returns...
之后关闭应用程序上下文
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args).close();
}
它会干净利落地关闭一切,不像 System.exit()
那样残忍。
我正在构建一个经典的生产者 -> rabbitmq -> 消费者流程。 所有 3 个节点 运行 在单独的 jvm 甚至单独的主机上
Producer 是一个 spring 引导命令行 运行ner 应用程序,预计在完成生产后停止。
消费者应用程序是一个 spring 启动 Web 应用程序,它监听 3 个 rabbitmq 队列(2 个持久队列绑定到直接交换,1 个非持久队列绑定到扇出交换)
我的启动顺序如下: - 启动rabbitmq - 开始消费者 - 开始制作人
生产者和消费者 amqp 依赖关系mvn dependency:tree
[INFO] | +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] | | +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] | | \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] | | +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] | | | \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] | | +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] | | \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile
生产者代码
/**
* @author louis.gueye@gmail.com
*/
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {
private final AmqpTemplate template;
@Override
public void run(String... args) {
final Instant now = Instant.now();
final Instant anHourAgo = now.minus(Duration.ofHours(1));
final String directExchangeName = "careassist_queues";
final String fanoutExchangeName = "careassist_schedules_topics";
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.on) //
.build();
final String routingKey = "care.events";
template.convertAndSend(directExchangeName, routingKey, event);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
});
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.off) //
.build();
final String routingKey = "maintenance.events";
template.convertAndSend(directExchangeName, routingKey, event);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
});
IntStream.range(0, 60).boxed().forEach(i -> {
final SensorEventDto event = SensorEventDto.builder() //
.id(UUID.randomUUID().toString()) //
.businessId("sens-q7ikjxk1ftik") //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.state(SensorState.off) //
.build();
final ScheduleDto schedule = ScheduleDto.builder().id(UUID.randomUUID().toString()) //
.destination("any.routing.queue") //
.message(event) //
.timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
.build();
final String routingKey = "#";
template.convertAndSend(fanoutExchangeName, routingKey, schedule);
log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), fanoutExchangeName, routingKey);
});
}
}
消费者代码(1个听众)
@Component
@RabbitListener(queues = {PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME})
@Slf4j
public class PlatformBrokerExampleCareEventsQueueConsumer {
public static final String QUEUE_NAME = "care_events";
@RabbitHandler
public void onMessage(SensorEventDto event) {
log.info("<<<<<<<<<<<< Received event [" + event + "] from {}...", PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME);
}
}
我希望生产者生产然后关闭,但相反,java 进程无限期挂起
如果能解释为什么生产者在生产消息后不会停止,我们将不胜感激。我怀疑它与 spring-started-amqp
有关,但我不确定。我当然不需要完整的罐子,只需要包含 AmqpTemplate
注意:消费者收到了所有消息
github project
感谢您的帮助。
PlatformBrokerClientConfiguration 绑定队列。但我看不到任何地方可以关闭队列。所以这可能是暂停您的实例的原因。
请试试这个。
public static void main(String[] args) {
System.exit(SpringApplication.exit(SpringApplication.run(EmployeeDataProduceApp.class, args)));
}
AMQP 客户端有一些后台线程。
您应该更改 main()
方法以在运行器 returns...
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args).close();
}
它会干净利落地关闭一切,不像 System.exit()
那样残忍。