从 JMS 队列中批量获取

Batch fetching from JMS queue

我需要实现以下工作流程:

  1. 每 N 毫秒从 JMS 队列中获取所有可用消息,但不超过 K 项
  2. 做一些处理
  3. 处理完成后确认所有这些

我对如何使用 JMS 正确实施步骤 #1 很感兴趣。

我尝试过的是:

  1. 创建 BlockingQueue 尺码 K
  2. 在我的 JMS onMessage 方法中 MessageListener 将消息放入 BlockingQueue
  3. 每 N 毫秒耗尽 BlockingQueue、处理消息并确认它们。

乍一看还不错,但问题是,如果在批处理过程中 onMessage 中获取了某些消息,那么一旦批处理完成,它也会在 BlockingQueue 并且尚未处理 (reference)。因此,如果我的应用出现故障,消息将根本不会被处理。

实施步骤 1 的更好方法是什么?它是 JMS API 中的某种东西还是某种标准方法?

JMS 不支持“批量获取”。消息必须一次 fetched/received 一条。但是,可以使用事务处理会话对多条消息进行批处理。

此外,通过使用 MessageListener 实现,您将放弃对应用程序何时接收消息的控制,因为只要将消息放入队列,就会调用 onMessage 方法。

我建议您完全删除 MessageListener。相反,每 N 秒使用一个事务处理 javax.jms.Session 并在您完成提交事务处理会话并确认所有消息时调用 javax.jms.MessageConsumer.receive(long) until it returns null (which means the queue is empty) or you have K messages. Then you can process all those messages and invoke javax.jms.Session.commit()

请记住,您的批次越大,重复工作的可能性就越大。例如,如果您在发生某种阻止您提交事务处理会话的故障之前接收到 K 条消息并处理了 K - 1 条消息,那么这些消息将被取消回队列,并可能稍后重新传送和处理。