使用 spring 集成在 2 个 MQ 代理之间保留消息 ID

Retaining message id between 2 MQ brokers using spring integration

我正在使用由 message-driven-channel-adapter => 频道 => outbound-channel-adapter 组成的消息工作流。它的目的是将消息从一个 MqSeries 代理传输到另一个 MQSeries 代理。它是事务性的(需要确认)

相关部分在下面(有些部分明显遗漏了。如果您认为需要它们,我会编辑我的 post 并添加它们)。

我的问题是关于消息 headers,特别是 msgId。 当我在入站 queue 中放入带有 messageId 的消息时,我希望它在整个管道中保持不变。
但是 messageId 在出站 queue 中被转换,其内容被生成的 ID 替换,包括出站 queue 经理名称。

来自发射器(它只是一个可能的发射代码的例子。我使用的每个代码都有同样的问题,只要我提供一个 msgId):

com.ibm.mq.MQMessage message = new MQMessage(); 
message.messageId=("TEST MessageId 1234").getBytes();

并且来自 MQExplorer:

可能有一个明显的(但对我来说不是)原因,但我现在不明白。 我读到(嗯?) message Id 可以由 QM 从特定场景或特定命令生成。 但我看不出它如何应用于 spring 集成。

有人知道 Spring 集成如何处理 messageId 以及我如何在整个管道中保留相同信息吗?

<beans>
        
    <int:channel id="channelMQ_MQ" ></int:channel>

    <!-- Source : MQseries -->
    <!- ... -->
    <bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...       
    </bean>
    <!- ... -->
    <bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
        <property name="autoStartup" value="false" />
        <property name="connectionFactory" ref="connectionFactoryCaching" />
        <property name="destination" ref="jmsQueue" />
        <!- ... -->         
        <property name="sessionTransacted" value="true"/>
    </bean>
    
    <int-jms:message-driven-channel-adapter 
        id="jmsIn" 
        container="myListener" 
        channel="channelMQ_MQ" 
        error-channel="processChannel1"/>
                                    

    <!-- Destination MQ_SERIES      -->
        <!- ... -->
    <bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...
    </bean>
    
    <int-jms:outbound-channel-adapter   channel="channelMQ_MQ" 
                                        id="jmsOut2" 
                                        destination="jmsQueue2" 
                                        connection-factory="connectionFactoryCaching2" 
                                        delivery-persistent="true" 
                                        explicit-qos-enabled="true" 
                                        session-transacted="true" >
    </int-jms:outbound-channel-adapter>

                                        

</beans>

编辑 1:

按照@artem-bilan的建议,我设置了一个header-enricher。 但是 atm,这根本不起作用... None 的属性已设置。

    <int:channel id="channel_tmp">
    </int:channel>
    
    
    <int:header-enricher input-channel="channelMQ_MQ"  output-channel="channel_tmp"  id="headerEnricher1">
        <int:header name="MSI" expression="headers.jms_messageId"/>
        <int:header name="JMS_IBM_MQMD_MsgId" expression="headers.jms_messageId"/>
        <int:header name="MSGID" expression="headers.jms_messageId"/>
        <int:header name="MsgId" expression="headers.jms_messageId"/>
        <int:header name="CorrelId" expression="headers.jms_messageId"/>
        <int:header name="GroupId" expression="headers.jms_messageId"/>
        <int:header name="MsggSeqNumber" expression="headers.jms_messageId"/>
        <int:header name="offset" expression="headers.jms_messageId"/>
    </int:header-enricher>
    
    <int-jms:outbound-channel-adapter   channel="channel_tmp" 
                                        id="jmsOut2" 
                                        destination="jmsQueue2" 
                                        connection-factory="connectionFactoryCaching2" 
                                        delivery-persistent="true" 
                                        explicit-qos-enabled="true" 
                                        session-transacted="true" >
    </int-jms:outbound-channel-adapter>

编辑 2: 经过一些研究,我们发现 IBM doc 指出 “为了能够设置消息 ID,JMS 目的地 queue 需要 属性 'MQMD WRITE ENABLE " 设置为启用。 属性 允许 JMS 应用程序设置 MQMD 字段的值。" 所以我们尝试从我们的 JmsQueue 中设置这个 属性 :

    <bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...
      <property name="MQMDWriteEnabled" value="true"></property>
      <property name="MQMDMessageContext" value="2"></property>
    </bean>

不幸的是,虽然很有希望,但这对 messageId 不起作用(但其他 MQMD 字段有效)。

编辑3:

按照 Artem Bilan 关于调试 JmsHeaderMapper 的建议,听起来我们发现 header 映射器不支持字节数组(spring 集成版本:5.3.2.RELEASE),但 IBM 期望...这导致 header 基本上被跳过。 因此,这不会那样工作:

    <int:header name="JMS_IBM_MQMD_MsgId" expression="headers['jms_messageId'].bytes"/>

编辑 4:

