'sequenceNumber' header 值必须是整数。但它很长
The 'sequenceNumber' header value must be an Integer. But it is Long
我正在使用 RabbitMQ 3.6.10 UI 发布一条消息,该消息由我的 Java 使用 Spring Integration AMQP 4.3.11 的应用程序接收。该消息是对使用拆分器创建的较早消息的回复,因此它具有 sequenceNumber
和 sequenceSize
header。我将这些 header 复制到回复中,并将它们设置为 RabbitMQ UI 中的类型 Number
。但是,在 Java 方面,我遇到了一个例外:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=12=]1(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
at org.springframework.util.Assert.isTrue(Assert.java:92)
at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
... 14 more
我检查了 Java 侧的 sequenceNumber
和 sequenceSize
header 的类型是 Long
,而不是 Integer
.然而,在 RabbitMQ UI 中没有选项可以实现这种差异。消息将由 non-Java 应用程序发送,那么如何确保 header 被 Spring 集成识别为整数?
当我使用 Java 客户端发布回复并将 header 值设置为 Integer
时,消费者会接受它们。所以这可能是 RabbitMQ UI 的限制 header 类型不够具体(例如 32 位与 64 位数字)或 Java 客户端对期望值类型。谁能确认其中之一?
向适配器的侦听器容器添加 MessagePostProcessor
...
@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
adapter.setOutputChannelName("someChannel");
return adapter;
}
@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames("foo");
container.setAfterReceivePostProcessors(m -> {
if (m.getMessageProperties().getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
sequenceNumber);
}
return m;
});
return container;
}
请打开一个 JIRA Issue - 我们可能应该更宽松一些,尤其是当值 < Integer.MAX_VALUE
.
时
我正在使用 RabbitMQ 3.6.10 UI 发布一条消息,该消息由我的 Java 使用 Spring Integration AMQP 4.3.11 的应用程序接收。该消息是对使用拆分器创建的较早消息的回复,因此它具有 sequenceNumber
和 sequenceSize
header。我将这些 header 复制到回复中,并将它们设置为 RabbitMQ UI 中的类型 Number
。但是,在 Java 方面,我遇到了一个例外:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=12=]1(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
at org.springframework.util.Assert.isTrue(Assert.java:92)
at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
... 14 more
我检查了 Java 侧的 sequenceNumber
和 sequenceSize
header 的类型是 Long
,而不是 Integer
.然而,在 RabbitMQ UI 中没有选项可以实现这种差异。消息将由 non-Java 应用程序发送,那么如何确保 header 被 Spring 集成识别为整数?
当我使用 Java 客户端发布回复并将 header 值设置为 Integer
时,消费者会接受它们。所以这可能是 RabbitMQ UI 的限制 header 类型不够具体(例如 32 位与 64 位数字)或 Java 客户端对期望值类型。谁能确认其中之一?
向适配器的侦听器容器添加 MessagePostProcessor
...
@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
adapter.setOutputChannelName("someChannel");
return adapter;
}
@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames("foo");
container.setAfterReceivePostProcessors(m -> {
if (m.getMessageProperties().getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
sequenceNumber);
}
return m;
});
return container;
}
请打开一个 JIRA Issue - 我们可能应该更宽松一些,尤其是当值 < Integer.MAX_VALUE
.