java spring 启动 amqp 两个具有不同行为的侦听器
java spring boot amqp two listeners with different behaviour
我是 spring amqp 的新手,我想创建两个具有不同行为的不同侦听器。问题是我在编译时不知道队列名称,所以我不能使用 解决方案。
我想做的是:从 "sidechannel" 队列中读取(然后删除)第一条消息,它应该看起来像这样 {"queues":["queue1","queue2"]}
。
现在从队列 1 和队列 2 中读取(然后删除)第一条消息。之后,转到步骤1,读取sidechannel的第一条消息
我尝试创建 2 个具有不同侦听器的 SimpleMessageListenerContainer,正如您在下面的代码中看到的那样,但它并没有像我想象的那样工作。
我的代码:
@SpringBootApplication
public class Main implements CommandLineRunner {
final static String queueName = "sidechannel";
@Autowired
AnnotationConfigApplicationContext context;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(queueName);
container.setMessageListener(sidechannelListener());
return container;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames("queue1","queue2");
container.setMessageListener(queueListener());
return container;
}
@Bean
public MessageListener sidechannelListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println(msg);
try {
Map<String, Object> map = jsonToMap(msg);
for (String name : (ArrayList<String>) map.get("queues")) {
System.out.println("Waiting for " + name + " message");
rabbitTemplate.receive(name);
}
} catch (IOException e) {
e.printStackTrace();
}
};
}
@Bean
public MessageListener queueListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println("Received message: ");
System.out.println(msg);
};
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Main.class, args);
}
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setReceiveTimeout(-1);
while(true) {
System.out.println("Waiting for side channel message");
rabbitTemplate.receive(queueName);
}
// context.close();
}
}
首先,由于某些原因,侧通道队列中的消息在处理后不会被删除。
其次,当我期待这样的输出时:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
Waiting for queue2 message
"message from queue2"
Waiting for side channel message
即使我在那些不同的队列上收到消息,也不会发生任何事情(因为 rabbitTemplate.setReceiveTimeout(-1);
),但它会以某种方式对我收到的每条消息做出反应...
此外,我不明白的是,如果我先将消息发送到侧通道,然后发送到队列 1,它会像这样:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
现在,如果我向 queue1 发送另一条(第二条)消息,它会打印出这条消息,然后 Waiting for queue2 message
。
所以它需要两条消息才能继续循环...我不知道我做错了什么。
你似乎在混合范式;您有消息驱动的侦听器容器,并且您还在使用轮询 (template.receive()
)。通常,queue1、queue2 的容器已经处理了来自这些队列的消息,this
System.out.println("Waiting for " + name + " message");
rabbitTemplate.receive(name);
如果超时<0,将永远阻塞;因此原始消息将永远不会被确认。
我是 spring amqp 的新手,我想创建两个具有不同行为的不同侦听器。问题是我在编译时不知道队列名称,所以我不能使用
我想做的是:从 "sidechannel" 队列中读取(然后删除)第一条消息,它应该看起来像这样 {"queues":["queue1","queue2"]}
。
现在从队列 1 和队列 2 中读取(然后删除)第一条消息。之后,转到步骤1,读取sidechannel的第一条消息
我尝试创建 2 个具有不同侦听器的 SimpleMessageListenerContainer,正如您在下面的代码中看到的那样,但它并没有像我想象的那样工作。
我的代码:
@SpringBootApplication
public class Main implements CommandLineRunner {
final static String queueName = "sidechannel";
@Autowired
AnnotationConfigApplicationContext context;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(queueName);
container.setMessageListener(sidechannelListener());
return container;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames("queue1","queue2");
container.setMessageListener(queueListener());
return container;
}
@Bean
public MessageListener sidechannelListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println(msg);
try {
Map<String, Object> map = jsonToMap(msg);
for (String name : (ArrayList<String>) map.get("queues")) {
System.out.println("Waiting for " + name + " message");
rabbitTemplate.receive(name);
}
} catch (IOException e) {
e.printStackTrace();
}
};
}
@Bean
public MessageListener queueListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println("Received message: ");
System.out.println(msg);
};
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Main.class, args);
}
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setReceiveTimeout(-1);
while(true) {
System.out.println("Waiting for side channel message");
rabbitTemplate.receive(queueName);
}
// context.close();
}
}
首先,由于某些原因,侧通道队列中的消息在处理后不会被删除。 其次,当我期待这样的输出时:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
Waiting for queue2 message
"message from queue2"
Waiting for side channel message
即使我在那些不同的队列上收到消息,也不会发生任何事情(因为 rabbitTemplate.setReceiveTimeout(-1);
),但它会以某种方式对我收到的每条消息做出反应...
此外,我不明白的是,如果我先将消息发送到侧通道,然后发送到队列 1,它会像这样:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
现在,如果我向 queue1 发送另一条(第二条)消息,它会打印出这条消息,然后 Waiting for queue2 message
。
所以它需要两条消息才能继续循环...我不知道我做错了什么。
你似乎在混合范式;您有消息驱动的侦听器容器,并且您还在使用轮询 (template.receive()
)。通常,queue1、queue2 的容器已经处理了来自这些队列的消息,this
System.out.println("Waiting for " + name + " message");
rabbitTemplate.receive(name);
如果超时<0,将永远阻塞;因此原始消息将永远不会被确认。