MessageListener.onMessage 在 RabbitMQ 上不断被调用 Spring Boot
MessageListener.onMessage is getting called continuously on RabbitMQ with Spring Boot
我有 MessageListener.onMessage 线程休眠。我正在模拟 onMessage 的实际处理时间
方法将采取上述线程睡眠。但是,我注意到它会连续多次调用剩余的消息,直到它们被 onMessage 方法处理为止。我认为这是一种低效率。
队列中的实际消息数:1000
输出 运行 次点击
onMessage<<15656
onMessage<<15657
onMessage<<15658
onMessage<<15659
onMessage<<15660
onMessage<<15661
onMessage<<15662
onMessage<<15663
代码块
@Service
class ThreadPooledMessageListener implements MessageListener {
@Autowired
TaskExecutor threadPoolTaskExecutor;
AtomicInteger processedCount = new AtomicInteger();
@Override
public void onMessage(Message message) {
System.out.println("onMessage<<" + processedCount.incrementAndGet());
threadPoolTaskExecutor.execute(new MessageProcessor(message));
}
}
class MessageProcessor implements Runnable {
Message processingMessage;
public MessageProcessor(Message message) {
this.processingMessage = message;
}
@Override
public void run() {
System.out.println("================================"+ Thread.currentThread().getName());
System.out.println(processingMessage);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("================================");
}
}
有什么可能的修复方法。
正如@Gary Russell 指出的那样;问题是我在我的代码中使用了非 spring 托管容器 SimpleMessageListenerContainer。使用 spring 托管 bean 修复它并在那里定义并发。按预期工作。
固定代码段
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(threadPooledMessageListener);
container.setConcurrentConsumers(4);
container.start();
return container;
}
>I see this as an inefficiency.
不清楚你的意思。由于您正在将消息的处理移交给另一个线程,因此侦听器会立即退出,当然,下一条消息也会被传递。
如果发生故障,这将有丢失消息的风险。
如果你正在尝试实现并发;最好设置容器 concurrentConsumers
属性 而不是在侦听器中进行自己的线程管理。容器将为您管理消费者。
我有 MessageListener.onMessage 线程休眠。我正在模拟 onMessage 的实际处理时间 方法将采取上述线程睡眠。但是,我注意到它会连续多次调用剩余的消息,直到它们被 onMessage 方法处理为止。我认为这是一种低效率。
队列中的实际消息数:1000
输出 运行 次点击
onMessage<<15656
onMessage<<15657
onMessage<<15658
onMessage<<15659
onMessage<<15660
onMessage<<15661
onMessage<<15662
onMessage<<15663
代码块
@Service
class ThreadPooledMessageListener implements MessageListener {
@Autowired
TaskExecutor threadPoolTaskExecutor;
AtomicInteger processedCount = new AtomicInteger();
@Override
public void onMessage(Message message) {
System.out.println("onMessage<<" + processedCount.incrementAndGet());
threadPoolTaskExecutor.execute(new MessageProcessor(message));
}
}
class MessageProcessor implements Runnable {
Message processingMessage;
public MessageProcessor(Message message) {
this.processingMessage = message;
}
@Override
public void run() {
System.out.println("================================"+ Thread.currentThread().getName());
System.out.println(processingMessage);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("================================");
}
}
有什么可能的修复方法。
正如@Gary Russell 指出的那样;问题是我在我的代码中使用了非 spring 托管容器 SimpleMessageListenerContainer。使用 spring 托管 bean 修复它并在那里定义并发。按预期工作。 固定代码段
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(threadPooledMessageListener);
container.setConcurrentConsumers(4);
container.start();
return container;
}
>I see this as an inefficiency.
不清楚你的意思。由于您正在将消息的处理移交给另一个线程,因此侦听器会立即退出,当然,下一条消息也会被传递。
如果发生故障,这将有丢失消息的风险。
如果你正在尝试实现并发;最好设置容器 concurrentConsumers
属性 而不是在侦听器中进行自己的线程管理。容器将为您管理消费者。