在 ActiveMQ.Advisory.Expired.Queue 中迭代非持久性 activemq 过期消息

Iterate over non-persistent activemq expired messages in ActiveMQ.Advisory.Expired.Queue

我正在 activemq 上构建一个应用程序,我正在从我的交付模式为 NON_PERSISTENT 的生产者发送消息(我不处理 PERSISTENT 交付模式,我知道它将存储在 DLQ 中- 这不是我的设计),并使用 producer.setTimeToLive(2000) 设置了消息的生存时间。正如功能所说,消息将在 2 秒后过期。

我看到过期消息在 activeMQ 管理控制台主题部分的 ActiveMQ.Advisory.Expired.Queue 中排队,即 http://localhost:8161/admin/topics.jsp.

我的问题是如何遍历 ActiveMQ.Advisory.Expired.Queue 以便我可以访问过期 MessageID =31=] 代码示例会很棒。

对目的地 ActiveMQ.Advisory.Expired.Queue 的订阅就像任何主题,它 returns 是一个 ActiveMQMessage。 可以通过 ActiveMQMessage 的 getDataStructure 方法检索 DataStructure 对象(ConsumerInfo、ProducerInfo、ConnectionInfo...)。

文档http://activemq.apache.org/advisory-message.html

示例:

Destination advisoryDestination = AdvisorySupport.getExpiredQueueMessageAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);

public void onMessage(Message msg){
    String messageId =   msg.getJMSMessageID();
    String orignalMessageId =   msg.getStringProperty(org.apache.activemq.advisory.AdvisorySupport.MSG_PROPERTY_MESSAGE_ID);
    if (msg instanceof ActiveMQMessage){
        try {
             ActiveMQMessage aMsg =  (ActiveMQMessage)msg;
             ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
        } catch (JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}