如何将消息发送到 IBM MQ 集群中不同 queue 管理器和主机名中托管的不同 Queue

How to send message to different Queue hosted in different queue manager and hostname in IBM MQ cluster

我基于 Apache-camel 的应用正在使用来自 IBM queue 之一的消息,例如以下是连接工厂的详细信息

hostname=host1000
QManager=QM1000
Port="some port"
Channel="common channel"

Camel 流消费和处理并将响应发送到来自消息 header 的 ReplyQueue。

 from(wmq:queue:<INPUT_QUEUE>)
.bean("processBean")
.bean("beanToSendMsgToReplyQueue")

在骆驼 header 中,我正在低于 JMSReplyQueue。您可以看到它是一个不同的 Queue 管理器,而这个 queue 管理器来自不同的主机,但在集群环境中。

JMSReplyTo = queue://QM1012/TEST.REPLY?targetClient=1

还有 queue 经理介于两者之间。喜欢

queue://<queue-manager>//<queue-name>?<other parameters>

下面是我在发送消息时遇到的异常。

ERROR o.apache.camel.processor.DefaultErrorHandler:215 - Failed delivery for (MessageId: ID-xxxxxxxxx-0-4 on ExchangeId: ID-xxxxxx-42443-1492594420697-0-1). Exhausted after delivery attempt: 1 caught: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: wmq://queue://QM1012/TEST.REPLY?targetClient=1 due to: Failed to resolve endpoint: wmq://queue://TAP2001R5/TEST?targetClient=1 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{targetClient=1}]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[sendTo(Endpoint[wmq://queue:BACKOUT_Q])], Channel[DelegateSync[com.xxx.yyy.listener.XXXOnExceptionProcessor@21c66ee4]], Channel[Stop]]]]

任何人都可以帮助我向不同的 queue-manager queue 发送消息,它们在不同的主机中,但都在同一个集群中。 queue-manager 名称也出现在字符串的中间,那么如何解决这个问题。 如果您需要更多详细信息,请告诉我。

更新-1: 尝试使用相同的 queue 管理器但没有参数

JMSReplyTo = queue://QM1000/QUEUE_V1 下面的异常我得到

org.springframework.jms.InvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'.; nested exception is com.ibm.msg.client.jms.DetailedInvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'. JMS attempted to perform an MQOPEN, but WebSphere MQ reported an error. Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2189' ('MQRC_CLUSTER_RESOLUTION_ERROR').

Update-2

我可以使用纯 javax.jms.* 和 com.ibm.mq.jms.* api 向 JMSReplyTo 发送消息,但不能通过 Apache camel。 Camel user/developer 组的任何人都可以帮助我使用 camel 组件处理相同的问题。

@Override
public void process(Exchange exchange)
    throws Exception {

    QueueConnection m_connection = this.connectionFactory.createQueueConnection();
    //m_connection.start();
    boolean transacted = false;

    QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText(exchange.getIn().getBody());
    MQQueue mq = new MQQueue(
        "queue://QM1012/TEST.REPLY");
    QueueSender queueSender = session.createSender((MQQueue) mq);
    queueSender.send(outMessage);

    /* producerTemplate.send("wmq:" + "queue://QM1012/TEST.REPLY", exchange); */
}

您想与两个不同的 queue 管理器通信,因此您需要相应地定义两个 Camel JMS 组件实例。 Camel 无法神奇地知道 QM1000 或 QM1012 的含义以及如何访问 QM。

您首先需要两个 WMQ QM 的两个 JMS 连接工厂实例。如何获取这些取决于你的执行环境。在 JEE 服务器上,可以在配置后使用 JNDI 访问连接池。查看有关如何设置 JMS 池的应用服务器文档。如果您 运行 stand-alone 想要 XA 事务,请查看 Spring JMS 连接缓存或 Atomikos。 假设 QM1000 的 CF 是 sourceCF,QM1012 的 CF 是 targetCF。

