'sequenceNumber' header 值必须是整数。但它很长

The 'sequenceNumber' header value must be an Integer. But it is Long

我正在使用 RabbitMQ 3.6.10 UI 发布一条消息,该消息由我的 Java 使用 Spring Integration AMQP 4.3.11 的应用程序接收。该消息是对使用拆分器创建的较早消息的回复,因此它具有 sequenceNumbersequenceSize header。我将这些 header 复制到回复中,并将它们设置为 RabbitMQ UI 中的类型 Number。但是,在 Java 方面,我遇到了一个例外:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=12=]1(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
    at org.springframework.util.Assert.isTrue(Assert.java:92)
    at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
    at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
    at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
    ... 14 more

我检查了 Java 侧的 sequenceNumbersequenceSize header 的类型是 Long,而不是 Integer.然而,在 RabbitMQ UI 中没有选项可以实现这种差异。消息将由 non-Java 应用程序发送,那么如何确保 header 被 Spring 集成识别为整数?

当我使用 Java 客户端发布回复并将 header 值设置为 Integer 时,消费者会接受它们。所以这可能是 RabbitMQ UI 的限制 header 类型不够具体(例如 32 位与 64 位数字)或 Java 客户端对期望值类型。谁能确认其中之一?

向适配器的侦听器容器添加 MessagePostProcessor...

@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
    adapter.setOutputChannelName("someChannel");
    return adapter;
}

@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
    container.setQueueNames("foo");
    container.setAfterReceivePostProcessors(m -> {
        if (m.getMessageProperties().getHeaders()
                .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
            Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
            m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
                    sequenceNumber);
        }
        return m;
    });
    return container;
}

请打开一个 JIRA Issue - 我们可能应该更宽松一些,尤其是当值 < Integer.MAX_VALUE.