IBM MQ 我如何知道我已阅读所有主题?
IBM MQ How do I know I have read all topics?
我有一个动态主题树,每个主题的最后一条消息是 "retained"。当我订阅主题树时(JMS/Java/MQLibs),因为它是动态的并且我订阅的是通配符“#”,我无法提前知道我会收到哪些主题。所以,
我怎么知道我至少阅读了所有可用主题一次?
如何知道阅读的主题是订阅前存在的原始"retained"消息还是订阅后的更新? (我是否需要保留 MessageID 与主题的本地映射?我假设我无法将订阅时的时间戳与消息创建时间进行比较,因为服务器和客户端上的时钟可能不同)。
我添加了一些示例代码,以表明我正在访问主题而不是队列。
public class JMSServerSubscriber implements MessageListener {
public JMSServerSubscriber() throws JMSException {
TopicConnection topicCon = JmsTopicConnectionFactory.createTopicConnection();
TopicSession topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSes.createTopic("#");
TopicSubscriber topicSub = topicSes.createSubscriber(topic);
topicSub.setMessageListener(this);
topicCon.start();
}
@Override
public void onMessage(Message arg0) {
BytesMessage bm = (BytesMessage) arg0;
try {
String destination = bm.getJMSDestination().toString();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
当没有更多发布时,consumer.receive() 方法将抛出异常,原因代码为 MQRC 2033。您可以使用原因代码来确认您已阅读所有出版物。
在收到消息时调用getIntProperty(JmsConstants.JMS_IBM_RETAIN);了解该出版物是否保留。
这是我使用 PCF 的解决方案。请原谅任何语法错误。
public class TopicChecker implements MessageListener {
PCFMessageAgent agent;
PCFMessage pcfm;
Set<String> allTopics;
TopicConnection topicCon;
TopicSession topicSes;
Topic topic;
TopicSubscriber topicSub;
JmsTopicConnectionFactory jcf;
int total;
public TopicChecker() throws Exception {
allTopics = new HashSet<>();
MQEnvironment.hostname = "myMqServer";
MQEnvironment.channel = "MY.CHANNEL";
MQEnvironment.port = 1515;
MQQueueManager m = new MQQueueManager("MY.QUEUE.MANAGER");
agent = new PCFMessageAgent(m);
pcfm = new PCFMessage(MQConstants.MQCMD_INQUIRE_TOPIC_STATUS);
pcfm.addParameter(MQConstants.MQCA_TOPIC_STRING, "MyRoot/#");
PCFMessage[] responses = agent.send(pcfm);
for (PCFMessage response: responses) {
/* We only publish to leaf nodes, so ignore branches. */
if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) {
allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING));
}
}
total = allTopics.size();
agent.disconnect();
jcf = ...
topicCon = jcf.createTopicConnection();
topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = topicSes.createTopic("MyRoot/#");
topicSub = topicSes.createSubscriber(topic);
topicSub.setMessageListener(this);
topicCon.start();
}
@Override
public void onMessage(Message m) {
try {
String topicString = m.getJMSDestination().toString().replaceAll("topic://", "");
allTopics.remove(topicString);
System.out.println("Read : " + topicString + " " + allTopics.size() + " of " + total + " remaining.");
if (allTopics.size() == 0) System.out.println("---------------------DONE----------------------");
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
TopicChecker tc = new TopicChecker();
while (tc.allTopics.size() != 0);
}
}
if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) {
allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING));
}
从上面的状态响应来看,要读取主题名称,实际上应该使用"MQCA_ADMIN_TOPIC_NAME"。
我有一个动态主题树,每个主题的最后一条消息是 "retained"。当我订阅主题树时(JMS/Java/MQLibs),因为它是动态的并且我订阅的是通配符“#”,我无法提前知道我会收到哪些主题。所以,
我怎么知道我至少阅读了所有可用主题一次?
如何知道阅读的主题是订阅前存在的原始"retained"消息还是订阅后的更新? (我是否需要保留 MessageID 与主题的本地映射?我假设我无法将订阅时的时间戳与消息创建时间进行比较,因为服务器和客户端上的时钟可能不同)。
我添加了一些示例代码,以表明我正在访问主题而不是队列。
public class JMSServerSubscriber implements MessageListener {
public JMSServerSubscriber() throws JMSException {
TopicConnection topicCon = JmsTopicConnectionFactory.createTopicConnection();
TopicSession topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSes.createTopic("#");
TopicSubscriber topicSub = topicSes.createSubscriber(topic);
topicSub.setMessageListener(this);
topicCon.start();
}
@Override
public void onMessage(Message arg0) {
BytesMessage bm = (BytesMessage) arg0;
try {
String destination = bm.getJMSDestination().toString();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
当没有更多发布时,consumer.receive() 方法将抛出异常,原因代码为 MQRC 2033。您可以使用原因代码来确认您已阅读所有出版物。
在收到消息时调用getIntProperty(JmsConstants.JMS_IBM_RETAIN);了解该出版物是否保留。
这是我使用 PCF 的解决方案。请原谅任何语法错误。
public class TopicChecker implements MessageListener {
PCFMessageAgent agent;
PCFMessage pcfm;
Set<String> allTopics;
TopicConnection topicCon;
TopicSession topicSes;
Topic topic;
TopicSubscriber topicSub;
JmsTopicConnectionFactory jcf;
int total;
public TopicChecker() throws Exception {
allTopics = new HashSet<>();
MQEnvironment.hostname = "myMqServer";
MQEnvironment.channel = "MY.CHANNEL";
MQEnvironment.port = 1515;
MQQueueManager m = new MQQueueManager("MY.QUEUE.MANAGER");
agent = new PCFMessageAgent(m);
pcfm = new PCFMessage(MQConstants.MQCMD_INQUIRE_TOPIC_STATUS);
pcfm.addParameter(MQConstants.MQCA_TOPIC_STRING, "MyRoot/#");
PCFMessage[] responses = agent.send(pcfm);
for (PCFMessage response: responses) {
/* We only publish to leaf nodes, so ignore branches. */
if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) {
allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING));
}
}
total = allTopics.size();
agent.disconnect();
jcf = ...
topicCon = jcf.createTopicConnection();
topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = topicSes.createTopic("MyRoot/#");
topicSub = topicSes.createSubscriber(topic);
topicSub.setMessageListener(this);
topicCon.start();
}
@Override
public void onMessage(Message m) {
try {
String topicString = m.getJMSDestination().toString().replaceAll("topic://", "");
allTopics.remove(topicString);
System.out.println("Read : " + topicString + " " + allTopics.size() + " of " + total + " remaining.");
if (allTopics.size() == 0) System.out.println("---------------------DONE----------------------");
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
TopicChecker tc = new TopicChecker();
while (tc.allTopics.size() != 0);
}
}
if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) { allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING)); }
从上面的状态响应来看,要读取主题名称,实际上应该使用"MQCA_ADMIN_TOPIC_NAME"。