从 JMS 队列中批量获取
Batch fetching from JMS queue
我需要实现以下工作流程:
- 每 N 毫秒从 JMS 队列中获取所有可用消息,但不超过 K 项
- 做一些处理
- 处理完成后确认所有这些
我对如何使用 JMS 正确实施步骤 #1 很感兴趣。
我尝试过的是:
- 创建
BlockingQueue
尺码 K
- 在我的 JMS
onMessage
方法中 MessageListener
将消息放入 BlockingQueue
。
- 每 N 毫秒耗尽
BlockingQueue
、处理消息并确认它们。
乍一看还不错,但问题是,如果在批处理过程中 onMessage
中获取了某些消息,那么一旦批处理完成,它也会在 BlockingQueue
并且尚未处理 (reference)。因此,如果我的应用出现故障,消息将根本不会被处理。
实施步骤 1 的更好方法是什么?它是 JMS API 中的某种东西还是某种标准方法?
JMS 不支持“批量获取”。消息必须一次 fetched/received 一条。但是,可以使用事务处理会话对多条消息进行批处理。
此外,通过使用 MessageListener
实现,您将放弃对应用程序何时接收消息的控制,因为只要将消息放入队列,就会调用 onMessage
方法。
我建议您完全删除 MessageListener
。相反,每 N 秒使用一个事务处理 javax.jms.Session
并在您完成提交事务处理会话并确认所有消息时调用 javax.jms.MessageConsumer.receive(long)
until it returns null
(which means the queue is empty) or you have K messages. Then you can process all those messages and invoke javax.jms.Session.commit()
。
请记住,您的批次越大,重复工作的可能性就越大。例如,如果您在发生某种阻止您提交事务处理会话的故障之前接收到 K 条消息并处理了 K - 1 条消息,那么这些消息将被取消回队列,并可能稍后重新传送和处理。
我需要实现以下工作流程:
- 每 N 毫秒从 JMS 队列中获取所有可用消息,但不超过 K 项
- 做一些处理
- 处理完成后确认所有这些
我对如何使用 JMS 正确实施步骤 #1 很感兴趣。
我尝试过的是:
- 创建
BlockingQueue
尺码 K - 在我的 JMS
onMessage
方法中MessageListener
将消息放入BlockingQueue
。 - 每 N 毫秒耗尽
BlockingQueue
、处理消息并确认它们。
乍一看还不错,但问题是,如果在批处理过程中 onMessage
中获取了某些消息,那么一旦批处理完成,它也会在 BlockingQueue
并且尚未处理 (reference)。因此,如果我的应用出现故障,消息将根本不会被处理。
实施步骤 1 的更好方法是什么?它是 JMS API 中的某种东西还是某种标准方法?
JMS 不支持“批量获取”。消息必须一次 fetched/received 一条。但是,可以使用事务处理会话对多条消息进行批处理。
此外,通过使用 MessageListener
实现,您将放弃对应用程序何时接收消息的控制,因为只要将消息放入队列,就会调用 onMessage
方法。
我建议您完全删除 MessageListener
。相反,每 N 秒使用一个事务处理 javax.jms.Session
并在您完成提交事务处理会话并确认所有消息时调用 javax.jms.MessageConsumer.receive(long)
until it returns null
(which means the queue is empty) or you have K messages. Then you can process all those messages and invoke javax.jms.Session.commit()
。
请记住,您的批次越大,重复工作的可能性就越大。例如,如果您在发生某种阻止您提交事务处理会话的故障之前接收到 K 条消息并处理了 K - 1 条消息,那么这些消息将被取消回队列,并可能稍后重新传送和处理。