JMS 2.0 - MQ 9 - 主题共享订阅不起作用
JMS 2.0 - MQ 9 - Topic Shared subscription doesn't work
我在开发订阅 MQ 主题(MQ 版本 9)的应用程序时遇到问题。
我需要进行共享主题连接,因为应用程序将 运行 在多个实例(集群)中。
规格和文档说:
"A non-durable shared subscription is used by a client which needs to be able to share the work of receiving messages from a topic subscription amongst multiple consumers. A non-durable shared subscription may therefore have more than one consumer. Each message from the subscription will be delivered to only one of the consumers on that subscription."
对我来说,所有使用相同订阅名称的客户端都在同一个"cluster",一次只有一个客户端会收到一条消息。
在我的代码中,受此 article 启发,当第二个客户端尝试创建共享订阅时出现异常。我真的不明白这是 MQ 客户端库实现中的错误还是我的代码中的错误。
这是我的示例代码:
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
public class TestGB2 {
public static void main(final String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
}
}
public static class MyThread implements Runnable {
private final String topicString;
private final String clientId;
private final String subscriptionName;
public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
Thread.currentThread().setName(threadName);
this.topicString = topicString;
this.clientId = clientId;
this.subscriptionName = subscriptionName;
}
@Override
public void run() {
try {
System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
cf.setHostName("xxxx");
cf.setPort(1416);
cf.setQueueManager("xxxx");
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
cf.setChannel("xxx");
cf.setClientID(clientId);
Connection con = cf.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
con.start();
Topic topic = session.createTopic(topicString);
MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here
System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
Message msg = messageConsumer.receive();
System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));
}
catch (Exception ex) {
System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
ex.printStackTrace();
}
}
}
}
下面的代码尝试创建 10 个线程来消费同一主题的消息。只有第一个线程能够连接,所有其他线程都失败并出现以下异常:
com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
at TestGB.lambda[=11=](TestGB.java:33)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
... 11 more
尝试使用最后一个库:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.1.0</version>
</dependency>
问题总结
问题不在于您的程序,问题在于与您订阅的主题关联的模型队列。
在队列管理器上,如果您查看订阅将匹配的主题对象,它将有一个指向模型队列的参数 MNDURMDL
。
如果您查看模型队列,您会注意到两个参数,其中一个或两个都可能导致您收到错误:
[ DEFSOPT( EXCL | SHARED ) ]
[ SHARE | NOSHARE ]
这些必须设置为 DEFSOPT(SHARED)
和 SHARE
。如果其中一个设置为另一个值,您将只能在共享订阅上拥有一个订阅者。
问题原因的其他详细信息
使用 IBM MQ Pub/Sub,当您创建 JMS 订阅时,MQ 将其视为托管订阅,IBM MQ 将在后台创建一个临时队列来订阅主题字符串。如果它是非持久订阅,则队列是临时动态队列。
失败的原因是第一个线程将以独占模式打开临时动态队列,任何其他线程都无法打开临时动态队列,您会收到 MQRC_OBJECT_IN_USE
错误。
创建应用程序特定 MNDURMDL
模型队列的可能原因
我怀疑这是因为 IBM 附带了一些不同的默认模型队列。
非持久订阅者的默认设置如下:
QUEUE(SYSTEM.NDURABLE.MODEL.QUEUE) TYPE(QMODEL)
DEFSOPT(SHARED) SHARE
还有另一个非 pub/sub 特定的默认队列,它具有以下设置:
QUEUE(SYSTEM.DEFAULT.MODEL.QUEUE) TYPE(QMODEL)
DEFSOPT(EXCL) NOSHARE
您的主题对象创建的模型队列很可能是使用如下命令创建的,该命令默认使用 SYSTEM.DEFAULT.MODEL.QUEUE
.:
的设置
DEFINE QMODEL(xxx)
将来您可以专门设置这两个参数,或者用 LIKE
关键字定义它以强制它使用不同的队列来建模设置,两个命令如下:
DEFINE QMODEL(xxx) DEFSOPT(SHARED) SHARE
DEFINE QMODEL(xxx) LIKE(SYSTEM.NDURABLE.MODEL.QUEUE)
有关创建和使用特定于应用程序的 TOPIC 对象和 MODEL 队列的其他详细信息
默认情况下,树的根节点由名为 SYSTEM.BASE.TOPIC
的标准 TOPIC 对象表示,与此 TOPIC 关联的默认模型队列如下所示:
TOPIC(SYSTEM.BASE.TOPIC) TYPE(LOCAL)
TOPICSTR() MDURMDL(SYSTEM.DURABLE.MODEL.QUEUE)
MNDURMDL(SYSTEM.NDURABLE.MODEL.QUEUE)
如果您不定义任何进一步的管理 TOPIC 对象,则所有主题都与 SYSTEM.BASE.TOPIC
匹配。此外,如果您没有定义任何进一步的管理 TOPIC 对象,并且您想要授予应用程序对主题树的特定子集的权限(例如以 TESTSUB
开头的主题字符串),您必须通过 [=25= 授予权限],这反过来又授予应用程序访问任何任意主题字符串的权限。
最佳做法是创建一个 TOPIC 对象,其主题字符串与应用程序应有权访问的主题树部分相匹配。具体到您的 TESTSUB/#
示例,如果您的管理员定义了一个新的 TOPIC 对象并指定了 TOPICSTR(TESTSUB)
,则默认值会像这样创建它:
TOPIC(TESTSUB.TOPIC) TYPE(LOCAL)
TOPICSTR(TESTSUB) MDURMDL( )
MNDURMDL( )
空白 MDURMDL
和 MNDURMDL
值告诉 MQ 使用树中下一个最接近的更高主题对象的值,如果没有定义其他内容,这将是 SYSTEM.BASE.TOPIC
并且模型队列仍将默认使用 SYSTEM.DURABLE.MODEL.QUEUE
和 SYSTEM.NDURABLE.MODEL.QUEUE
模型队列。
管理员可以改为创建 TOPIC 对象并指定不同的模型队列,例如:
TOPIC(TESTSUB.TOPIC) TYPE(LOCAL)
TOPICSTR(TESTSUB) MDURMDL(TESTSUB.DURABLE.MODEL.QUEUE)
MNDURMDL(TESTSUB.NDURABLE.MODEL.QUEUE)
通过这样做,他们可以定义特定于应用程序的模型队列,这些模型队列具有共享订阅所需的设置,并且不会影响 SYSTEM 模型队列。另一个好处是他们可以仅为以 TESTSUB
开头的主题字符串提供应用程序权限,例如 TESTSUB/A
或 TESTSUB/B
或 TESTSUB/X/Y/Z
.
我在开发订阅 MQ 主题(MQ 版本 9)的应用程序时遇到问题。
我需要进行共享主题连接,因为应用程序将 运行 在多个实例(集群)中。
规格和文档说: "A non-durable shared subscription is used by a client which needs to be able to share the work of receiving messages from a topic subscription amongst multiple consumers. A non-durable shared subscription may therefore have more than one consumer. Each message from the subscription will be delivered to only one of the consumers on that subscription."
对我来说,所有使用相同订阅名称的客户端都在同一个"cluster",一次只有一个客户端会收到一条消息。
在我的代码中,受此 article 启发,当第二个客户端尝试创建共享订阅时出现异常。我真的不明白这是 MQ 客户端库实现中的错误还是我的代码中的错误。
这是我的示例代码:
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
public class TestGB2 {
public static void main(final String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
}
}
public static class MyThread implements Runnable {
private final String topicString;
private final String clientId;
private final String subscriptionName;
public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
Thread.currentThread().setName(threadName);
this.topicString = topicString;
this.clientId = clientId;
this.subscriptionName = subscriptionName;
}
@Override
public void run() {
try {
System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
cf.setHostName("xxxx");
cf.setPort(1416);
cf.setQueueManager("xxxx");
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
cf.setChannel("xxx");
cf.setClientID(clientId);
Connection con = cf.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
con.start();
Topic topic = session.createTopic(topicString);
MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here
System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
Message msg = messageConsumer.receive();
System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));
}
catch (Exception ex) {
System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
ex.printStackTrace();
}
}
}
}
下面的代码尝试创建 10 个线程来消费同一主题的消息。只有第一个线程能够连接,所有其他线程都失败并出现以下异常:
com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
at TestGB.lambda[=11=](TestGB.java:33)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
... 11 more
尝试使用最后一个库:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.1.0</version>
</dependency>
问题总结
问题不在于您的程序,问题在于与您订阅的主题关联的模型队列。
在队列管理器上,如果您查看订阅将匹配的主题对象,它将有一个指向模型队列的参数 MNDURMDL
。
如果您查看模型队列,您会注意到两个参数,其中一个或两个都可能导致您收到错误:
[ DEFSOPT( EXCL | SHARED ) ]
[ SHARE | NOSHARE ]
这些必须设置为 DEFSOPT(SHARED)
和 SHARE
。如果其中一个设置为另一个值,您将只能在共享订阅上拥有一个订阅者。
问题原因的其他详细信息
使用 IBM MQ Pub/Sub,当您创建 JMS 订阅时,MQ 将其视为托管订阅,IBM MQ 将在后台创建一个临时队列来订阅主题字符串。如果它是非持久订阅,则队列是临时动态队列。
失败的原因是第一个线程将以独占模式打开临时动态队列,任何其他线程都无法打开临时动态队列,您会收到 MQRC_OBJECT_IN_USE
错误。
创建应用程序特定 MNDURMDL
模型队列的可能原因
我怀疑这是因为 IBM 附带了一些不同的默认模型队列。
非持久订阅者的默认设置如下:
QUEUE(SYSTEM.NDURABLE.MODEL.QUEUE) TYPE(QMODEL)
DEFSOPT(SHARED) SHARE
还有另一个非 pub/sub 特定的默认队列,它具有以下设置:
QUEUE(SYSTEM.DEFAULT.MODEL.QUEUE) TYPE(QMODEL)
DEFSOPT(EXCL) NOSHARE
您的主题对象创建的模型队列很可能是使用如下命令创建的,该命令默认使用 SYSTEM.DEFAULT.MODEL.QUEUE
.:
DEFINE QMODEL(xxx)
将来您可以专门设置这两个参数,或者用 LIKE
关键字定义它以强制它使用不同的队列来建模设置,两个命令如下:
DEFINE QMODEL(xxx) DEFSOPT(SHARED) SHARE
DEFINE QMODEL(xxx) LIKE(SYSTEM.NDURABLE.MODEL.QUEUE)
有关创建和使用特定于应用程序的 TOPIC 对象和 MODEL 队列的其他详细信息
默认情况下,树的根节点由名为 SYSTEM.BASE.TOPIC
的标准 TOPIC 对象表示,与此 TOPIC 关联的默认模型队列如下所示:
TOPIC(SYSTEM.BASE.TOPIC) TYPE(LOCAL)
TOPICSTR() MDURMDL(SYSTEM.DURABLE.MODEL.QUEUE)
MNDURMDL(SYSTEM.NDURABLE.MODEL.QUEUE)
如果您不定义任何进一步的管理 TOPIC 对象,则所有主题都与 SYSTEM.BASE.TOPIC
匹配。此外,如果您没有定义任何进一步的管理 TOPIC 对象,并且您想要授予应用程序对主题树的特定子集的权限(例如以 TESTSUB
开头的主题字符串),您必须通过 [=25= 授予权限],这反过来又授予应用程序访问任何任意主题字符串的权限。
最佳做法是创建一个 TOPIC 对象,其主题字符串与应用程序应有权访问的主题树部分相匹配。具体到您的 TESTSUB/#
示例,如果您的管理员定义了一个新的 TOPIC 对象并指定了 TOPICSTR(TESTSUB)
,则默认值会像这样创建它:
TOPIC(TESTSUB.TOPIC) TYPE(LOCAL)
TOPICSTR(TESTSUB) MDURMDL( )
MNDURMDL( )
空白 MDURMDL
和 MNDURMDL
值告诉 MQ 使用树中下一个最接近的更高主题对象的值,如果没有定义其他内容,这将是 SYSTEM.BASE.TOPIC
并且模型队列仍将默认使用 SYSTEM.DURABLE.MODEL.QUEUE
和 SYSTEM.NDURABLE.MODEL.QUEUE
模型队列。
管理员可以改为创建 TOPIC 对象并指定不同的模型队列,例如:
TOPIC(TESTSUB.TOPIC) TYPE(LOCAL)
TOPICSTR(TESTSUB) MDURMDL(TESTSUB.DURABLE.MODEL.QUEUE)
MNDURMDL(TESTSUB.NDURABLE.MODEL.QUEUE)
通过这样做,他们可以定义特定于应用程序的模型队列,这些模型队列具有共享订阅所需的设置,并且不会影响 SYSTEM 模型队列。另一个好处是他们可以仅为以 TESTSUB
开头的主题字符串提供应用程序权限,例如 TESTSUB/A
或 TESTSUB/B
或 TESTSUB/X/Y/Z
.