_AMQ_GROUP_ID 出现在消息中但 JMSXGroupID 在@JmsListener 中为空

_AMQ_GROUP_ID present in message but JMSXGroupID null in @JmsListener

来自该文档:

Messages in a message group share the same group id, i.e. they have same group identifier property (JMSXGroupID for JMS, _AMQ_GROUP_ID for Apache ActiveMQ Artemis Core API).

我明白了为什么当我浏览代理中值为 product=paper 的消息时,最初通过 JMSXGroupID 设置的 属性 变成了 _AMQ_GROUP_ID。但是,在我的 @JmsListener 注释方法中,我可以看到 _AMQ_GROUP_ID 属性 丢失并且 JMSXGroupID 在消息的 headers 哈希图中作为 null 出现。

@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
            containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)

所以

  1. 我的 Producer 应用程序在将字符串 属性 JMSXGroupID 设置为 'product=paper'
  2. 后将消息发送到 queue
  3. 当我在 Artemis UI
  4. 中浏览该消息的 headers 时,我可以看到 _AMQ_GROUP_ID 的值为 'product=paper'
  5. 当我调试我的侦听器应用程序并查看 headers 的映射时,_AMQ_GROUP_ID 不存在并且 JMSXGroupID 的值为 null 而不是 'product=paper'。

字符“=”是否无效或是否存在其他原因?我 运行 无事可做。

编辑,使用新代码:

HeaderMapper:

@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {

    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {

        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);

        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);

        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }

        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}

听众:

@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "myContainerFactory", concurrency = "15-15")
    public void receive(Message message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}

申请代码:

@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }

    // BEANS

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());

        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}

调用 SimpleJmsHeaderMapper 的堆栈跟踪:

toHeaders:130, SimpleJmsHeaderMapper (org.springframework.jms.support) toHeaders:57, SimpleJmsHeaderMapper (org.springframework.jms.support) extractHeaders:148, MessagingMessageConverter (org.springframework.jms.support.converter) access0:466, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter (org.springframework.jms.listener.adapter) getHeaders:552, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage (org.springframework.jms.listener.adapter) resolveArgumentInternal:68, HeaderMethodArgumentResolver (org.springframework.messaging.handler.annotation.support) resolveArgument:100, AbstractNamedValueMethodArgumentResolver (org.springframework.messaging.handler.annotation.support) resolveArgument:117, HandlerMethodArgumentResolverComposite (org.springframework.messaging.handler.invocation) getMethodArgumentValues:148, InvocableHandlerMethod (org.springframework.messaging.handler.invocation) invoke:116, InvocableHandlerMethod (org.springframework.messaging.handler.invocation) invokeHandler:114, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter) onMessage:77, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter) doInvokeListener:736, AbstractMessageListenerContainer (org.springframework.jms.listener) invokeListener:696, AbstractMessageListenerContainer (org.springframework.jms.listener) doExecuteListener:674, AbstractMessageListenerContainer (org.springframework.jms.listener) doReceiveAndExecute:318, AbstractPollingMessageListenerContainer (org.springframework.jms.listener) receiveAndExecute:257, AbstractPollingMessageListenerContainer (org.springframework.jms.listener) invokeListener:1190, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener) executeOngoingLoop:1180, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener) run:1077, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener) run:748, Thread (java.lang)

尝试继承 SimpleJmsHeaderMapper 并覆盖 toHeaders()。调用 super.toHeaders(),从结果中创建一个新的 Map<>put() 您想要添加到地图中的任何其他 headers 和 return 地图中的新 MessageHeaders

将自定义映射器传递到新的 MessagingMessageConverter 并将其传递到容器工厂。

如果您使用 Spring 引导,只需将转换器添加为 @Bean,引导将自动将其连接到工厂。

编辑

毕竟;我刚写了一个应用程序,它对我来说工作得很好,根本没有任何定制......

@SpringBootApplication
public class So58399905Application {

    public static void main(String[] args) {
        SpringApplication.run(So58399905Application.class, args);
    }

    @JmsListener(destination = "foo")
    public void listen(String in, MessageHeaders headers) {
        System.out.println(in + headers);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> template.convertAndSend("foo", "bar", msg -> {
            msg.setStringProperty("JMSXGroupID", "product=x");
            return msg;
        });
    }

}

bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1,  ...

EDIT2

这是 artemis 客户端中的一个错误 - 2.6.4 (Boot 2.1.9) 仅 getStringProperty() returns _AMQ_GROUP_ID 属性 的值当得到 JMSXGroupID

映射器使用 getObjectProperty(),return 为空。用2.10.1客户端;消息正确 returns _AMQ_GROUP_ID 属性 来自 getObjectProperty().

的值