ActiveMQ consumer waiting for redelivered messages instead of processing other messages in the queue
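I'm using ActiveMQ 5.15.6 and I want to take 2 messages from a queue, process them, and then commit the transacted session.
However, if an Exception is thrown, the messages are marked as redelivered and returned to the queue, and the application waits until the redelivery delay expires before trying to process them again.
I need it to process the other messages in the queue until the delayed messages are ready to be delivered again.
The code is just a proof of concept :)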
public class ActiveMqService {

    private final ActiveMQConnectionFactory cf;
    private final Connection connection;
    private final Session session;
    private final Queue queue;
    private final MessageConsumer consumer;

    public ActiveMqService() throws JMSException {
        cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        RedeliveryPolicy redeliveryPolicy = cf.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(5000L);
        redeliveryPolicy.setRedeliveryDelay(5000L);
        redeliveryPolicy.setMaximumRedeliveries(100);

        ActiveMQPrefetchPolicy prefetchPolicy = cf.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(2);

        // Created but not used below: the connection comes straight from cf.
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
        connection = cf.createConnection(); //credentials
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        queue = session.createQueue("test");
        connection.start();
        consumer = session.createConsumer(queue);
    }

    @PostConstruct
    public void read() throws JMSException {
        while (true) {
            // Collect a batch of up to 2 messages.
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                Message message = consumer.receive(1000L);
                if (message != null) {
                    messages.add(message);
                } else {
                    break; // no more messages available for this batch
                }
            }
            if (!messages.isEmpty()) {
                try {
                    // Randomly either commit the batch or simulate a failure.
                    Random random = new Random();
                    if (1 == random.nextInt(2)) {
                        for (Message message : messages) {
                            TextMessage textMessage = (TextMessage) message;
                            log.info("Message {}", textMessage.getText());
                        }
                        session.commit();
                    } else {
                        throw new Exception("vyjimka");
                    }
                } catch (Exception e) {
                    List<String> mess = new ArrayList<>();
                    for (Message message : messages) {
                        TextMessage textMessage = (TextMessage) message;
                        mess.add(textMessage.getText());
                    }
                    log.info("rollbackuju message {}", mess);
                    session.rollback();
                }
            }
        }
    }
}
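In the log you can see that after a rollback the application waits 5 seconds (the configured delay) and then receives the same messages again, but does not process the other messages in the queue in the meantime.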
2021-12-09 17:05:47.605 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 7
2021-12-09 17:05:47.606 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 3
2021-12-09 17:05:47.612 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : rollbackuju message [8, 5]
2021-12-09 17:05:52.623 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : rollbackuju message [8, 5]
2021-12-09 17:05:57.634 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 8
2021-12-09 17:05:57.635 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 5
2021-12-09 17:05:57.642 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 9
2021-12-09 17:05:57.642 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 2
2021-12-09 17:05:57.648 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : rollbackuju message [6, 4]
2021-12-09 17:06:02.655 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 6
2021-12-09 17:06:02.656 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 4
You need to call setNonBlockingRedelivery(true) on your javax.jms.ConnectionFactory implementation (here, the ActiveMQConnectionFactory) in ActiveMqService(), e.g.:
public ActiveMqService() throws JMSException {
    cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    cf.setNonBlockingRedelivery(true);
    ...
}
The Javadoc comment on this method states:
/**
* When true a MessageConsumer will not stop Message delivery before re-delivering Messages
* from a rolled back transaction. This implies that message order will not be preserved and
* also will result in the TransactedIndividualAck option to be enabled.
*/
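Also, you may want to set your prefetch size higher than the current value of 2, since the 2 messages waiting for redelivery will prevent other messages from being fetched from the broker.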
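To make the interaction between the two settings easier to see, here is a toy simulation in plain Java. It is only a sketch of the behavior described above, not ActiveMQ's actual implementation: the class RedeliverySimulation, its run method, the Pending helper, and the virtual clock are all made up for illustration, and it assumes that rolled-back messages keep occupying prefetch slots until they are redelivered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical, not ActiveMQ code) of a transacted consumer that reads batches of 2.
class RedeliverySimulation {

    /** A rolled-back batch and the virtual time at which it may be redelivered. */
    static final class Pending {
        final List<Integer> batch;
        final long dueAt;
        Pending(List<Integer> batch, long dueAt) { this.batch = batch; this.dueAt = dueAt; }
    }

    /** Returns one log line per commit or rollback, stamped with virtual time in ms. */
    static List<String> run(int messageCount, Set<Integer> failOnce, int prefetch,
                            boolean nonBlocking, long delayMs) {
        if (prefetch < 1) throw new IllegalArgumentException("prefetch must be >= 1");
        Deque<Integer> broker = new ArrayDeque<>();
        for (int id = 1; id <= messageCount; id++) broker.add(id);

        Deque<Pending> pending = new ArrayDeque<>(); // rolled-back batches, oldest first
        Set<Integer> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        int held = 0;  // messages waiting for redelivery (assumed to occupy prefetch slots)
        long now = 0;  // virtual clock, so the simulation never actually sleeps

        while (!broker.isEmpty() || !pending.isEmpty()) {
            List<Integer> batch = new ArrayList<>();
            boolean redeliveryDue = !pending.isEmpty() && pending.peek().dueAt <= now;
            // Blocking mode: nothing new is delivered while a redelivery is outstanding.
            boolean mayFetch = (nonBlocking || pending.isEmpty())
                    && !broker.isEmpty() && held < prefetch;
            if (redeliveryDue) {
                batch = pending.poll().batch;
                held -= batch.size();
            } else if (mayFetch) {
                int size = Math.min(2, prefetch - held);
                while (batch.size() < size && !broker.isEmpty()) batch.add(broker.poll());
            } else {
                now = pending.peek().dueAt; // idle until the redelivery delay expires
                continue;
            }
            // A batch fails if it contains a message that has not yet failed once.
            boolean failed = false;
            for (int id : batch) {
                if (failOnce.contains(id) && alreadyFailed.add(id)) failed = true;
            }
            if (failed) {
                pending.add(new Pending(batch, now + delayMs));
                held += batch.size();
            }
            log.add("t=" + now + (failed ? " rollback " : " commit ") + batch);
        }
        return log;
    }

    public static void main(String[] args) {
        Set<Integer> failOnce = Collections.singleton(1); // message 1 fails on its first attempt
        System.out.println("blocking, prefetch 2:      " + run(6, failOnce, 2, false, 5000L));
        System.out.println("non-blocking, prefetch 2:  " + run(6, failOnce, 2, true, 5000L));
        System.out.println("non-blocking, prefetch 10: " + run(6, failOnce, 10, true, 5000L));
    }
}
```

In this model the first two runs produce the same log: with a prefetch of 2, both slots are taken by the rolled-back batch, so even the non-blocking consumer sits idle until t=5000. Only the third run commits messages 3 to 6 at t=0 and messages 1 and 2 at t=5000, which also shows the loss of ordering that the Javadoc warns about.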