Invalid stream header when publishing a message manually on the RabbitMQ console
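I have an application that uses spring-jms, spring-boot and rabbitmq-jms.
Sending and receiving messages through the application works.
But when I publish a message manually on the RabbitMQ console, the listening application fails with the following error: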
com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 61736466
at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1140) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:913) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:907) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:356) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:269) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:132) ~[spring-jms-5.3.13.jar:5.3.13]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:418) ~[spring-jms-5.3.13.jar:5.3.13]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:303) ~[spring-jms-5.3.13.jar:5.3.13]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.13.jar:5.3.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) ~[spring-jms-5.3.13.jar:5.3.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.13.jar:5.3.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.13.jar:5.3.13]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.io.StreamCorruptedException: invalid stream header: 61736466
at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:940) ~[na:na]
at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:379) ~[na:na]
at com.rabbitmq.jms.util.WhiteListObjectInputStream.<init>(WhiteListObjectInputStream.java:90) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1114) ~[rabbitmq-jms-2.3.0.jar:2.3.0]
... 12 common frames omitted
Here is my JMS configuration:
@EnableJms
@Configuration
public class ConnectionRabbitConfig {

    @Autowired RabbitProperties rabbitProperties;

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername(rabbitProperties.getUser());
        connectionFactory.setPassword(rabbitProperties.getPass());
        connectionFactory.setVirtualHost(rabbitProperties.getVirtualhost());
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        return connectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAutoStartup(rabbitProperties.getListenerEnabled());
        return factory;
    }

    @Bean
    public JmsTemplate defaultJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }
}
Publishing a message:
@Autowired
private JmsTemplate jmsTemplate;

public void publish(String queueName, String message, Map<String, Object> headers) {
    jmsTemplate.convertAndSend(queueName, message);
}
Consuming a message:
@JmsListener(destination = "queue-1")
public void consume(@Payload Message message, @Headers MessageHeaders headers) {
    // Runs successfully when the message was sent by this same application,
    // but fails when the message is added manually on the RabbitMQ console.
}
Libraries:
implementation 'org.springframework:spring-jms:5.3.13'
implementation 'com.rabbitmq.jms:rabbitmq-jms:2.3.0'
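Why do I need to add properties or headers on the RabbitMQ console for the application to consume the message successfully? Or do I need to change my configuration code?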
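Just adding my solution here.
By default, the Spring listeners were attached to the queues as non-AMQP destinations, through the exchange jms.durable.queues.
That is why it worked when publishing through the application but threw an error when a TextMessage was added on the console. Messages published on the console are plain messages without the serialized envelope the JMS client writes, so the rabbitmq-jms client rejects them with the invalid header error.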
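The header value in the stack trace can be checked directly. The sketch below uses only the JDK; the class and method names are made up for illustration. It decodes 61736466 as ASCII and feeds the same bytes to ObjectInputStream, which is what the trace shows the client doing through WhiteListObjectInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

public class StreamHeaderDemo {

    // Decode a hex string such as "61736466" into the bytes it represents.
    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Try to open the bytes as a Java serialization stream.
    // Returns the error message, or null if the stream header was accepted.
    static String tryDeserialize(byte[] body) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] body = hexToBytes("61736466");
        // The "header" is just the first four characters typed into the console.
        System.out.println(new String(body, StandardCharsets.US_ASCII)); // prints "asdf"
        // A serialized stream must start with the magic bytes AC ED 00 05, so this fails.
        System.out.println(tryDeserialize(body)); // prints "invalid stream header: 61736466"
    }
}
```

So the payload typed on the console was most likely the text asdf, and the client tried to read it as a serialized Java object.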
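Solution:
I added a destination resolver that connects to my queues as AMQP destinations, without creating the default exchange jms.durable.queues.
More details on the solution here: https://whosebug.com/a/70269301/7505687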
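A rough sketch of what such a destination resolver might look like, not the exact code from the linked answer. The class name AmqpQueueDestinationResolver is made up, and the sketch assumes each listener destination name is also the name of a queue that already exists in RabbitMQ. It needs spring-jms and rabbitmq-jms on the classpath.

```java
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;

import com.rabbitmq.jms.admin.RMQDestination;
import org.springframework.jms.support.destination.DestinationResolver;

// Hypothetical resolver: maps every destination name to an AMQP-mapped queue.
public class AmqpQueueDestinationResolver implements DestinationResolver {

    @Override
    public Destination resolveDestinationName(Session session, String destinationName,
                                              boolean pubSubDomain) throws JMSException {
        RMQDestination destination = new RMQDestination();
        destination.setDestinationName(destinationName);
        // Treat the destination as a plain AMQP resource instead of a JMS-managed one.
        destination.setAmqp(true);
        // Assumption: the RabbitMQ queue has the same name as the JMS destination.
        destination.setAmqpQueueName(destinationName);
        return destination;
    }
}
```

It could then be registered on the listener container factory from the question with factory.setDestinationResolver(new AmqpQueueDestinationResolver()). Only the queue name is set here, which covers consuming; publishing through an AMQP-mapped destination would also need an exchange name and routing key.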