Spring 和 Rabbit mq 消息顺序
Spring and Rabbit mq message order
我正在开发一个接收按 session id 分组的剩余消息的应用程序(session 1 可以由 2 条消息组成,session 2 可以由 10 条消息组成)并将它们发送到数据库。给定 session 的消息在内部具有相同的 session id。
对于给定的 session,第一条消息应该首先发送到数据库,然后是第二条,依此类推。session 中的顺序非常重要。
session的顺序并不重要,我们可以混合他们的信息,例如我们可以按此顺序将消息发送到数据库:
- session 一条消息 1
- session B 消息 1
- session 一条消息 2
- session C 消息 1
- session B 消息 2
- session 消息 3
- session C 消息 2
我创建了 10 个 rabbitmq 队列。应用程序根据 session id 选择队列:来自给定 session 的所有消息都在同一队列中。
每个队列有1个消费者,所以同一个队列中的顺序是有保证的。
出于性能原因(以及流量增长),我们必须将队列数设置得更高(节点创建 100 个队列)或部署应用程序的其他实例(10 个节点,每个队列上有 1 个消费者 - 所以每个队列 10 个消费者)。
设置更高的队列数并不难,但我做的方式有点难看并且有代码重复(见下文)。我需要改进的建议(当天我们需要 1000 个队列)。
如果我们部署 10 个节点而不是 1 个,每个队列将有 10 个消费者,队列中消息的顺序将无法保证(因此来自 session A 的消息 2 可以发送到数据库在来自 session A 的消息 1 之前)。
首选解决方案是 10 个节点,因为我们可以使其动态化,并且我们可以在需要时在 docker 中 start/stop 个节点。
这是我使用的依赖项:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.6.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.3.RELEASE</version>
</dependency>
兔子配置如下:
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPrefetchCount(50);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
String addresses = "address1,address2";
com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
rabbitConnection.setAutomaticRecoveryEnabled(true);
rabbitConnection.setUsername("username");
rabbitConnection.setPassword("password");
rabbitConnection.setVirtualHost("virtualHost");
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnection);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setAddresses(addresses);
connectionFactory.setChannelCacheSize(100);
return connectionFactory;
}
目前,我使用 10 个 classes 创建了 10 个队列。这是一个队列示例:
@Component
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "queue2", durable = "true"), exchange = @Exchange(type = "topic", value = "exchange2", durable = "true"), key = "key2"))
public class QueueGroup2Listener {
@RabbitHandler
public void processOrder(RequestMessage received) throws DataAccessResourceFailureException {
process(received);
}
}
我没有找到比在注释中使用不同值(从 1 到 10)创建 10 倍这个 class 更好的方法。
问题是:
如何在队列中添加消费者并保证给定 session 中消息的顺序?我的意思是队列中有 10 个消费者。消费者 A 使用来自 session A 的消息 1,因此其他消费者不应使用来自 session A.
的其他消息
奖金问题是:
我怎样才能使队列创建优于每个队列 1 class?
非常感谢
更新
这个问题的答案对我有很大帮助
RabbitMQ : Create Dynamic queues in Direct Exchange:我可以为每个 session 创建一个队列(在这种情况下,下一个问题是 rabbitmq 可以同时管理多少个队列?)
加里回答后更新
感谢您的回复,我尝试了以下方法,但启动消费者的应用程序非常长:
@Bean
public QueueMessageListener listener() {
return new QueueMessageListener();
}
@Bean(name="exchange")
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
MessageListenerAdapter adapter = new MessageListenerAdapter(listener(), "processOrder");
container.setMessageListener(adapter);
admin().declareExchange(exchange);
createQueues(exchange, QUEUE, numberOfQueues, BINDING_KEY, container, null, true);
container.start(); // very very very long
return exchange;
}
private void createQueues(Exchange exchange, String queuePrefix, int numberOfQueues, String bindingPrefix,
SimpleMessageListenerContainer container, Map<String, Object> args) {
int length = 1;
if(numberOfQueues > 1) {
length = (int)(Math.log10(numberOfQueues - 1) + 1);
}
for (int i = 0; i < numberOfQueues; i++) {
Queue queue = new Queue(queuePrefix + String.format("%0" + length + "d", i), true, false, false, args);
container.addQueues(queue);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingPrefix + i).noargs();
admin().declareBinding(binding);
}
}
如果我不调用启动函数,则不会创建消费者。
您可以通过编程方式启动 SimpleMessageListenerContainer
s,而不是使用声明性范例。
您还可以使用 RabbitAdmin
以编程方式声明队列、绑定等。
由于 Spring AMQP 缓存通道,因此无法保证同一通道上会发生两次发送(这会导致订单丢失的可能性非常小);为确保顺序,您需要在即将发布的 2.0 版本中使用新的 RabbitTemplate.invoke()
方法。它将在同一通道上的调用范围内执行发送,因此可以保证顺序。
如果您的发送代码是单线程的,这不是问题,因为在这种情况下将始终使用相同的通道。
我正在开发一个接收按 session id 分组的剩余消息的应用程序(session 1 可以由 2 条消息组成,session 2 可以由 10 条消息组成)并将它们发送到数据库。给定 session 的消息在内部具有相同的 session id。
对于给定的 session,第一条消息应该首先发送到数据库,然后是第二条,依此类推。session 中的顺序非常重要。
session的顺序并不重要,我们可以混合他们的信息,例如我们可以按此顺序将消息发送到数据库:
- session 一条消息 1
- session B 消息 1
- session 一条消息 2
- session C 消息 1
- session B 消息 2
- session 消息 3
- session C 消息 2
我创建了 10 个 rabbitmq 队列。应用程序根据 session id 选择队列:来自给定 session 的所有消息都在同一队列中。
每个队列有1个消费者,所以同一个队列中的顺序是有保证的。
出于性能原因(以及流量增长),我们必须将队列数设置得更高(节点创建 100 个队列)或部署应用程序的其他实例(10 个节点,每个队列上有 1 个消费者 - 所以每个队列 10 个消费者)。
设置更高的队列数并不难,但我做的方式有点难看并且有代码重复(见下文)。我需要改进的建议(当天我们需要 1000 个队列)。
如果我们部署 10 个节点而不是 1 个,每个队列将有 10 个消费者,队列中消息的顺序将无法保证(因此来自 session A 的消息 2 可以发送到数据库在来自 session A 的消息 1 之前)。
首选解决方案是 10 个节点,因为我们可以使其动态化,并且我们可以在需要时在 docker 中 start/stop 个节点。
这是我使用的依赖项:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.6.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.3.RELEASE</version>
</dependency>
兔子配置如下:
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPrefetchCount(50);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
String addresses = "address1,address2";
com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
rabbitConnection.setAutomaticRecoveryEnabled(true);
rabbitConnection.setUsername("username");
rabbitConnection.setPassword("password");
rabbitConnection.setVirtualHost("virtualHost");
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnection);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setAddresses(addresses);
connectionFactory.setChannelCacheSize(100);
return connectionFactory;
}
目前,我使用 10 个 classes 创建了 10 个队列。这是一个队列示例:
@Component
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "queue2", durable = "true"), exchange = @Exchange(type = "topic", value = "exchange2", durable = "true"), key = "key2"))
public class QueueGroup2Listener {
@RabbitHandler
public void processOrder(RequestMessage received) throws DataAccessResourceFailureException {
process(received);
}
}
我没有找到比在注释中使用不同值(从 1 到 10)创建 10 倍这个 class 更好的方法。
问题是: 如何在队列中添加消费者并保证给定 session 中消息的顺序?我的意思是队列中有 10 个消费者。消费者 A 使用来自 session A 的消息 1,因此其他消费者不应使用来自 session A.
的其他消息奖金问题是: 我怎样才能使队列创建优于每个队列 1 class?
非常感谢
更新
这个问题的答案对我有很大帮助 RabbitMQ : Create Dynamic queues in Direct Exchange:我可以为每个 session 创建一个队列(在这种情况下,下一个问题是 rabbitmq 可以同时管理多少个队列?)
加里回答后更新
感谢您的回复,我尝试了以下方法,但启动消费者的应用程序非常长:
@Bean
public QueueMessageListener listener() {
return new QueueMessageListener();
}
@Bean(name="exchange")
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
MessageListenerAdapter adapter = new MessageListenerAdapter(listener(), "processOrder");
container.setMessageListener(adapter);
admin().declareExchange(exchange);
createQueues(exchange, QUEUE, numberOfQueues, BINDING_KEY, container, null, true);
container.start(); // very very very long
return exchange;
}
private void createQueues(Exchange exchange, String queuePrefix, int numberOfQueues, String bindingPrefix,
SimpleMessageListenerContainer container, Map<String, Object> args) {
int length = 1;
if(numberOfQueues > 1) {
length = (int)(Math.log10(numberOfQueues - 1) + 1);
}
for (int i = 0; i < numberOfQueues; i++) {
Queue queue = new Queue(queuePrefix + String.format("%0" + length + "d", i), true, false, false, args);
container.addQueues(queue);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingPrefix + i).noargs();
admin().declareBinding(binding);
}
}
如果我不调用启动函数,则不会创建消费者。
您可以通过编程方式启动 SimpleMessageListenerContainer
s,而不是使用声明性范例。
您还可以使用 RabbitAdmin
以编程方式声明队列、绑定等。
由于 Spring AMQP 缓存通道,因此无法保证同一通道上会发生两次发送(这会导致订单丢失的可能性非常小);为确保顺序,您需要在即将发布的 2.0 版本中使用新的 RabbitTemplate.invoke()
方法。它将在同一通道上的调用范围内执行发送,因此可以保证顺序。
如果您的发送代码是单线程的,这不是问题,因为在这种情况下将始终使用相同的通道。