ActiveMQ Artemis - 按顺序处理消息
ActiveMQ Artemis - handle messages sequentially
我已经尝试确保队列中的消费者(使用消息分组)一次只会从它正在处理的每个队列中接收一条消息,直到消费者确认该消息。
为了进行测试,我设置了 ActiveMQ Artemis 并在通配符 EXAMPLE.* 上设置了 3 个消费者,一个发布者向 5 个队列中的每一个发送 10 条消息:EXAMPLE.1 - EXAMPLE.5。我看到的是每个消费者都立即从队列中收到消息。我已经尝试使用消费者 window 大小设置(如 0),因为我认为这会帮助我一次只从每个队列传递一条消息,但这似乎不起作用。
我是不是误解了那个设置?如果是这样,我是否应该查看任何其他设置来帮助我完成这项工作?
我试图实现的特定用例是我可能有很多队列和几个消费者。重要的是每个队列中的消息按顺序处理,但所有队列都可以并行处理。
谢谢!
ActiveMQ Artemis 支持的消息分组允许同一消费者串行处理同一组的所有消息。如果一个队列的所有消息都具有相同的group id,它们将被同一个消费者串行处理。
但是分组消息会影响并发处理。例如,如果队列头部有与客户端关联的组的 100 条消息块,后面跟着与另一个客户端关联的组的其他消息,那么所有前 100 条消息都需要发送到适当的客户端(它正在连续使用那些分组的消息)在其他消息可以被使用之前。
消费者window大小只影响来自服务器的消费者缓冲区消息。如果消费者 window 大小设置为 0,则消费者不会缓冲任何消息,因此消息可以传递给另一个消费者。
在您的情况下,生产者可以使用队列名称来设置组 ID,这样 EXAMPLE.* 的消费者将按顺序处理每个队列的消息,但我不会将消费者 window 大小设置为 0因为它可能会限制消费者的并行性。
以下消费者 window 大小等于 0 的主题层次结构中消息分组的演示显示了从每个队列顺序消费的消息以及消费者之间有限的并行性。
public static void main(final String[] args) throws Exception {
Connection connection = null;
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setConsumerWindowSize(1);
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topicSubscribe = ActiveMQJMSClient.createTopic("EXAMPLE.*");
MessageConsumer[] messageConsumers = new MessageConsumer[] {
session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
session.createSharedConsumer(topicSubscribe, "EXAMPLE")
};
MessageProducer producer = session.createProducer(null);
for (int i = 0; i < 10; i++) {
for (int t = 0; t < 5; t++) {
TextMessage groupMessage = session.createTextMessage("Group-" + t + " message " + i);
groupMessage.setStringProperty("JMSXGroupID", "Group-" + t);
producer.send(ActiveMQJMSClient.createTopic("EXAMPLE." + t), groupMessage);
}
}
connection.start();
TextMessage messageReceived;
for (int i = 0; i < 100; i++) {
for (int c = 0; c < 3; c++) {
while ((messageReceived = (TextMessage) messageConsumers[c].receive(500)) != null) {
System.out.println("Consumer" + c + " received message: " + messageReceived.getText());
}
System.out.println("Consumer" + c + " received message: null");
}
}
} finally {
// Step 12. Be sure to close our resources!
if (connection != null) {
connection.close();
}
}
}
演示的输出是:
Consumer0 received message: Group-0 message 0
Consumer0 received message: Group-3 message 0
Consumer0 received message: Group-4 message 0
Consumer0 received message: Group-0 message 1
Consumer0 received message: null
Consumer1 received message: Group-1 message 0
Consumer1 received message: Group-1 message 1
Consumer1 received message: null
Consumer2 received message: Group-2 message 0
Consumer2 received message: Group-2 message 1
Consumer2 received message: null
Consumer0 received message: Group-3 message 1
Consumer0 received message: Group-4 message 1
Consumer0 received message: Group-0 message 2
Consumer0 received message: Group-3 message 2
Consumer0 received message: Group-4 message 2
Consumer0 received message: Group-0 message 3
Consumer0 received message: null
Consumer1 received message: Group-1 message 2
Consumer1 received message: Group-1 message 3
Consumer1 received message: null
Consumer2 received message: Group-2 message 2
Consumer2 received message: Group-2 message 3
Consumer2 received message: null
...
我已经尝试确保队列中的消费者(使用消息分组)一次只会从它正在处理的每个队列中接收一条消息,直到消费者确认该消息。
为了进行测试,我设置了 ActiveMQ Artemis 并在通配符 EXAMPLE.* 上设置了 3 个消费者,一个发布者向 5 个队列中的每一个发送 10 条消息:EXAMPLE.1 - EXAMPLE.5。我看到的是每个消费者都立即从队列中收到消息。我已经尝试使用消费者 window 大小设置(如 0),因为我认为这会帮助我一次只从每个队列传递一条消息,但这似乎不起作用。
我是不是误解了那个设置?如果是这样,我是否应该查看任何其他设置来帮助我完成这项工作?
我试图实现的特定用例是我可能有很多队列和几个消费者。重要的是每个队列中的消息按顺序处理,但所有队列都可以并行处理。
谢谢!
ActiveMQ Artemis 支持的消息分组允许同一消费者串行处理同一组的所有消息。如果一个队列的所有消息都具有相同的group id,它们将被同一个消费者串行处理。
但是分组消息会影响并发处理。例如,如果队列头部有与客户端关联的组的 100 条消息块,后面跟着与另一个客户端关联的组的其他消息,那么所有前 100 条消息都需要发送到适当的客户端(它正在连续使用那些分组的消息)在其他消息可以被使用之前。
消费者window大小只影响来自服务器的消费者缓冲区消息。如果消费者 window 大小设置为 0,则消费者不会缓冲任何消息,因此消息可以传递给另一个消费者。
在您的情况下,生产者可以使用队列名称来设置组 ID,这样 EXAMPLE.* 的消费者将按顺序处理每个队列的消息,但我不会将消费者 window 大小设置为 0因为它可能会限制消费者的并行性。
以下消费者 window 大小等于 0 的主题层次结构中消息分组的演示显示了从每个队列顺序消费的消息以及消费者之间有限的并行性。
public static void main(final String[] args) throws Exception {
Connection connection = null;
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setConsumerWindowSize(1);
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topicSubscribe = ActiveMQJMSClient.createTopic("EXAMPLE.*");
MessageConsumer[] messageConsumers = new MessageConsumer[] {
session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
session.createSharedConsumer(topicSubscribe, "EXAMPLE")
};
MessageProducer producer = session.createProducer(null);
for (int i = 0; i < 10; i++) {
for (int t = 0; t < 5; t++) {
TextMessage groupMessage = session.createTextMessage("Group-" + t + " message " + i);
groupMessage.setStringProperty("JMSXGroupID", "Group-" + t);
producer.send(ActiveMQJMSClient.createTopic("EXAMPLE." + t), groupMessage);
}
}
connection.start();
TextMessage messageReceived;
for (int i = 0; i < 100; i++) {
for (int c = 0; c < 3; c++) {
while ((messageReceived = (TextMessage) messageConsumers[c].receive(500)) != null) {
System.out.println("Consumer" + c + " received message: " + messageReceived.getText());
}
System.out.println("Consumer" + c + " received message: null");
}
}
} finally {
// Step 12. Be sure to close our resources!
if (connection != null) {
connection.close();
}
}
}
演示的输出是:
Consumer0 received message: Group-0 message 0
Consumer0 received message: Group-3 message 0
Consumer0 received message: Group-4 message 0
Consumer0 received message: Group-0 message 1
Consumer0 received message: null
Consumer1 received message: Group-1 message 0
Consumer1 received message: Group-1 message 1
Consumer1 received message: null
Consumer2 received message: Group-2 message 0
Consumer2 received message: Group-2 message 1
Consumer2 received message: null
Consumer0 received message: Group-3 message 1
Consumer0 received message: Group-4 message 1
Consumer0 received message: Group-0 message 2
Consumer0 received message: Group-3 message 2
Consumer0 received message: Group-4 message 2
Consumer0 received message: Group-0 message 3
Consumer0 received message: null
Consumer1 received message: Group-1 message 2
Consumer1 received message: Group-1 message 3
Consumer1 received message: null
Consumer2 received message: Group-2 message 2
Consumer2 received message: Group-2 message 3
Consumer2 received message: null
...