_AMQ_GROUP_ID 出现在消息中但 JMSXGroupID 在@JmsListener 中为空
_AMQ_GROUP_ID present in message but JMSXGroupID null in @JmsListener
来自该文档:
Messages in a message group share the same group id, i.e. they have same group identifier property (JMSXGroupID for JMS, _AMQ_GROUP_ID for Apache ActiveMQ Artemis Core API).
我明白了为什么当我浏览代理中值为 product=paper 的消息时,最初通过 JMSXGroupID
设置的 属性 变成了 _AMQ_GROUP_ID
。但是,在我的 @JmsListener
注释方法中,我可以看到 _AMQ_GROUP_ID
属性 丢失并且 JMSXGroupID
在消息的 headers
哈希图中作为 null 出现。
@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)
所以
- 我的 Producer 应用程序在将字符串 属性
JMSXGroupID
设置为 'product=paper' 后将消息发送到 queue
- 当我在 Artemis UI
中浏览该消息的 headers 时,我可以看到 _AMQ_GROUP_ID
的值为 'product=paper'
- 当我调试我的侦听器应用程序并查看 headers 的映射时,
_AMQ_GROUP_ID
不存在并且 JMSXGroupID
的值为 null 而不是 'product=paper'。
字符“=”是否无效或是否存在其他原因?我 运行 无事可做。
编辑,使用新代码:
HeaderMapper:
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {
@Override
public MessageHeaders toHeaders(Message jmsMessage) {
MessageHeaders messageHeaders = super.toHeaders(jmsMessage);
Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);
try {
messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
} catch (JMSException e) {
e.printStackTrace();
}
// can see while debugging that this returns the correct headers
return new MessageHeaders(messageHeadersMap);
}
}
听众:
@Component
public class CustomSpringJmsListener {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@JmsListener(destination = "local-queue", subscription = "groupid-example",
containerFactory = "myContainerFactory", concurrency = "15-15")
public void receive(Message message) throws JMSException {
LOG.info("Received message: " + message);
}
}
申请代码:
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {
private static Logger LOG = LoggerFactory
.getLogger(GroupidApplication.class);
@Autowired
private JmsTemplate jmsTemplate;
@Autowired MessageConverter messageConverter;
public static void main(String[] args) {
LOG.info("STARTING THE APPLICATION");
SpringApplication.run(GroupidApplication.class, args);
LOG.info("APPLICATION FINISHED");
}
@Override
public void run(String... args) {
LOG.info("EXECUTING : command line runner");
jmsTemplate.setPubSubDomain(true);
createAndSendObjectMessage("Message1");
createAndSendTextMessage("Message2");
createAndSendTextMessage("Message3");
createAndSendTextMessage("Message4");
createAndSendTextMessage("Message5");
createAndSendTextMessage("Message6");
}
private void createAndSendTextMessage(String messageBody) {
jmsTemplate.send("local-queue", session -> {
Message message = session.createTextMessage(messageBody);
message.setStringProperty("JMSXGroupID", "product=paper");
return message;
});
}
// BEANS
@Bean
public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setSubscriptionDurable(true);
factory.setSubscriptionShared(true);
factory.setMessageConverter(messagingMessageConverter());
return factory;
}
@Bean
public MessagingMessageConverter messagingMessageConverter() {
return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
}
}
调用 SimpleJmsHeaderMapper 的堆栈跟踪:
toHeaders:130, SimpleJmsHeaderMapper (org.springframework.jms.support)
toHeaders:57, SimpleJmsHeaderMapper (org.springframework.jms.support)
extractHeaders:148, MessagingMessageConverter
(org.springframework.jms.support.converter) access0:466,
AbstractAdaptableMessageListener$MessagingMessageConverterAdapter
(org.springframework.jms.listener.adapter) getHeaders:552,
AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage
(org.springframework.jms.listener.adapter) resolveArgumentInternal:68,
HeaderMethodArgumentResolver
(org.springframework.messaging.handler.annotation.support)
resolveArgument:100, AbstractNamedValueMethodArgumentResolver
(org.springframework.messaging.handler.annotation.support)
resolveArgument:117, HandlerMethodArgumentResolverComposite
(org.springframework.messaging.handler.invocation)
getMethodArgumentValues:148, InvocableHandlerMethod
(org.springframework.messaging.handler.invocation) invoke:116,
InvocableHandlerMethod
(org.springframework.messaging.handler.invocation) invokeHandler:114,
MessagingMessageListenerAdapter
(org.springframework.jms.listener.adapter) onMessage:77,
MessagingMessageListenerAdapter
(org.springframework.jms.listener.adapter) doInvokeListener:736,
AbstractMessageListenerContainer (org.springframework.jms.listener)
invokeListener:696, AbstractMessageListenerContainer
(org.springframework.jms.listener) doExecuteListener:674,
AbstractMessageListenerContainer (org.springframework.jms.listener)
doReceiveAndExecute:318, AbstractPollingMessageListenerContainer
(org.springframework.jms.listener) receiveAndExecute:257,
AbstractPollingMessageListenerContainer
(org.springframework.jms.listener) invokeListener:1190,
DefaultMessageListenerContainer$AsyncMessageListenerInvoker
(org.springframework.jms.listener) executeOngoingLoop:1180,
DefaultMessageListenerContainer$AsyncMessageListenerInvoker
(org.springframework.jms.listener) run:1077,
DefaultMessageListenerContainer$AsyncMessageListenerInvoker
(org.springframework.jms.listener) run:748, Thread (java.lang)
尝试继承 SimpleJmsHeaderMapper
并覆盖 toHeaders()
。调用 super.toHeaders()
,从结果中创建一个新的 Map<>
; put()
您想要添加到地图中的任何其他 headers 和 return 地图中的新 MessageHeaders
。
将自定义映射器传递到新的 MessagingMessageConverter
并将其传递到容器工厂。
如果您使用 Spring 引导,只需将转换器添加为 @Bean
,引导将自动将其连接到工厂。
编辑
毕竟;我刚写了一个应用程序,它对我来说工作得很好,根本没有任何定制......
@SpringBootApplication
public class So58399905Application {
public static void main(String[] args) {
SpringApplication.run(So58399905Application.class, args);
}
@JmsListener(destination = "foo")
public void listen(String in, MessageHeaders headers) {
System.out.println(in + headers);
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> template.convertAndSend("foo", "bar", msg -> {
msg.setStringProperty("JMSXGroupID", "product=x");
return msg;
});
}
}
和
bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1, ...
EDIT2
这是 artemis 客户端中的一个错误 - 2.6.4 (Boot 2.1.9) 仅 getStringProperty()
returns _AMQ_GROUP_ID
属性 的值当得到 JMSXGroupID
。
映射器使用 getObjectProperty()
,return 为空。用2.10.1客户端;消息正确 returns _AMQ_GROUP_ID
属性 来自 getObjectProperty()
.
的值
来自该文档:
Messages in a message group share the same group id, i.e. they have same group identifier property (JMSXGroupID for JMS, _AMQ_GROUP_ID for Apache ActiveMQ Artemis Core API).
我明白了为什么当我浏览代理中值为 product=paper 的消息时,最初通过 JMSXGroupID
设置的 属性 变成了 _AMQ_GROUP_ID
。但是,在我的 @JmsListener
注释方法中,我可以看到 _AMQ_GROUP_ID
属性 丢失并且 JMSXGroupID
在消息的 headers
哈希图中作为 null 出现。
@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)
所以
- 我的 Producer 应用程序在将字符串 属性
JMSXGroupID
设置为 'product=paper' 后将消息发送到 queue
- 当我在 Artemis UI 中浏览该消息的 headers 时,我可以看到
- 当我调试我的侦听器应用程序并查看 headers 的映射时,
_AMQ_GROUP_ID
不存在并且JMSXGroupID
的值为 null 而不是 'product=paper'。
_AMQ_GROUP_ID
的值为 'product=paper'
字符“=”是否无效或是否存在其他原因?我 运行 无事可做。
编辑,使用新代码:
HeaderMapper:
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {
@Override
public MessageHeaders toHeaders(Message jmsMessage) {
MessageHeaders messageHeaders = super.toHeaders(jmsMessage);
Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);
try {
messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
} catch (JMSException e) {
e.printStackTrace();
}
// can see while debugging that this returns the correct headers
return new MessageHeaders(messageHeadersMap);
}
}
听众:
@Component
public class CustomSpringJmsListener {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@JmsListener(destination = "local-queue", subscription = "groupid-example",
containerFactory = "myContainerFactory", concurrency = "15-15")
public void receive(Message message) throws JMSException {
LOG.info("Received message: " + message);
}
}
申请代码:
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {
private static Logger LOG = LoggerFactory
.getLogger(GroupidApplication.class);
@Autowired
private JmsTemplate jmsTemplate;
@Autowired MessageConverter messageConverter;
public static void main(String[] args) {
LOG.info("STARTING THE APPLICATION");
SpringApplication.run(GroupidApplication.class, args);
LOG.info("APPLICATION FINISHED");
}
@Override
public void run(String... args) {
LOG.info("EXECUTING : command line runner");
jmsTemplate.setPubSubDomain(true);
createAndSendObjectMessage("Message1");
createAndSendTextMessage("Message2");
createAndSendTextMessage("Message3");
createAndSendTextMessage("Message4");
createAndSendTextMessage("Message5");
createAndSendTextMessage("Message6");
}
private void createAndSendTextMessage(String messageBody) {
jmsTemplate.send("local-queue", session -> {
Message message = session.createTextMessage(messageBody);
message.setStringProperty("JMSXGroupID", "product=paper");
return message;
});
}
// BEANS
@Bean
public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setSubscriptionDurable(true);
factory.setSubscriptionShared(true);
factory.setMessageConverter(messagingMessageConverter());
return factory;
}
@Bean
public MessagingMessageConverter messagingMessageConverter() {
return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
}
}
调用 SimpleJmsHeaderMapper 的堆栈跟踪:
toHeaders:130, SimpleJmsHeaderMapper (org.springframework.jms.support) toHeaders:57, SimpleJmsHeaderMapper (org.springframework.jms.support) extractHeaders:148, MessagingMessageConverter (org.springframework.jms.support.converter) access0:466, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter (org.springframework.jms.listener.adapter) getHeaders:552, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage (org.springframework.jms.listener.adapter) resolveArgumentInternal:68, HeaderMethodArgumentResolver (org.springframework.messaging.handler.annotation.support) resolveArgument:100, AbstractNamedValueMethodArgumentResolver (org.springframework.messaging.handler.annotation.support) resolveArgument:117, HandlerMethodArgumentResolverComposite (org.springframework.messaging.handler.invocation) getMethodArgumentValues:148, InvocableHandlerMethod (org.springframework.messaging.handler.invocation) invoke:116, InvocableHandlerMethod (org.springframework.messaging.handler.invocation) invokeHandler:114, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter) onMessage:77, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter) doInvokeListener:736, AbstractMessageListenerContainer (org.springframework.jms.listener) invokeListener:696, AbstractMessageListenerContainer (org.springframework.jms.listener) doExecuteListener:674, AbstractMessageListenerContainer (org.springframework.jms.listener) doReceiveAndExecute:318, AbstractPollingMessageListenerContainer (org.springframework.jms.listener) receiveAndExecute:257, AbstractPollingMessageListenerContainer (org.springframework.jms.listener) invokeListener:1190, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener) executeOngoingLoop:1180, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener) run:1077, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener) run:748, Thread (java.lang)
尝试继承 SimpleJmsHeaderMapper
并覆盖 toHeaders()
。调用 super.toHeaders()
,从结果中创建一个新的 Map<>
; put()
您想要添加到地图中的任何其他 headers 和 return 地图中的新 MessageHeaders
。
将自定义映射器传递到新的 MessagingMessageConverter
并将其传递到容器工厂。
如果您使用 Spring 引导,只需将转换器添加为 @Bean
,引导将自动将其连接到工厂。
编辑
毕竟;我刚写了一个应用程序,它对我来说工作得很好,根本没有任何定制......
@SpringBootApplication
public class So58399905Application {
public static void main(String[] args) {
SpringApplication.run(So58399905Application.class, args);
}
@JmsListener(destination = "foo")
public void listen(String in, MessageHeaders headers) {
System.out.println(in + headers);
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> template.convertAndSend("foo", "bar", msg -> {
msg.setStringProperty("JMSXGroupID", "product=x");
return msg;
});
}
}
和
bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1, ...
EDIT2
这是 artemis 客户端中的一个错误 - 2.6.4 (Boot 2.1.9) 仅 getStringProperty()
returns _AMQ_GROUP_ID
属性 的值当得到 JMSXGroupID
。
映射器使用 getObjectProperty()
,return 为空。用2.10.1客户端;消息正确 returns _AMQ_GROUP_ID
属性 来自 getObjectProperty()
.