在 ActiveMQ.Advisory.Expired.Queue 中迭代非持久性 activemq 过期消息
Iterate over non-persistent activemq expired messages in ActiveMQ.Advisory.Expired.Queue
我正在 activemq 上构建一个应用程序,我正在从我的交付模式为 NON_PERSISTENT 的生产者发送消息(我不处理 PERSISTENT 交付模式,我知道它将存储在 DLQ 中- 这不是我的设计),并使用 producer.setTimeToLive(2000) 设置了消息的生存时间。正如功能所说,消息将在 2 秒后过期。
我看到过期消息在 activeMQ 管理控制台主题部分的 ActiveMQ.Advisory.Expired.Queue 中排队,即 http://localhost:8161/admin/topics.jsp.
我的问题是如何遍历 ActiveMQ.Advisory.Expired.Queue 以便我可以访问过期 MessageID =31=] 代码示例会很棒。
对目的地 ActiveMQ.Advisory.Expired.Queue 的订阅就像任何主题,它 returns 是一个 ActiveMQMessage。
可以通过 ActiveMQMessage 的 getDataStructure 方法检索 DataStructure 对象(ConsumerInfo、ProducerInfo、ConnectionInfo...)。
文档http://activemq.apache.org/advisory-message.html
示例:
Destination advisoryDestination = AdvisorySupport.getExpiredQueueMessageAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
public void onMessage(Message msg){
String messageId = msg.getJMSMessageID();
String orignalMessageId = msg.getStringProperty(org.apache.activemq.advisory.AdvisorySupport.MSG_PROPERTY_MESSAGE_ID);
if (msg instanceof ActiveMQMessage){
try {
ActiveMQMessage aMsg = (ActiveMQMessage)msg;
ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
} catch (JMSException e) {
log.error("Failed to process message: " + msg);
}
}
}
我正在 activemq 上构建一个应用程序,我正在从我的交付模式为 NON_PERSISTENT 的生产者发送消息(我不处理 PERSISTENT 交付模式,我知道它将存储在 DLQ 中- 这不是我的设计),并使用 producer.setTimeToLive(2000) 设置了消息的生存时间。正如功能所说,消息将在 2 秒后过期。
我看到过期消息在 activeMQ 管理控制台主题部分的 ActiveMQ.Advisory.Expired.Queue 中排队,即 http://localhost:8161/admin/topics.jsp.
我的问题是如何遍历 ActiveMQ.Advisory.Expired.Queue 以便我可以访问过期 MessageID =31=] 代码示例会很棒。
对目的地 ActiveMQ.Advisory.Expired.Queue 的订阅就像任何主题,它 returns 是一个 ActiveMQMessage。 可以通过 ActiveMQMessage 的 getDataStructure 方法检索 DataStructure 对象(ConsumerInfo、ProducerInfo、ConnectionInfo...)。
文档http://activemq.apache.org/advisory-message.html
示例:
Destination advisoryDestination = AdvisorySupport.getExpiredQueueMessageAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
public void onMessage(Message msg){
String messageId = msg.getJMSMessageID();
String orignalMessageId = msg.getStringProperty(org.apache.activemq.advisory.AdvisorySupport.MSG_PROPERTY_MESSAGE_ID);
if (msg instanceof ActiveMQMessage){
try {
ActiveMQMessage aMsg = (ActiveMQMessage)msg;
ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
} catch (JMSException e) {
log.error("Failed to process message: " + msg);
}
}
}