New messages are not consumed after a RuntimeException in SessionAwareMessageListener
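I have a problem with Spring JMS and SessionAwareMessageListener.
When I add a RedeliveryPolicy to the ActiveMQConnectionFactory, my message is redelivered correctly, but all subsequent (new) messages get stuck and are not consumed (the listener class is never invoked).
Please help: is something wrong with my code?
My JMS publisher: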
@Component
public class DocumentMsgSender {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void send(DocumentMsgTemplate message) throws JMSException {
        jmsTemplate.send(new DocMsgCreator(message));
    }

    class DocMsgCreator implements MessageCreator {
        DocumentMsgTemplate message;

        public DocMsgCreator(DocumentMsgTemplate message) {
            this.message = message;
        }

        @Override
        public Message createMessage(Session s) throws JMSException {
            Message resultMessage = s.createObjectMessage(message);
            resultMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
            resultMessage.setJMSPriority(9);
            return resultMessage;
        }
    }
}
My JMS listener:
@Service
public class DocumentMsgListener implements SessionAwareMessageListener<Message> {
    @Autowired
    private JmsTemplate sendDocJmsTemplate;

    public void onMessage(Message message, Session session) throws JMSException {
        throw new RuntimeException("ERROR");
    }
}
My ActiveMQConnectionFactory:
@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(env.getProperty("activemq.url"));
    connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());
    connectionFactory.setMessagePrioritySupported(true);
    return connectionFactory;
}
Redelivery policy:
private RedeliveryPolicy getRedeliveryPolicy() {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(900000L);
    redeliveryPolicy.setRedeliveryDelay(900000L);
    redeliveryPolicy.setMaximumRedeliveries(48);
    return redeliveryPolicy;
}
My DefaultMessageListenerContainer:
@Bean
@Autowired
public DefaultMessageListenerContainer listenerContainer(DocumentMsgListener messageListener) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setDestinationName("testQueue");
    container.setMessageListener(messageListener);
    container.setSessionTransacted(true);
    return container;
}
My JmsTemplate:
@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
    jmsTemplate.setDefaultDestinationName("testQueue");
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setExplicitQosEnabled(true);
    return jmsTemplate;
}
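The answer is to enable non-blocking redelivery on the connection factory:
connectionFactory.setNonBlockingRedelivery(true);
By default the ActiveMQ consumer holds back further messages while it waits out the redelivery delay, so that message order is preserved. With non-blocking redelivery enabled, new messages are delivered in the meantime, at the cost of that ordering.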
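To see why the setting matters with the policy from the question (a 900000 ms delay and up to 48 redeliveries), here is a rough toy model of a single consumer on a virtual clock. It is only a sketch and not ActiveMQ code: the class `RedeliveryModel`, the method `consumedAt`, and the convention that ids starting with "bad" always fail are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of one consumer on a virtual clock (ms). Hypothetical, not ActiveMQ code. */
final class RedeliveryModel {

    /** One delivery attempt of a message, due at a given virtual time. */
    static final class Attempt {
        final String id;
        final int redeliveries;
        final long dueAt;

        Attempt(String id, int redeliveries, long dueAt) {
            this.id = id;
            this.redeliveries = redeliveries;
            this.dueAt = dueAt;
        }
    }

    /**
     * Returns the virtual time at which the message `target` is consumed
     * successfully, or -1 if it never is. Ids starting with "bad" always
     * fail, like the listener in the question that always throws.
     */
    static long consumedAt(List<String> ids, String target, boolean nonBlocking,
                           long delayMs, int maxRedeliveries) {
        Deque<Attempt> fresh = new ArrayDeque<>();
        for (String id : ids) {
            fresh.add(new Attempt(id, 0, 0L));
        }
        PriorityQueue<Attempt> delayed =
                new PriorityQueue<>(Comparator.comparingLong((Attempt a) -> a.dueAt));
        long now = 0L;

        while (!fresh.isEmpty() || !delayed.isEmpty()) {
            Attempt a;
            if (!delayed.isEmpty() && delayed.peek().dueAt <= now) {
                a = delayed.poll();          // a scheduled redelivery is due
            } else if (!fresh.isEmpty()) {
                a = fresh.poll();            // otherwise take the next queued message
            } else {
                a = delayed.poll();          // nothing else to do: idle until it is due
                now = a.dueAt;
            }

            if (!a.id.startsWith("bad")) {
                if (a.id.equals(target)) {
                    return now;              // consumed successfully
                }
                continue;
            }
            if (a.redeliveries >= maxRedeliveries) {
                continue;                    // give up on this message
            }
            Attempt retry = new Attempt(a.id, a.redeliveries + 1, now + delayMs);
            if (nonBlocking) {
                delayed.add(retry);          // schedule the retry, keep consuming
            } else {
                now = retry.dueAt;           // blocking: the consumer waits out the delay
                fresh.addFirst(retry);       // and retries the same message first
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("bad-1", "good-2");
        System.out.println("blocking:     " + consumedAt(ids, "good-2", false, 900000L, 48));
        System.out.println("non-blocking: " + consumedAt(ids, "good-2", true, 900000L, 48));
    }
}
```

In this model the second message waits 48 × 900000 ms (12 hours) in blocking mode and is consumed immediately in non-blocking mode. That matches the symptom in the question, although the real broker and client behave in more detailed ways than this sketch captures.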