ActiveMQ 消费者等待重新传递的消息而不是处理队列中的其他消息

ActiveMQ consumer waiting for redelivered messages instead of processing other messages in the queue

我正在使用 ActiveMQ 5.15.6,我想从队列中取出 2 条消息,处理它们,然后提交事务会话。

但是,如果抛出 Exception,消息将被标记为已重新传递并返回到队列,应用程序会等待直到重新传递延迟到期,然后再次尝试处理它们。

我需要处理 then queue 中的其他消息,直到延迟的消息再次准备好发送。

代码只是概念证明:)

public class ActiveMqService {

    private final ActiveMQConnectionFactory cf;
    private final Connection connection;
    private final Session session;
    private final Queue queue;
    private final MessageConsumer consumer;

    public ActiveMqService() throws JMSException {
        cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        RedeliveryPolicy redeliveryPolicy = cf.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(5000L);
        redeliveryPolicy.setRedeliveryDelay(5000L);
        redeliveryPolicy.setMaximumRedeliveries(100);
        ActiveMQPrefetchPolicy prefetchPolicy = cf.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(2);
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
        connection = cf.createConnection(); //credentials
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        queue = session.createQueue("test");
        connection.start();
        consumer = session.createConsumer(queue);

    }

    @PostConstruct
    public void read() throws JMSException {

        while (true) {
            List messages = new ArrayList<Message>();
            for (int i = 0; i < 2; i++) {
                Message message = consumer.receive(1000L);
                if (message != null) {
                    messages.add(message);
                } else {
                    break; // no more messages available for this batch
                }
            }

            if (messages.size() > 0) {
                try {
                    Random random = new Random();
                    if (1 == random.nextInt(2)) {
                        for (Object message : messages) {
                            TextMessage textMessage = (TextMessage) message;
                            log.info("Message {}", textMessage.getText());
                        }
                        session.commit();
                    } else {
                        throw new Exception("vyjimka");
                    }
                } catch (Exception e) {
                    List mess = new ArrayList();
                    for (Object message : messages) {
                        TextMessage textMessage = (TextMessage) message;
                        mess.add(textMessage.getText());
                    }
                    log.info("rollbackuju message {}", mess);
                    session.rollback();
                }
            }
        }
    }
}

在日志中您可以看到,在回滚应用程序等待 5 秒(设置延迟)后重新发送消息,但不处理队列中的其他消息。

2021-12-09 17:05:47.605  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 7
2021-12-09 17:05:47.606  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 3
2021-12-09 17:05:47.612  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : rollbackuju message [8, 5]
2021-12-09 17:05:52.623  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : rollbackuju message [8, 5]
2021-12-09 17:05:57.634  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 8
2021-12-09 17:05:57.635  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 5
2021-12-09 17:05:57.642  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 9
2021-12-09 17:05:57.642  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 2
2021-12-09 17:05:57.648  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : rollbackuju message [6, 4]
2021-12-09 17:06:02.655  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 6
2021-12-09 17:06:02.656  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 4

您需要在 ActiveMqService() 中对您的 javax.jms.ConnectionFactory 调用 setNonBlockingRedelivery(true),例如:

public ActiveMqService() throws JMSException {
    cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    cf.setNonBlockingRedelivery(true);
    ...
}

此方法的评论指出:

 /**
  * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
  * from a rolled back transaction.  This implies that message order will not be preserved and
  * also will result in the TransactedIndividualAck option to be enabled.
  */

此外,您可能希望将预取大小设置为高于 2 的当前值,因为等待重新传送的 2 条消息将阻止从代理中获取其他消息。