使用 spring 抽象的 Rabbitmq 设计
Rabbit MQ deisgn using spring abstraction
我需要从名为 Metadata 的队列中收听消息 - 然后根据该消息,我将不得不读取一些队列,我们称之为 dataQ(该队列的名称将在元数据消息中)。要读取元数据,我可以使用 rabbit listener 但之后我必须从 dataQ 读取其他消息,所以我可以通过一种方式手动拉取 - 但我想要更干净的东西,比如 rabbit listener 这样我就不必管理频道,ack等。但由于队列名称在我们从元数据队列中读取消息之前是未知的,因此尝试探索其他解决方案。这个 dataQ 可以是 1000 个不同的队列名称,所以我们必须动态地监听那个 dataQ。
ack 也应该像这样工作 - 从元数据队列读取消息,处理给定的 dataQ - 为 dataQ 中的消息发送 acks(dataQ 可能有超过 1 条消息)并为元数据队列发送 ack。
(如果这对单个消费者有效,那么我可以添加容器模型并处理来自元数据队列的多个消息,这意味着我将能够同时处理多个数据队列。)
按照建议进行更新,对如何在主侦听器中获取事件以及该标志如何与并发一起工作感到困惑(抱歉到目前为止没有广泛使用应用程序事件)
package com.example;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
@Configuration
public class MyListener {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@RabbitListener(queues = "Metadata")
public void messageProcessing(String c) {
System.out.println(c);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(c);
container.setMessageListener(new MessageListenerAdapter(new DataHandler()));
container.setApplicationEventPublisher(applicationEventPublisher);
container.setIdleEventInterval(5000);
container.start();
// how to get container idle event here
// so we can call container.stop();
}
public class DataHandler {
public void handleMessage(byte[] text) {
System.out.println("Data Received: " + text);
}
}
@EventListener
public void onApplicationEvent(ApplicationEvent event) {
//I am getting idle event here
System.out.println(event.getSource());
}
}
在元数据侦听器中启动一个新的 SimpleMessageListenerContainer
来处理数据会很容易;但您无法确认来自其他听众的原始消息。
您必须暂停元数据线程直到辅助侦听器完成,然后释放元数据线程以确认原始消息。
您可以使用容器空闲事件来检测工作是否完成(除非您有其他机制知道一切已完成)。
设置元数据侦听器容器的并发性以确定您希望以这种方式并发处理的数量。
@RabbitListener(queues = "meta")
public void handle(SomeObject message) {
// extract dataQ
// create a new SimpleMessageListenerContainer
// Inject an ApplicationEventPublisher instance
// start the container
// block here, waiting for a container idle event
// stop the container
return;
}
不过请记住,如果服务器崩溃,元数据消息将被重新传送(默认情况下)并且您可能已经处理了一些数据消息。
编辑
关于您在下面的评论,我的意思是使用您自己的发布者,这样您就不必弄清楚事件来自哪个容器...
@SpringBootApplication
public class So42459257Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So42459257Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("meta", "foo");
template.convertAndSend("foo", "baz");
template.convertAndSend("foo", "baz");
template.convertAndSend("meta", "bar");
template.convertAndSend("bar", "qux");
template.convertAndSend("bar", "qux");
context.getBean(So42459257Application.class).testCompleteLatch.await(10, TimeUnit.SECONDS);
context.close();
}
private final CountDownLatch testCompleteLatch = new CountDownLatch(2);
@Autowired
private ConnectionFactory connectionFactory;
@RabbitListener(queues = "meta")
public void handleMeta(final String queue) throws Exception {
System.out.println("Started processing " + queue);
final CountDownLatch startedLatch = new CountDownLatch(1);
final CountDownLatch finishedLatch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setQueueNames(queue);
container.setMessageListener(new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public void handleMessage(String in) {
startedLatch.countDown();
System.out.println("Received " + in + " from " + queue);
}
}));
container.setIdleEventInterval(5000);
container.setApplicationEventPublisher(new ApplicationEventPublisher() {
@Override
public void publishEvent(Object event) {
}
@Override
public void publishEvent(ApplicationEvent event) {
if (event instanceof ListenerContainerIdleEvent) {
finishedLatch.countDown();
}
};
});
container.afterPropertiesSet();
container.start();
if (startedLatch.await(60, TimeUnit.SECONDS)) {
// handle container didn't receive any messages
}
if (finishedLatch.await(60, TimeUnit.SECONDS)) {
// handle container didn't go idle
}
System.out.println("Finished processing " + queue);
container.stop();
this.testCompleteLatch.countDown();
}
@Bean
public Queue meta() {
return new Queue("meta", false, false, true);
}
@Bean
public Queue foo() {
return new Queue("foo", false, false, true);
}
@Bean
public Queue bar() {
return new Queue("bar", false, false, true);
}
}
将侦听器容器并发设置为 2(使用 spring 启动只需添加
spring.rabbitmq.listener.concurrency=2
到应用程序属性);如果您不使用引导,请自行配置工厂。
结果:
Started processing bar
Started processing foo
Received baz from foo
Received qux from bar
Received baz from foo
Received qux from bar
Finished processing bar
Finished processing foo
我需要从名为 Metadata 的队列中收听消息 - 然后根据该消息,我将不得不读取一些队列,我们称之为 dataQ(该队列的名称将在元数据消息中)。要读取元数据,我可以使用 rabbit listener 但之后我必须从 dataQ 读取其他消息,所以我可以通过一种方式手动拉取 - 但我想要更干净的东西,比如 rabbit listener 这样我就不必管理频道,ack等。但由于队列名称在我们从元数据队列中读取消息之前是未知的,因此尝试探索其他解决方案。这个 dataQ 可以是 1000 个不同的队列名称,所以我们必须动态地监听那个 dataQ。
ack 也应该像这样工作 - 从元数据队列读取消息,处理给定的 dataQ - 为 dataQ 中的消息发送 acks(dataQ 可能有超过 1 条消息)并为元数据队列发送 ack。
(如果这对单个消费者有效,那么我可以添加容器模型并处理来自元数据队列的多个消息,这意味着我将能够同时处理多个数据队列。)
按照建议进行更新,对如何在主侦听器中获取事件以及该标志如何与并发一起工作感到困惑(抱歉到目前为止没有广泛使用应用程序事件)
package com.example;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
@Configuration
public class MyListener {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@RabbitListener(queues = "Metadata")
public void messageProcessing(String c) {
System.out.println(c);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(c);
container.setMessageListener(new MessageListenerAdapter(new DataHandler()));
container.setApplicationEventPublisher(applicationEventPublisher);
container.setIdleEventInterval(5000);
container.start();
// how to get container idle event here
// so we can call container.stop();
}
public class DataHandler {
public void handleMessage(byte[] text) {
System.out.println("Data Received: " + text);
}
}
@EventListener
public void onApplicationEvent(ApplicationEvent event) {
//I am getting idle event here
System.out.println(event.getSource());
}
}
在元数据侦听器中启动一个新的 SimpleMessageListenerContainer
来处理数据会很容易;但您无法确认来自其他听众的原始消息。
您必须暂停元数据线程直到辅助侦听器完成,然后释放元数据线程以确认原始消息。 您可以使用容器空闲事件来检测工作是否完成(除非您有其他机制知道一切已完成)。
设置元数据侦听器容器的并发性以确定您希望以这种方式并发处理的数量。
@RabbitListener(queues = "meta")
public void handle(SomeObject message) {
// extract dataQ
// create a new SimpleMessageListenerContainer
// Inject an ApplicationEventPublisher instance
// start the container
// block here, waiting for a container idle event
// stop the container
return;
}
不过请记住,如果服务器崩溃,元数据消息将被重新传送(默认情况下)并且您可能已经处理了一些数据消息。
编辑
关于您在下面的评论,我的意思是使用您自己的发布者,这样您就不必弄清楚事件来自哪个容器...
@SpringBootApplication
public class So42459257Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So42459257Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("meta", "foo");
template.convertAndSend("foo", "baz");
template.convertAndSend("foo", "baz");
template.convertAndSend("meta", "bar");
template.convertAndSend("bar", "qux");
template.convertAndSend("bar", "qux");
context.getBean(So42459257Application.class).testCompleteLatch.await(10, TimeUnit.SECONDS);
context.close();
}
private final CountDownLatch testCompleteLatch = new CountDownLatch(2);
@Autowired
private ConnectionFactory connectionFactory;
@RabbitListener(queues = "meta")
public void handleMeta(final String queue) throws Exception {
System.out.println("Started processing " + queue);
final CountDownLatch startedLatch = new CountDownLatch(1);
final CountDownLatch finishedLatch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setQueueNames(queue);
container.setMessageListener(new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public void handleMessage(String in) {
startedLatch.countDown();
System.out.println("Received " + in + " from " + queue);
}
}));
container.setIdleEventInterval(5000);
container.setApplicationEventPublisher(new ApplicationEventPublisher() {
@Override
public void publishEvent(Object event) {
}
@Override
public void publishEvent(ApplicationEvent event) {
if (event instanceof ListenerContainerIdleEvent) {
finishedLatch.countDown();
}
};
});
container.afterPropertiesSet();
container.start();
if (startedLatch.await(60, TimeUnit.SECONDS)) {
// handle container didn't receive any messages
}
if (finishedLatch.await(60, TimeUnit.SECONDS)) {
// handle container didn't go idle
}
System.out.println("Finished processing " + queue);
container.stop();
this.testCompleteLatch.countDown();
}
@Bean
public Queue meta() {
return new Queue("meta", false, false, true);
}
@Bean
public Queue foo() {
return new Queue("foo", false, false, true);
}
@Bean
public Queue bar() {
return new Queue("bar", false, false, true);
}
}
将侦听器容器并发设置为 2(使用 spring 启动只需添加
spring.rabbitmq.listener.concurrency=2
到应用程序属性);如果您不使用引导,请自行配置工厂。
结果:
Started processing bar
Started processing foo
Received baz from foo
Received qux from bar
Received baz from foo
Received qux from bar
Finished processing bar
Finished processing foo