在 RabbitMQ 中分组接收消息,最好使用 Spring AMQP?
Group received messages in RabbitMQ, preferably using Spring AMQP?
我从服务 (S) 接收消息,该服务将每个个体 属性 更改作为单独的消息发布到实体。一个人为的例子是这样的实体:
Person {
id: 123
name: "Something",
address: {...}
}
如果姓名和地址在同一笔交易中更新,则 (S) 将发布两条消息,PersonNameCorrected
和 PersonMoved
。问题出在接收端,我正在存储此 Person
实体的投影,每个 属性 更改都会导致写入数据库。所以在这个例子中,会有两次写入数据库,但如果我可以在短时间内批处理消息并按 id 对它们进行分组,那么我只需要对数据库进行一次写入。
在 RabbitMQ 中通常如何处理这个问题? Spring AMQP 是否提供更简单的抽象?
请注意,我已经简要地查看了 prefetch,但我不确定这是否可行。如果我理解正确的话,预取也是每个连接的基础。我试图在 per-queue 的基础上实现这一点,因为如果批处理(并因此增加延迟)是要走的路,我不想将这种延迟添加到所有队列由我的服务消耗(但仅限于那些需要 "group-by-id" 功能的服务)。
预取对于这种情况没有帮助。
考虑使用 Spring Integration,它的适配器位于 Spring AMQP 之上;它还提供了一个聚合器,可用于在将消息发送到管道的下一阶段之前将消息分组在一起。
编辑
这是一个用于演示的快速启动应用程序...
@SpringBootApplication
public class So42969130Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So42969130Application.class, args)
.close();
}
@Autowired
private RabbitTemplate template;
@Autowired
private Handler handler;
@Override
public void run(String... args) throws Exception {
this.template.convertAndSend("so9130", new PersonNameChanged(123));
this.template.convertAndSend("so9130", new PersonMoved(123));
this.handler.latch.await(10, TimeUnit.SECONDS);
}
@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
.messageConverter(converter()))
.aggregate(a -> a
.correlationExpression("payload.id")
.releaseExpression("false") // open-ended release, timeout only
.sendPartialResultOnExpiry(true)
.groupTimeout(2000))
.handle(handler())
.get();
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Handler handler() {
return new Handler();
}
@Bean
public Queue queue() {
return new Queue("so9130", false, false, true);
}
public static class Handler {
private final CountDownLatch latch = new CountDownLatch(1);
@ServiceActivator
public void handle(Collection<?> aggregatedData) {
System.out.println(aggregatedData);
this.latch.countDown();
}
}
public static class PersonNameChanged {
private int id;
PersonNameChanged() {
}
PersonNameChanged(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "PersonNameChanged [id=" + this.id + "]";
}
}
public static class PersonMoved {
private int id;
PersonMoved() {
}
PersonMoved(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "PersonMoved [id=" + this.id + "]";
}
}
}
Pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>so42969130</artifactId>
<version>2.0.0-BUILD-SNAPSHOT</version>
<packaging>jar</packaging>
<name>so42969130</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
结果:
2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler :
Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]
使用 Spring-Integration 将消息系统的缺点转移到 software/service-side 显然是错误的 Spring Integration 或任何框架都不是这种情况。它也不能很好地扩展并且不能容错
这个问题的核心是将路由消息与业务logic/sending消息分开
AFAIK 只有 Kafka 和 Apache Artemis 支持来自 JMS 的 JMSXGroup API 查看当前成熟的队列提供程序 RabbitMQ 没有它但是 AMQP 已经被指定但是再次 RabbitMq 还没有实现它尽管来自社区的请求.
在企业架构中确保从许多独立来源进行有序顺序处理的单一但也是非常常见且重要的案例将 RabbitMQ 排除在作为默认消息传递解决方案的进一步考虑之外
我从服务 (S) 接收消息,该服务将每个个体 属性 更改作为单独的消息发布到实体。一个人为的例子是这样的实体:
Person {
id: 123
name: "Something",
address: {...}
}
如果姓名和地址在同一笔交易中更新,则 (S) 将发布两条消息,PersonNameCorrected
和 PersonMoved
。问题出在接收端,我正在存储此 Person
实体的投影,每个 属性 更改都会导致写入数据库。所以在这个例子中,会有两次写入数据库,但如果我可以在短时间内批处理消息并按 id 对它们进行分组,那么我只需要对数据库进行一次写入。
在 RabbitMQ 中通常如何处理这个问题? Spring AMQP 是否提供更简单的抽象?
请注意,我已经简要地查看了 prefetch,但我不确定这是否可行。如果我理解正确的话,预取也是每个连接的基础。我试图在 per-queue 的基础上实现这一点,因为如果批处理(并因此增加延迟)是要走的路,我不想将这种延迟添加到所有队列由我的服务消耗(但仅限于那些需要 "group-by-id" 功能的服务)。
预取对于这种情况没有帮助。
考虑使用 Spring Integration,它的适配器位于 Spring AMQP 之上;它还提供了一个聚合器,可用于在将消息发送到管道的下一阶段之前将消息分组在一起。
编辑
这是一个用于演示的快速启动应用程序...
@SpringBootApplication
public class So42969130Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So42969130Application.class, args)
.close();
}
@Autowired
private RabbitTemplate template;
@Autowired
private Handler handler;
@Override
public void run(String... args) throws Exception {
this.template.convertAndSend("so9130", new PersonNameChanged(123));
this.template.convertAndSend("so9130", new PersonMoved(123));
this.handler.latch.await(10, TimeUnit.SECONDS);
}
@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
.messageConverter(converter()))
.aggregate(a -> a
.correlationExpression("payload.id")
.releaseExpression("false") // open-ended release, timeout only
.sendPartialResultOnExpiry(true)
.groupTimeout(2000))
.handle(handler())
.get();
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Handler handler() {
return new Handler();
}
@Bean
public Queue queue() {
return new Queue("so9130", false, false, true);
}
public static class Handler {
private final CountDownLatch latch = new CountDownLatch(1);
@ServiceActivator
public void handle(Collection<?> aggregatedData) {
System.out.println(aggregatedData);
this.latch.countDown();
}
}
public static class PersonNameChanged {
private int id;
PersonNameChanged() {
}
PersonNameChanged(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "PersonNameChanged [id=" + this.id + "]";
}
}
public static class PersonMoved {
private int id;
PersonMoved() {
}
PersonMoved(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "PersonMoved [id=" + this.id + "]";
}
}
}
Pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>so42969130</artifactId>
<version>2.0.0-BUILD-SNAPSHOT</version>
<packaging>jar</packaging>
<name>so42969130</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
结果:
2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler :
Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]
使用 Spring-Integration 将消息系统的缺点转移到 software/service-side 显然是错误的 Spring Integration 或任何框架都不是这种情况。它也不能很好地扩展并且不能容错
这个问题的核心是将路由消息与业务logic/sending消息分开
AFAIK 只有 Kafka 和 Apache Artemis 支持来自 JMS 的 JMSXGroup API 查看当前成熟的队列提供程序 RabbitMQ 没有它但是 AMQP 已经被指定但是再次 RabbitMq 还没有实现它尽管来自社区的请求.
在企业架构中确保从许多独立来源进行有序顺序处理的单一但也是非常常见且重要的案例将 RabbitMQ 排除在作为默认消息传递解决方案的进一步考虑之外