使用 spring 集成在 2 个 MQ 代理之间保留消息 ID
Retaining message id between 2 MQ brokers using spring integration
我正在使用由 message-driven-channel-adapter => 频道 => outbound-channel-adapter 组成的消息工作流。它的目的是将消息从一个 MqSeries 代理传输到另一个 MQSeries 代理。它是事务性的(需要确认)
相关部分在下面(有些部分明显遗漏了。如果您认为需要它们,我会编辑我的 post 并添加它们)。
我的问题是关于消息 headers,特别是 msgId。
当我在入站 queue 中放入带有 messageId 的消息时,我希望它在整个管道中保持不变。
但是 messageId 在出站 queue 中被转换,其内容被生成的 ID 替换,包括出站 queue 经理名称。
来自发射器(它只是一个可能的发射代码的例子。我使用的每个代码都有同样的问题,只要我提供一个 msgId):
com.ibm.mq.MQMessage message = new MQMessage();
message.messageId=("TEST MessageId 1234").getBytes();
并且来自 MQExplorer:
- 来自入站 Queue : MessageId = TEST MessageId 1234
- 来自出站 Queue : MessageId = AMQ
<一些随机(?)代码>
可能有一个明显的(但对我来说不是)原因,但我现在不明白。
我读到(嗯?) message Id 可以由 QM 从特定场景或特定命令生成。
但我看不出它如何应用于 spring 集成。
有人知道 Spring 集成如何处理 messageId 以及我如何在整个管道中保留相同信息吗?
<beans>
<int:channel id="channelMQ_MQ" ></int:channel>
<!-- Source : MQseries -->
<!- ... -->
<bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>
<!- ... -->
<bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionFactoryCaching" />
<property name="destination" ref="jmsQueue" />
<!- ... -->
<property name="sessionTransacted" value="true"/>
</bean>
<int-jms:message-driven-channel-adapter
id="jmsIn"
container="myListener"
channel="channelMQ_MQ"
error-channel="processChannel1"/>
<!-- Destination MQ_SERIES -->
<!- ... -->
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>
<int-jms:outbound-channel-adapter channel="channelMQ_MQ"
id="jmsOut2"
destination="jmsQueue2"
connection-factory="connectionFactoryCaching2"
delivery-persistent="true"
explicit-qos-enabled="true"
session-transacted="true" >
</int-jms:outbound-channel-adapter>
</beans>
编辑 1:
按照@artem-bilan的建议,我设置了一个header-enricher。
但是 atm,这根本不起作用... None 的属性已设置。
<int:channel id="channel_tmp">
</int:channel>
<int:header-enricher input-channel="channelMQ_MQ" output-channel="channel_tmp" id="headerEnricher1">
<int:header name="MSI" expression="headers.jms_messageId"/>
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers.jms_messageId"/>
<int:header name="MSGID" expression="headers.jms_messageId"/>
<int:header name="MsgId" expression="headers.jms_messageId"/>
<int:header name="CorrelId" expression="headers.jms_messageId"/>
<int:header name="GroupId" expression="headers.jms_messageId"/>
<int:header name="MsggSeqNumber" expression="headers.jms_messageId"/>
<int:header name="offset" expression="headers.jms_messageId"/>
</int:header-enricher>
<int-jms:outbound-channel-adapter channel="channel_tmp"
id="jmsOut2"
destination="jmsQueue2"
connection-factory="connectionFactoryCaching2"
delivery-persistent="true"
explicit-qos-enabled="true"
session-transacted="true" >
</int-jms:outbound-channel-adapter>
编辑 2:
经过一些研究,我们发现 IBM doc 指出 “为了能够设置消息 ID,JMS 目的地 queue 需要 属性 'MQMD WRITE ENABLE " 设置为启用。 属性 允许 JMS 应用程序设置 MQMD 字段的值。"
所以我们尝试从我们的 JmsQueue 中设置这个 属性 :
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
不幸的是,虽然很有希望,但这对 messageId 不起作用(但其他 MQMD 字段有效)。
编辑3:
按照 Artem Bilan 关于调试 JmsHeaderMapper 的建议,听起来我们发现 header 映射器不支持字节数组(spring 集成版本:5.3.2.RELEASE),但 IBM 期望...这导致 header 基本上被跳过。
因此,这不会那样工作:
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers['jms_messageId'].bytes"/>
编辑 4:
在注意到 spring-integration-jms 的当前版本不接受“byte[]”类型(IBM MSGID 类型)后,我们添加了一个自定义 header 映射器。它有效,但我们必须从 already-mapped 消息(看起来像“ID:3214F1044...”)中检索(十六进制到字节)它并将它作为字节数组传递到 header 作为“JMS_IBM_MQMD_MsgId" 属性。
这是一个可疑的解决方案,因为三重转换 (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )
最终,我们发现可以配置入站 queue 以及出站 queue,例如它们将传递所有上下文(jms-mapped headers 以及原始 MQ 的) .因此,我们不必进行复杂的映射....只需进行基本映射(因为 byte[] 仍未在 defaultHeaderMapper 中进行映射)。
所以最终的解决方案是:
<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/>
<int-jms:outbound-channel-adapter channel="channel_MQ_MQ"
id="jmsOut"
destination="jmsQueueOUT"
...
header-mapper="mqCompatibleJmsHeaderMapper">
...
</int-jms:outbound-channel-adapter>
_
public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}
It didn't seen that obvious to me.... do you mean that the behavior is "normal" and that I should use a headerMapper to change it, ie remove the "jms_" prefix?
嗯,header 在 DefaultJmsHeaderMapper
中的映射是这样的:headers.put(JmsHeaders.MESSAGE_ID, messageId);
。所以,它确实是 jms_messageId
。而且在出站端确实没有映射:
if (StringUtils.hasText(headerName) &&
!headerName.startsWith(JmsHeaders.PREFIX) &&
jmsMessage.getObjectProperty(headerName) == null) {
我认为有理由忽略它们,因为并非所有 JMS 供应商都允许覆盖所有这些 org.springframework.jms.support.JmsHeaders
。
对于您的 use-case,您可以在 <int-jms:outbound-channel-adapter>
之前执行此操作:
<header-enricher>
<header name="messageId" expression="headers.jms_messageId"/>
</header-enricher>
这个答案计算了帮助解决我们问题的各种(而且非常有用!)评论和答案。
在注意到 spring-integration-jms 的当前版本不接受“byte[]”类型(IBM MSGID 类型)后,我们添加了一个自定义 header 映射器。
原始 RAW msgId 无法从入站中获得:相反,我们收到了 JMS-mapped heades
例如:
JMSXAppID=com.my.company.test.MqProducer
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
jms_correlationId=ID:4142434423313233343536373839305f3100000000000000
jms_messageId=ID:4142434423313233343536373839305f3100000000000000
它们是通过入站适配器附带的侦听器以这种方式接收的。
因此,初稿使用 headerMapper 将 jmsMessageId (Id:...) 转换为 IBM Mq BYTE24 comptabile(因此:byte[] 转换为“JMS_IBM_MQMD_MsgId" 属性)
但这是一个危险的解决方案,因为三重转换 (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )
最终,我们发现可以配置入站 queue 以及出站 queue,例如它们将传递所有上下文(jms-mapped headers 以及作为原始 MQMD 的):使用 MQMMDMessageContext = CMQC.MQPMO_SET_ALL_CONTEXT (2) 和 MQMDRead/WriteEnabled = true(根据它是否是入站还是出站 queue)。
这样,所有必填字段从一开始就可用:
JMS_IBM_MQMD_PutApplName=CustomOwnApplName
JMSXAppID=com.my.company.test.MqProducer
JMS_IBM_MQMD_ReplyToQ=
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
JMS_IBM_MQMD_CorrelId=[B@47e0d39f
jms_correlationId=ID:4142434423313233343536373839305f3100000000000000
JMS_IBM_MQMD_MsgId=[B@399141ee
jms_messageId=ID:4142434423313233343536373839305f3100000000000000
因此,我们不必进行可疑映射....只需进行基本映射(因为 byte[] 仍未在 defaultHeaderMapper 中进行映射)。
所以最终的解决方案是:
<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/>
<int-jms:outbound-channel-adapter channel="channel_MQ_MQ"
id="jmsOut"
destination="jmsQueueOUT"
...
header-mapper="mqCompatibleJmsHeaderMapper">
...
</int-jms:outbound-channel-adapter>
_
public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}
我正在使用由 message-driven-channel-adapter => 频道 => outbound-channel-adapter 组成的消息工作流。它的目的是将消息从一个 MqSeries 代理传输到另一个 MQSeries 代理。它是事务性的(需要确认)
相关部分在下面(有些部分明显遗漏了。如果您认为需要它们,我会编辑我的 post 并添加它们)。
我的问题是关于消息 headers,特别是 msgId。
当我在入站 queue 中放入带有 messageId 的消息时,我希望它在整个管道中保持不变。
但是 messageId 在出站 queue 中被转换,其内容被生成的 ID 替换,包括出站 queue 经理名称。
来自发射器(它只是一个可能的发射代码的例子。我使用的每个代码都有同样的问题,只要我提供一个 msgId):
com.ibm.mq.MQMessage message = new MQMessage();
message.messageId=("TEST MessageId 1234").getBytes();
并且来自 MQExplorer:
- 来自入站 Queue : MessageId = TEST MessageId 1234
- 来自出站 Queue : MessageId = AMQ
<一些随机(?)代码>
可能有一个明显的(但对我来说不是)原因,但我现在不明白。 我读到(嗯?) message Id 可以由 QM 从特定场景或特定命令生成。 但我看不出它如何应用于 spring 集成。
有人知道 Spring 集成如何处理 messageId 以及我如何在整个管道中保留相同信息吗?
<beans>
<int:channel id="channelMQ_MQ" ></int:channel>
<!-- Source : MQseries -->
<!- ... -->
<bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>
<!- ... -->
<bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionFactoryCaching" />
<property name="destination" ref="jmsQueue" />
<!- ... -->
<property name="sessionTransacted" value="true"/>
</bean>
<int-jms:message-driven-channel-adapter
id="jmsIn"
container="myListener"
channel="channelMQ_MQ"
error-channel="processChannel1"/>
<!-- Destination MQ_SERIES -->
<!- ... -->
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>
<int-jms:outbound-channel-adapter channel="channelMQ_MQ"
id="jmsOut2"
destination="jmsQueue2"
connection-factory="connectionFactoryCaching2"
delivery-persistent="true"
explicit-qos-enabled="true"
session-transacted="true" >
</int-jms:outbound-channel-adapter>
</beans>
编辑 1:
按照@artem-bilan的建议,我设置了一个header-enricher。 但是 atm,这根本不起作用... None 的属性已设置。
<int:channel id="channel_tmp">
</int:channel>
<int:header-enricher input-channel="channelMQ_MQ" output-channel="channel_tmp" id="headerEnricher1">
<int:header name="MSI" expression="headers.jms_messageId"/>
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers.jms_messageId"/>
<int:header name="MSGID" expression="headers.jms_messageId"/>
<int:header name="MsgId" expression="headers.jms_messageId"/>
<int:header name="CorrelId" expression="headers.jms_messageId"/>
<int:header name="GroupId" expression="headers.jms_messageId"/>
<int:header name="MsggSeqNumber" expression="headers.jms_messageId"/>
<int:header name="offset" expression="headers.jms_messageId"/>
</int:header-enricher>
<int-jms:outbound-channel-adapter channel="channel_tmp"
id="jmsOut2"
destination="jmsQueue2"
connection-factory="connectionFactoryCaching2"
delivery-persistent="true"
explicit-qos-enabled="true"
session-transacted="true" >
</int-jms:outbound-channel-adapter>
编辑 2: 经过一些研究,我们发现 IBM doc 指出 “为了能够设置消息 ID,JMS 目的地 queue 需要 属性 'MQMD WRITE ENABLE " 设置为启用。 属性 允许 JMS 应用程序设置 MQMD 字段的值。" 所以我们尝试从我们的 JmsQueue 中设置这个 属性 :
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
不幸的是,虽然很有希望,但这对 messageId 不起作用(但其他 MQMD 字段有效)。
编辑3:
按照 Artem Bilan 关于调试 JmsHeaderMapper 的建议,听起来我们发现 header 映射器不支持字节数组(spring 集成版本:5.3.2.RELEASE),但 IBM 期望...这导致 header 基本上被跳过。 因此,这不会那样工作:
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers['jms_messageId'].bytes"/>
编辑 4:
在注意到 spring-integration-jms 的当前版本不接受“byte[]”类型(IBM MSGID 类型)后,我们添加了一个自定义 header 映射器。它有效,但我们必须从 already-mapped 消息(看起来像“ID:3214F1044...”)中检索(十六进制到字节)它并将它作为字节数组传递到 header 作为“JMS_IBM_MQMD_MsgId" 属性。
这是一个可疑的解决方案,因为三重转换 (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )
最终,我们发现可以配置入站 queue 以及出站 queue,例如它们将传递所有上下文(jms-mapped headers 以及原始 MQ 的) .因此,我们不必进行复杂的映射....只需进行基本映射(因为 byte[] 仍未在 defaultHeaderMapper 中进行映射)。
所以最终的解决方案是:
<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/>
<int-jms:outbound-channel-adapter channel="channel_MQ_MQ"
id="jmsOut"
destination="jmsQueueOUT"
...
header-mapper="mqCompatibleJmsHeaderMapper">
...
</int-jms:outbound-channel-adapter>
_
public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}
It didn't seen that obvious to me.... do you mean that the behavior is "normal" and that I should use a headerMapper to change it, ie remove the "jms_" prefix?
嗯,header 在 DefaultJmsHeaderMapper
中的映射是这样的:headers.put(JmsHeaders.MESSAGE_ID, messageId);
。所以,它确实是 jms_messageId
。而且在出站端确实没有映射:
if (StringUtils.hasText(headerName) &&
!headerName.startsWith(JmsHeaders.PREFIX) &&
jmsMessage.getObjectProperty(headerName) == null) {
我认为有理由忽略它们,因为并非所有 JMS 供应商都允许覆盖所有这些 org.springframework.jms.support.JmsHeaders
。
对于您的 use-case,您可以在 <int-jms:outbound-channel-adapter>
之前执行此操作:
<header-enricher>
<header name="messageId" expression="headers.jms_messageId"/>
</header-enricher>
这个答案计算了帮助解决我们问题的各种(而且非常有用!)评论和答案。
在注意到 spring-integration-jms 的当前版本不接受“byte[]”类型(IBM MSGID 类型)后,我们添加了一个自定义 header 映射器。
原始 RAW msgId 无法从入站中获得:相反,我们收到了 JMS-mapped heades 例如:
JMSXAppID=com.my.company.test.MqProducer
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1 jms_correlationId=ID:4142434423313233343536373839305f3100000000000000 jms_messageId=ID:4142434423313233343536373839305f3100000000000000
它们是通过入站适配器附带的侦听器以这种方式接收的。
因此,初稿使用 headerMapper 将 jmsMessageId (Id:...) 转换为 IBM Mq BYTE24 comptabile(因此:byte[] 转换为“JMS_IBM_MQMD_MsgId" 属性)
但这是一个危险的解决方案,因为三重转换 (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )
最终,我们发现可以配置入站 queue 以及出站 queue,例如它们将传递所有上下文(jms-mapped headers 以及作为原始 MQMD 的):使用 MQMMDMessageContext = CMQC.MQPMO_SET_ALL_CONTEXT (2) 和 MQMDRead/WriteEnabled = true(根据它是否是入站还是出站 queue)。 这样,所有必填字段从一开始就可用:
JMS_IBM_MQMD_PutApplName=CustomOwnApplName
JMSXAppID=com.my.company.test.MqProducer
JMS_IBM_MQMD_ReplyToQ=
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
JMS_IBM_MQMD_CorrelId=[B@47e0d39f
jms_correlationId=ID:4142434423313233343536373839305f3100000000000000
JMS_IBM_MQMD_MsgId=[B@399141ee
jms_messageId=ID:4142434423313233343536373839305f3100000000000000
因此,我们不必进行可疑映射....只需进行基本映射(因为 byte[] 仍未在 defaultHeaderMapper 中进行映射)。
所以最终的解决方案是:
<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/>
<int-jms:outbound-channel-adapter channel="channel_MQ_MQ"
id="jmsOut"
destination="jmsQueueOUT"
...
header-mapper="mqCompatibleJmsHeaderMapper">
...
</int-jms:outbound-channel-adapter>
_
public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}