在多线程环境中没有固定的消息传递到 Rabbitmq 服务器的顺序
no fix order of messages being delivered to Rabbitmq Server in multithreading Environment
请先看一下我的代码。
这是我的测试class,它创建了 2000 个线程,这些线程正在发送消息。
public class MessageSenderMultipleThreadMock {
@Autowired
MessageList message;
@Autowired
MessageSender sender;
public boolean process() throws InterruptedException {
for (int i = 0; i < 2000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
String routingkey = "operation"
+ UUID.randomUUID().toString();
String queueName = UUID.randomUUID().toString();
message.setSender(Thread.currentThread().getName());
try {
sender.sendMessage(routingkey, queueName,
"this is message");
} catch (InvalidMessagingParameters e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
}
Thread.currentThread();
Thread.sleep(10000);
return true;
}
}
消息发件人
这是我的主要发件人class
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageList message;
String queueName = "";
String routingKey = "";
@Autowired
private QueueCreationService service;
private boolean messageSentFlag;
String returnedMessage = "";
private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());
public boolean sendMessage(String routingKey, String queueName,
String messageToBeSent) throws InvalidMessagingParameters {
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
else {
this.routingKey = routingKey;
this.queueName = queueName;
}
service.processBinding(queueName, routingKey);
message.addMessages(messageToBeSent);
return execute();
}
/*
* overloaded sendMessage method will use requestMap . RequestMap includes
* queueName and routingKey that controller provides.
*/
public boolean sendMessage(Map<String, String> requestMap)
throws MessagingConnectionFailsException,
InvalidMessagingParameters {
this.queueName = requestMap.get("queue");
this.routingKey = requestMap.get("routingkey");
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
service.processBinding(queueName, routingKey);
preparingMessagingTemplate();
return execute();
}
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
private String convertMessageToJson(MessageList message) {
ObjectWriter ow = new ObjectMapper().writer()
.withDefaultPrettyPrinter();
String json = "";
try {
json = ow.writeValueAsString(message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return json;
}
private void executeMessageSending() {
rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
convertMessageToJson(message), new CorrelationData(UUID
.randomUUID().toString()));
}
private void preparingMessagingTemplate() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
returnedMessage = replyText;
}
});
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
System.out.println("*" + ack);
if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
messageSentFlag = ack;
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has been successfully delivered");
} else {
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has not been delivered");
}
}
});
}
}
我的配置class 被消息使用
@Configuration
@ComponentScan("com.alpharaid.orange.*")
@PropertySource("classpath:application.properties")
public class MessageConfiguration {
String content = "";
@Value("${rabbitmq_host}")
String host = "";
String port = "";
@Value("${rabbitmq_username}")
String userName = "";
@Value("${rabbitmq_password}")
String password = "";
String queueName = "";
InputStream input = null;
@Autowired
public MessageConfiguration() {
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
@Scope("prototype")
public QueueCreationService service() {
return new QueueCreationService();
}
@Bean
@Scope("prototype")
public RabbitAdmin admin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
this.host);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
}
我的问题:
正如我在服务器上看到的,一些线程正在成功传递消息,而另一些则没有。
完全没有确定rabbitTemplate监听器(
rabbitTemplate.setReturnCallback(新的 ReturnCallback() {
我每次都需要听众工作,因为在此基础上我会再次尝试发送消息
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
我可以看到有时消息被传递了 5 次,因为 messageSentFlag 是假的,只有在确认监听器中才变为真。
- 请告诉我如何删除队列?因为我有 8000 个队列,我在 rabbitAdmin 中看到了一种删除队列的方法,但它需要队列的名称,而我的队列只是任意随机队列(UUID)
请提供您的想法,我该如何改进它或者有什么解决方法吗?
对于我的应用程序,多线程环境是必须的。
提前致谢。
RabbitMQ 只保证消息在特定队列中时的顺序。
无法保证向 RabbitMQ 发送消息的消息顺序,除非您提供这些保证。在许多情况下,这是一件困难的事情,如果不是不可能的话——尤其是在像您这样的多线程环境中。
如果您需要保证消息按特定顺序处理,您需要查看构建或使用 resequencer
一般的想法是,您需要在源头对消息进行编号 - 1、2、3、4、5 等。当您的消费者从队列中拉取消息时,您将查看消息编号并看看这是否是您现在需要的。如果不是,您将保留该消息并稍后处理它。获得当前正在查找的消息 # 后,您将按顺序处理当前持有的所有消息。
spring 应该有像重新排序器这样的东西可用,但我对该生态系统不够熟悉,无法为您指明正确的方向。
请先看一下我的代码。
这是我的测试class,它创建了 2000 个线程,这些线程正在发送消息。
public class MessageSenderMultipleThreadMock {
@Autowired
MessageList message;
@Autowired
MessageSender sender;
public boolean process() throws InterruptedException {
for (int i = 0; i < 2000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
String routingkey = "operation"
+ UUID.randomUUID().toString();
String queueName = UUID.randomUUID().toString();
message.setSender(Thread.currentThread().getName());
try {
sender.sendMessage(routingkey, queueName,
"this is message");
} catch (InvalidMessagingParameters e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
}
Thread.currentThread();
Thread.sleep(10000);
return true;
}
}
消息发件人
这是我的主要发件人class
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageList message;
String queueName = "";
String routingKey = "";
@Autowired
private QueueCreationService service;
private boolean messageSentFlag;
String returnedMessage = "";
private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());
public boolean sendMessage(String routingKey, String queueName,
String messageToBeSent) throws InvalidMessagingParameters {
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
else {
this.routingKey = routingKey;
this.queueName = queueName;
}
service.processBinding(queueName, routingKey);
message.addMessages(messageToBeSent);
return execute();
}
/*
* overloaded sendMessage method will use requestMap . RequestMap includes
* queueName and routingKey that controller provides.
*/
public boolean sendMessage(Map<String, String> requestMap)
throws MessagingConnectionFailsException,
InvalidMessagingParameters {
this.queueName = requestMap.get("queue");
this.routingKey = requestMap.get("routingkey");
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
service.processBinding(queueName, routingKey);
preparingMessagingTemplate();
return execute();
}
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
private String convertMessageToJson(MessageList message) {
ObjectWriter ow = new ObjectMapper().writer()
.withDefaultPrettyPrinter();
String json = "";
try {
json = ow.writeValueAsString(message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return json;
}
private void executeMessageSending() {
rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
convertMessageToJson(message), new CorrelationData(UUID
.randomUUID().toString()));
}
private void preparingMessagingTemplate() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
returnedMessage = replyText;
}
});
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
System.out.println("*" + ack);
if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
messageSentFlag = ack;
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has been successfully delivered");
} else {
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has not been delivered");
}
}
});
}
}
我的配置class 被消息使用
@Configuration
@ComponentScan("com.alpharaid.orange.*")
@PropertySource("classpath:application.properties")
public class MessageConfiguration {
String content = "";
@Value("${rabbitmq_host}")
String host = "";
String port = "";
@Value("${rabbitmq_username}")
String userName = "";
@Value("${rabbitmq_password}")
String password = "";
String queueName = "";
InputStream input = null;
@Autowired
public MessageConfiguration() {
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
@Scope("prototype")
public QueueCreationService service() {
return new QueueCreationService();
}
@Bean
@Scope("prototype")
public RabbitAdmin admin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
this.host);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
}
我的问题:
正如我在服务器上看到的,一些线程正在成功传递消息,而另一些则没有。
完全没有确定rabbitTemplate监听器(
rabbitTemplate.setReturnCallback(新的 ReturnCallback() {
我每次都需要听众工作,因为在此基础上我会再次尝试发送消息
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
我可以看到有时消息被传递了 5 次,因为 messageSentFlag 是假的,只有在确认监听器中才变为真。
- 请告诉我如何删除队列?因为我有 8000 个队列,我在 rabbitAdmin 中看到了一种删除队列的方法,但它需要队列的名称,而我的队列只是任意随机队列(UUID)
请提供您的想法,我该如何改进它或者有什么解决方法吗? 对于我的应用程序,多线程环境是必须的。
提前致谢。
RabbitMQ 只保证消息在特定队列中时的顺序。
无法保证向 RabbitMQ 发送消息的消息顺序,除非您提供这些保证。在许多情况下,这是一件困难的事情,如果不是不可能的话——尤其是在像您这样的多线程环境中。
如果您需要保证消息按特定顺序处理,您需要查看构建或使用 resequencer
一般的想法是,您需要在源头对消息进行编号 - 1、2、3、4、5 等。当您的消费者从队列中拉取消息时,您将查看消息编号并看看这是否是您现在需要的。如果不是,您将保留该消息并稍后处理它。获得当前正在查找的消息 # 后,您将按顺序处理当前持有的所有消息。
spring 应该有像重新排序器这样的东西可用,但我对该生态系统不够熟悉,无法为您指明正确的方向。