在注意到 spring-integration-jms 的当前版本不接受“byte[]”类型(IBM MSGID 类型)后,我们添加了一个自定义 header 映射器。它有效,但我们必须从 already-mapped 消息(看起来像“ID:3214F1044...”)中检索(十六进制到字节)它并将它作为字节数组传递到 header 作为“JMS_IBM_MQMD_MsgId" 属性。 这是一个可疑的解决方案,因为三重转换 (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )
最终,我们发现可以配置入站 queue 以及出站 queue,例如它们将传递所有上下文(jms-mapped headers 以及原始 MQ 的) .因此,我们不必进行复杂的映射....只需进行基本映射(因为 byte[] 仍未在 defaultHeaderMapper 中进行映射)。

所以最终的解决方案是:

<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
  ...
  <property name="MQMDMessageContext" value="2"></property>
  <property name="MQMDReadEnabled" value="true"></property>
</bean>

<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
    ...
    <property name="MQMDWriteEnabled" value="true"></property>
    <property name="MQMDMessageContext" value="2"></property>       
</bean>


<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/> 
<int-jms:outbound-channel-adapter   channel="channel_MQ_MQ" 
                                    id="jmsOut" 
                                    destination="jmsQueueOUT" 
                                    ... 
                                    header-mapper="mqCompatibleJmsHeaderMapper">
...                                     
</int-jms:outbound-channel-adapter>

_

public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
  public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
    Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
    if(messageId !=null) {
        if (messageId instanceof byte[]) {
            jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
        }else  {
         ...
        }
    }
    super.fromHeaders(headers, jmsMessage);
  }

...
}

It didn't seen that obvious to me.... do you mean that the behavior is "normal" and that I should use a headerMapper to change it, ie remove the "jms_" prefix?

嗯,header 在 DefaultJmsHeaderMapper 中的映射是这样的:headers.put(JmsHeaders.MESSAGE_ID, messageId);。所以,它确实是 jms_messageId。而且在出站端确实没有映射:

if (StringUtils.hasText(headerName) &&
                    !headerName.startsWith(JmsHeaders.PREFIX) &&
                    jmsMessage.getObjectProperty(headerName) == null) {

我认为有理由忽略它们,因为并非所有 JMS 供应商都允许覆盖所有这些 org.springframework.jms.support.JmsHeaders

对于您的 use-case,您可以在 <int-jms:outbound-channel-adapter> 之前执行此操作:

<header-enricher>
    <header name="messageId" expression="headers.jms_messageId"/>
</header-enricher>

这个答案计算了帮助解决我们问题的各种(而且非常有用!)评论和答案。

在注意到 spring-integration-jms 的当前版本不接受“byte[]”类型(IBM MSGID 类型)后,我们添加了一个自定义 header 映射器。

原始 RAW msgId 无法从入站中获得:相反,我们收到了 JMS-mapped heades 例如:

JMSXAppID=com.my.company.test.MqProducer
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1 jms_correlationId=ID:4142434423313233343536373839305f3100000000000000 jms_messageId=ID:4142434423313233343536373839305f3100000000000000

它们是通过入站适配器附带的侦听器以这种方式接收的。
因此,初稿使用 headerMapper 将 jmsMessageId (Id:...) 转换为 IBM Mq BYTE24 comptabile(因此:byte[] 转换为“JMS_IBM_MQMD_MsgId" 属性)

但这是一个危险的解决方案,因为三重转换 (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )

最终,我们发现可以配置入站 queue 以及出站 queue,例如它们将传递所有上下文(jms-mapped headers 以及作为原始 MQMD 的):使用 MQMMDMessageContext = CMQC.MQPMO_SET_ALL_CONTEXT (2) 和 MQMDRead/WriteEnabled = true(根据它是否是入站还是出站 queue)。 这样,所有必填字段从一开始就可用:

JMS_IBM_MQMD_PutApplName=CustomOwnApplName
JMSXAppID=com.my.company.test.MqProducer
JMS_IBM_MQMD_ReplyToQ=
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
JMS_IBM_MQMD_CorrelId=[B@47e0d39f
jms_correlationId=ID:4142434423313233343536373839305f3100000000000000
JMS_IBM_MQMD_MsgId=[B@399141ee
jms_messageId=ID:4142434423313233343536373839305f3100000000000000

因此,我们不必进行可疑映射....只需进行基本映射(因为 byte[] 仍未在 defaultHeaderMapper 中进行映射)。

所以最终的解决方案是:

<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
  ...
  <property name="MQMDMessageContext" value="2"></property>
  <property name="MQMDReadEnabled" value="true"></property>
</bean>

<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
    ...
    <property name="MQMDWriteEnabled" value="true"></property>
    <property name="MQMDMessageContext" value="2"></property>       
</bean>


<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/> 
<int-jms:outbound-channel-adapter   channel="channel_MQ_MQ" 
                                    id="jmsOut" 
                                    destination="jmsQueueOUT" 
                                    ... 
                                    header-mapper="mqCompatibleJmsHeaderMapper">
...                                     
</int-jms:outbound-channel-adapter>

_

public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
  public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
    Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
    if(messageId !=null) {
        if (messageId instanceof byte[]) {
            jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
        }else  {
         ...
        }
    }
    super.fromHeaders(headers, jmsMessage);
  }

...
}