Spring AMQP 优先消息
Spring AMQP Priority Message
RabbitMQ 队列中的消息优先级。它与提供的 java 客户端的 rabbitmq 一起工作。但它不适用于 spring-rabbit 依赖项。请看一看。
- RabbitMQ 服务器版本 - 3.6.5
- Erlang - OTP 19 (8.0)
使用 RabbitMQ Java 客户端
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples</groupId>
<artifactId>RabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<developers>
<developer>
<name>Sagar Rout</name>
</developer>
</developers>
<properties>
<!-- Generic properties -->
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Spring -->
<spring-framework.version>4.3.2.RELEASE</spring-framework.version>
</properties>
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<!-- Spring AMQP -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Publisher.java
public class Publisher {
private final static String QUEUE_NAME = "S1_Priority";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
String message = "Hello World!";
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME,
new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(),
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'" + "priority" + i);
}
channel.close();
connection.close();
}}
Consumer.Java
public class Consumer {
private final static String QUEUE_NAME = "S1_Priority";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'" + properties.getPriority());
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}}
这是有效的,具有更高优先级的消息即将出现。但它不适用于 Spring-rabbit。请找到代码。
RabbitMQConfig.class
@Configuration
@ComponentScan(basePackages = { "com.blackocean.*" })
@PropertySource("classpath:config.properties")
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private Integer port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.connection.size}")
private Integer connectionSize ;
@Bean
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
return new PropertySourcesPlaceholderConfigurer();
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setConnectionLimit(connectionSize);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
Queue queue = new Queue("myQueue", true, false, false, args) ;
return queue ;
}}
SendUsingJava配置
public class Send1UsingJavaConfig {
/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(9);
return message;
}
});
}
}
接收使用Java配置
public class RecvUsingJavaConfig {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// Basic Example
String message = (String) rabbitTemplate.receiveAndConvert("myQueue");
System.out.println(message);
}}
Config.properties
#RabbitMQ
rabbitmq.host=localhost
#Always provide port and connection size in numbers
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.connection.size=100
现在我以不同的优先级发送消息,但它总是按顺序接收消息。任何建议都会很棒!!!
这里只是一个猜测,我尝试查看我使用过的旧 AMQP 库(旧版本 Rabbit MQ 中的优先级队列)。
优先级设置如下
args.put("x-max-priority", 10);
,看起来和args.put("x-priority", 10);
略有不同。
您可以参考 link 中的旧 priority queue repo。你可以试试看是否有帮助
如果有人对消息的优先级有类似的要求,那么您需要在创建队列之前定义优先级(配置Class)。如果您计划将配置应用于现有队列,它将不起作用(根据我的测试)。
@Value("${myApp.rabbitmq.queue}")
private String queueName;
@Bean
Queue queue(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
Queue queue = new Queue(queueName, true, false, false, args) ;
return queue ;
}
当您将消息推送到队列中时,请确保优先级不超过 10,因为我们已将队列的最大优先级定义为 10。
BR,桑托斯
- 队列必须有 arg 'x-max-priority'。
- 发布时messageProperties.priority必须是none0.
使用spring-bootamqp时,重要的是设置
spring.rabbitmq.listener.simple.prefetch=1
否则 spring-boot 将完全忽略优先级获取 250 条消息。
RabbitMQ 队列中的消息优先级。它与提供的 java 客户端的 rabbitmq 一起工作。但它不适用于 spring-rabbit 依赖项。请看一看。
- RabbitMQ 服务器版本 - 3.6.5
- Erlang - OTP 19 (8.0)
使用 RabbitMQ Java 客户端
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples</groupId>
<artifactId>RabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<developers>
<developer>
<name>Sagar Rout</name>
</developer>
</developers>
<properties>
<!-- Generic properties -->
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Spring -->
<spring-framework.version>4.3.2.RELEASE</spring-framework.version>
</properties>
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<!-- Spring AMQP -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Publisher.java
public class Publisher {
private final static String QUEUE_NAME = "S1_Priority";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
String message = "Hello World!";
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME,
new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(),
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'" + "priority" + i);
}
channel.close();
connection.close();
}}
Consumer.Java
public class Consumer {
private final static String QUEUE_NAME = "S1_Priority";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'" + properties.getPriority());
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}}
这是有效的,具有更高优先级的消息即将出现。但它不适用于 Spring-rabbit。请找到代码。
RabbitMQConfig.class
@Configuration
@ComponentScan(basePackages = { "com.blackocean.*" })
@PropertySource("classpath:config.properties")
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private Integer port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.connection.size}")
private Integer connectionSize ;
@Bean
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
return new PropertySourcesPlaceholderConfigurer();
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setConnectionLimit(connectionSize);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
Queue queue = new Queue("myQueue", true, false, false, args) ;
return queue ;
}}
SendUsingJava配置
public class Send1UsingJavaConfig {
/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(9);
return message;
}
});
}
}
接收使用Java配置
public class RecvUsingJavaConfig {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// Basic Example
String message = (String) rabbitTemplate.receiveAndConvert("myQueue");
System.out.println(message);
}}
Config.properties
#RabbitMQ
rabbitmq.host=localhost
#Always provide port and connection size in numbers
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.connection.size=100
现在我以不同的优先级发送消息,但它总是按顺序接收消息。任何建议都会很棒!!!
这里只是一个猜测,我尝试查看我使用过的旧 AMQP 库(旧版本 Rabbit MQ 中的优先级队列)。
优先级设置如下
args.put("x-max-priority", 10);
,看起来和args.put("x-priority", 10);
略有不同。
您可以参考 link 中的旧 priority queue repo。你可以试试看是否有帮助
如果有人对消息的优先级有类似的要求,那么您需要在创建队列之前定义优先级(配置Class)。如果您计划将配置应用于现有队列,它将不起作用(根据我的测试)。
@Value("${myApp.rabbitmq.queue}")
private String queueName;
@Bean
Queue queue(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
Queue queue = new Queue(queueName, true, false, false, args) ;
return queue ;
}
当您将消息推送到队列中时,请确保优先级不超过 10,因为我们已将队列的最大优先级定义为 10。
BR,桑托斯
- 队列必须有 arg 'x-max-priority'。
- 发布时messageProperties.priority必须是none0.
使用spring-bootamqp时,重要的是设置
spring.rabbitmq.listener.simple.prefetch=1
否则 spring-boot 将完全忽略优先级获取 250 条消息。