ActiveMQ 重新传递不起作用

ActiveMQ redelivery does not work

我正在尝试使用 ActiveMQ 实现死信队列。不幸的是,这方面的文档在某些方面相当模糊,我似乎无法正确设置所有内容。

我配置了以下 Bean:

@Bean
public JmsTemplate createJMSTemplate() {
    logger.info("createJMSTemplate");
    JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(queue);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    return jmsTemplate;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(getActiveMQConnectionFactory());
    factory.setConcurrency("1-10");
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

@Bean
public ConnectionFactory getActiveMQConnectionFactory() {
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));

    // Configure the redeliver policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setRedeliveryDelay(10000);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(3);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    return activeMQConnectionFactory;
}

这是我的接收码:

@Autowired
private ConnectionFactory connectionFactory;

private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;

@PostConstruct
private void init() throws JMSException, InterruptedException {
    logger.info("Initializing QueueReceiver...");
    this.connection = connectionFactory.createConnection();
    this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue q = session.createQueue(queue);
    logger.info("Creating consumer for queue '{}'", q.getQueueName());
    MessageConsumer consumer = session.createConsumer(q);
    this.callback = new SegmentReceiver();
    consumer.setMessageListener(callback);
    this.connection.start();
}

@PreDestroy
private void destroy() throws JMSException {
    logger.info("Destroying QueueReceiver...");
    this.session.close();
    this.connection.close();
}

private class SegmentReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        logger.info("onMessage");
        try {
            TextMessage textMessage = (TextMessage) message;
            Segment segment = Segment.fromJSON(textMessage.getText());
            if (segment.shouldFail()) {
                throw new IOException("This segment is expected to fail");
            }
            System.out.println(segment.getText());
            message.acknowledge();
        }
        catch(IOException | JMSException exception) {
            logger.error(exception.toString());
            try {
                QueueReceiver.this.session.rollback();
            } catch (JMSException e) {
                logger.error(e.toString());
            }
            throw new RuntimeException(exception);
        }
    }

}

然而,没有任何反应。我使用默认配置使用开箱即用的 Apache ActiveMQ 5.14.2 安装程序。我在这里错过了什么?

因为你正在使用 this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
调用 message.acknowledge(); 与调用 session.acknowledge(); 相同。

要让 ActiveMQ 重新交付与您的配置一起成功工作,有一些可能性只需进行最小的更改:

  1. 呼叫QueueReceiver.this.session.recover();
    代替调用 QueueReceiver.this.session.rollback();

void org.apache.activemq.ActiveMQSession.recover() throws JMSException

Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.

All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all messages that have been delivered to the client.

Restarting a session causes it to take the following actions: •Stop message delivery •Mark all messages that might have been delivered but not acknowledged as "redelivered" •Restart the delivery sequence including all unacknowledged messages that had been previously delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.

  1. 使用 this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 并打电话 ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge(); ,请注意,不调用此方法就像回滚,意味着消息未被确认并且在 onMessage 方法中抛出异常将调用 [=46 的 QueueReceiver.this.consumer.rollback(); =]().

  2. 只需调用 QueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback() 代替调用 QueueReceiver.this.session.rollback();

所以它被证明是问题的组合:

  • 需要将会话确认模式设置为ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE
  • 我正在使用 session.recover() 而不是 rollback()
  • 未正确配置 ActiveMQ 代理。我需要将此位添加到 activemq.xml 配置文件(将其放在 <broker> 标记下)。

        <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained
                     For more information, see:
    
                     http://activemq.apache.org/slow-consumer-handling.html
    
                -->
              <pendingMessageLimitStrategy> 
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
            <!-- Set the following policy on all queues using the '>' wildcard -->
            <policyEntry queue=">">
                <deadLetterStrategy>
                    <!--
                      Use the prefix 'DLQ.' for the destination name, and make
                      the DLQ a queue rather than a topic
                    -->
                    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
                </deadLetterStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>
    
  • 确保您没有激活任何可能会扰乱您的 ActiveMQConnectionFactory 配置的 redeliveryPlugin