IBM MQ 我如何知道我已阅读所有主题?

IBM MQ How do I know I have read all topics?

我有一个动态主题树,每个主题的最后一条消息是 "retained"。当我订阅主题树时(JMS/Java/MQLibs),因为它是动态的并且我订阅的是通配符“#”,我无法提前知道我会收到哪些主题。所以,

  1. 我怎么知道我至少阅读了所有可用主题一次?

  2. 如何知道阅读的主题是订阅前存在的原始"retained"消息还是订阅后的更新? (我是否需要保留 MessageID 与主题的本地映射?我假设我无法将订阅时的时间戳与消息创建时间进行比较,因为服务器和客户端上的时钟可能不同)。

我添加了一些示例代码,以表明我正在访问主题而不是队列。

public class JMSServerSubscriber implements MessageListener { 

public JMSServerSubscriber() throws JMSException { 
    TopicConnection topicCon = JmsTopicConnectionFactory.createTopicConnection(); 
    TopicSession topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
    Topic topic = topicSes.createTopic("#"); 
    TopicSubscriber topicSub = topicSes.createSubscriber(topic); 
    topicSub.setMessageListener(this); 
    topicCon.start(); 
} 

    @Override 
    public void onMessage(Message arg0) { 
            BytesMessage bm = (BytesMessage) arg0; 
            try { 
                    String destination = bm.getJMSDestination().toString(); 
            } catch (JMSException e) { 
                    e.printStackTrace(); 
            }  
    } 
} 

当没有更多发布时,consumer.receive() 方法将抛出异常,原因代码为 MQRC 2033。您可以使用原因代码来确认您已阅读所有出版物。

在收到消息时调用getIntProperty(JmsConstants.JMS_IBM_RETAIN);了解该出版物是否保留。

这是我使用 PCF 的解决方案。请原谅任何语法错误。

public class TopicChecker implements MessageListener { 

    PCFMessageAgent           agent; 
    PCFMessage                pcfm; 
    Set<String>               allTopics; 
    TopicConnection           topicCon; 
    TopicSession              topicSes; 
    Topic                     topic; 
    TopicSubscriber           topicSub; 
    JmsTopicConnectionFactory jcf; 
    int                       total; 

    public TopicChecker() throws Exception { 
        allTopics = new HashSet<>(); 
        MQEnvironment.hostname = "myMqServer"; 
        MQEnvironment.channel = "MY.CHANNEL"; 
        MQEnvironment.port = 1515; 
        MQQueueManager m = new MQQueueManager("MY.QUEUE.MANAGER"); 
        agent = new PCFMessageAgent(m); 
        pcfm = new PCFMessage(MQConstants.MQCMD_INQUIRE_TOPIC_STATUS); 
        pcfm.addParameter(MQConstants.MQCA_TOPIC_STRING, "MyRoot/#"); 
        PCFMessage[] responses = agent.send(pcfm); 

        for (PCFMessage response: responses) { 
            /* We only publish to leaf nodes, so ignore branches. */
            if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) { 
            allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING)); 
            } 
        } 

        total = allTopics.size(); 
        agent.disconnect(); 

        jcf = ...
        topicCon = jcf.createTopicConnection(); 
        topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
        topic = topicSes.createTopic("MyRoot/#"); 
        topicSub = topicSes.createSubscriber(topic); 
        topicSub.setMessageListener(this); 
        topicCon.start(); 
    } 

    @Override 
    public void onMessage(Message m) { 
     try { 
        String topicString = m.getJMSDestination().toString().replaceAll("topic://", ""); 
        allTopics.remove(topicString); 
        System.out.println("Read : " + topicString + " " + allTopics.size() + " of " + total + " remaining."); 
        if (allTopics.size() == 0) System.out.println("---------------------DONE----------------------"); 
        } catch (JMSException e) { 
               e.printStackTrace(); 
        } 
   }


   public static void main(String[] args) throws Exception { 
       TopicChecker tc = new TopicChecker(); 
       while (tc.allTopics.size() != 0); 
   } 
} 

if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) { allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING)); }

从上面的状态响应来看,要读取主题名称,实际上应该使用"MQCA_ADMIN_TOPIC_NAME"。