现在您可以定义两个 Camel JMS 组件实例,每个 QM 一个。将连接工厂注入 JMS 组件 (.setConnectionFactory(...))。 假设您定义了一个 id "jmssource" 的 Camel JMS 组件,注入 sourceCF。在 JMS 组件 ID "jmstarget" 中注入 targetCF。 如何做到这一点取决于您的环境(JEE/CDI、Spring、普通 Java)。看看Whosebug,有例子。

您现在可以使用以下语法在 Camel 路由上指定 Camel JMS 生产者和消费者:

.from("jmssource:INPUT_QUEUE")
  ...
  (do some processing)
  ...
  .to("jmstarget:QUEUE_V1")

您不能使用 Camel 的 JMS reply-to 逻辑(使用 JMSReplyTo header)回复另一个 queue 经理。我认为这是 JMS 标准不允许的。您需要通过发送回复 queue.

来明确回复

要设置 targetClient 选项,目标解析器可能会有用:

import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQDestination;

public class WMQDestinationResolver extends DynamicDestinationResolver implements DestinationResolver {
  private int targetClient = JMSC.MQJMS_CLIENT_JMS_COMPLIANT;

  public void setTargetClient(int targetClient) {
    this.targetClient = targetClient;
  }

  public Destination resolveDestinationName(Session session, String destinationName, boolean isPubSubDomain) throws JMSException {
    Destination destination = super.resolveDestinationName(session, destinationName, isPubSubDomain);
    if (destination instanceof MQDestination) {
      MQDestination mqDestination = (MQDestination) destination;
      mqDestination.setTargetClient(targetClient);
    }
    return destination;
  }
}

首先感谢大家的支持。 我的用例如下(上面也有)。

Connect to a MQ host(hostname, queueManager, port, channel) using Apache Camel and consume message from queue which is belongs to the same host/Qmanager. message is coming with replyToQueue (JMSReplyTo) header value. The value of ReplyToQueue (JMSReplyTo) is as follows

for e.g.

queue://Different_QueueManager_in_Cluster/TEST.REPLY?mdReadEnabled=true&messageBody=0&mdWriteEnabled=true&XMSC_WMQ_REPLYTO_STYLE=1&targetClient=1

现在的问题是,当连接 object 连接到上述主机和 queue 管理器时,如何将回复消息发送到具有不同 queue 管理器的不同 queue .

NOTE: All MQ queue managers are in clustered environment.

解决方案一: 例如

form(wmq:queue:INPUT_MSG_Q)
 .bean(requestProcessor)
 .bean(responseProcessor)

Apache Camel 默认处理 ReplyToQ (JMSReplyTo)。如果您不想向 ReplyToQ (JMSReplyTo) 发送回复,那么在使用

时使用 disableReplyTo=true

NOTE: While sending to queue://Different_QueueManager_in_Cluster/TEST.REPLY, using same connection/connection factory, MQ cluster will check that message has to go to specified queue manager with specified queue in Cluster. Regarding following parameters ?mdReadEnabled=true&messageBody=0&mdWriteEnabled=true&XMSC_WMQ_REPLYTO_STYLE=1&targetClient=1, Apache Camel is able to resolve automatically without using any 3nd party resolver while auto reply to JMSReplyTo.

方案二:

使用 disableReplyTo=true 禁用自动回复并从 header 获取 queue 详细信息并使用普通 javax.jms.* 和 com.ibm.mq.jms.* api的。代码如下。

@Override
public void process(Exchange exchange)
    throws Exception {

    QueueConnection m_connection = this.connectionFactory.createQueueConnection();
    //m_connection.start();
    boolean transacted = false;

    QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText(exchange.getIn().getBody());
    MQQueue mq = new MQQueue(
        "queue://Different_QueueManager_in_Cluster/TEST.REPLY");
    QueueSender queueSender = session.createSender((MQQueue) mq);
    queueSender.send(outMessage);

    /* producerTemplate.send("wmq:" + "queue://Different_QueueManager_in_Cluster/TEST.REPLY", exchange); */
}

对于参数,使用@Sebastian Brandt 提到的目标解析器 (post)