如何使用 spring AMQP 创建 rabbit 侦听器以侦听跨多个虚拟主机的队列
How to create rabbit listeners using spring AMQP to listen to queues across multiple vhosts
我有多个虚拟主机,每个主机都有一个请求队列和一个响应队列。这些虚拟主机服务于不同的客户端。请求队列和响应队列的名称在虚拟主机中保持相同。
我创建了一个 SimpleRoutingConnectionFactory,其中 clientName()+"ConnectionFactory" 作为查找键,相应的 CachingConnectionFactory 作为映射中的值。我可以通过在 convertAndSend 之前将 RabbitTemplate 绑定到虚拟主机然后解除绑定来将消息发布到请求队列。
我无法使用来自不同虚拟主机的响应队列中的消息。我为每个客户端创建了一个 SimpleRabbitListenerContainerFactory。我实现了 RabbitListenerConfigurer 并为每个 SimpleRabbitListenerContainerFactory 注册了一个 SimpleRabbitListenerEndpoint。我还在每个 SimpleRabbitListenerContainerFactory 上设置了 connectionFactory 作为客户端的 CachingConnectionFactory。
@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ClientList clients;
@Bean
@Primary
public SimpleRoutingConnectionFactory routingConnectionFactory() {
final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
applicationContext.getBeansOfType(ConnectionFactory.class)
.forEach((beanName, bean) -> {
routeMap.put(beanName, bean);
});
routingConnectionFactory.setTargetConnectionFactories(routeMap);
return routingConnectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(routingConnectionFactory());
}
@Bean
public DirectExchange orbitExchange() {
return new DirectExchange("orbit-exchange");
}
@Bean
public Queue requestQueue() {
return QueueBuilder
.durable("request-queue")
.lazy()
.build();
}
@Bean
public Queue responseQueue() {
return QueueBuilder
.durable("response-queue")
.lazy()
.build();
}
@Bean
public Binding requestBinding() {
return BindingBuilder.bind(requestQueue())
.to(orbitExchange())
.with("orbit-request");
}
@Bean
public Binding responseBinding() {
return BindingBuilder.bind(responseQueue())
.to(orbitExchange())
.with("orbit-response");
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
clients.get()
.stream()
.forEach(client -> {
var endpoint = createEndpoint(client);
var listenerContainerFactory = applicationContext.getBean(client.getName() + "ListenerContainerFactory");
listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
registrar.registerEndpoint(endpoint, listenerContainerFactory);
});
}
}
private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
var endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(client.getName());
endpoint.setQueueNames("response-queue");
endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
return endpoint;
}
}
但是,我得到 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s). java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
我无法弄清楚是什么原因造成的,因为我根本没有使用 SimpleRoutingConnectionFactory 来处理消息。
编辑:
下面的完整堆栈跟踪 -
ERROR [2020-07-09T04:12:38,028] [amdoListenerEndpoint-1] [TraceId:] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s).
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2089)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2042)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1836)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1349)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.base/java.lang.Thread.run(Thread.java:834)
编辑2:
我对每个侦听器都使用了 routingConnectionFactory,并使用了 setLookUpKeyQualifier。没有更多例外,但是,听众似乎没有做任何事情,即没有听队列。
@Import(MqConfig.class)
//This is to import CachingConnectinFactory beans and SimpleRabbitListenerContainerFactory beans for all clients
@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ClientList clients;
@Bean
@Primary
public SimpleRoutingConnectionFactory routingConnectionFactory() {
final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
applicationContext.getBeansOfType(ConnectionFactory.class)
.forEach((beanName, bean) -> {
routeMap.put(beanName+"[response-queue]", bean);
});
routingConnectionFactory.setTargetConnectionFactories(routeMap);
return routingConnectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(routingConnectionFactory());
}
@Bean
public DirectExchange orbitExchange() {
return new DirectExchange("orbit-exchange");
}
@Bean
public Queue requestQueue() {
return QueueBuilder
.durable("request-queue")
.lazy()
.build();
}
@Bean
public Queue responseQueue() {
return QueueBuilder
.durable("response-queue")
.lazy()
.build();
}
@Bean
public Binding requestBinding() {
return BindingBuilder.bind(requestQueue())
.to(orbitExchange())
.with("orbit-request");
}
@Bean
public Binding responseBinding() {
return BindingBuilder.bind(responseQueue())
.to(orbitExchange())
.with("orbit-response");
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
clients.get()
.stream()
.forEach(client -> {
var endpoint = createEndpoint(client);
var listenerContainerFactory = getListenerContainerFactory(Client client);
listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
registrar.registerEndpoint(endpoint, listenerContainerFactory);
});
}
}
private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
var endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(client.getName());
endpoint.setQueueNames("response-queue");
endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
return endpoint;
}
private SimpleRabbitListenerContainerFactory getListenerContainerFactory(Client client) {
var listenerContainerFactory = (SimpleRabbitListenerContainerFactory) applicationContext.getBean(client.getName() + "ListenerContainerFactory");
listenerContainerFactory.setConnectionFactory(routingConnectionFactory());
listenerContainerFactory.setContainerCustomizer(container -> {
container.setQueueNames("response-queue");
container.setLookupKeyQualifier(client.getName());
container.setMessageListener(message -> log.info("Received message"));
});
return listenerContainerFactory;
}
}
发生了一些非常奇怪的事情; [null]
意味着当我们调用 getRoutingLookupKey()
时 cf 不是路由 cf 但当我们调用 getConnectionFactory()
时它是。
不清楚这是怎么发生的。也许您可以在调试器中找出原因?
一种解决方案是注入路由 cf 并使用 setLookupKeyQualifier(...)
.
查找键将是 clientId[queueName]
。
我有多个虚拟主机,每个主机都有一个请求队列和一个响应队列。这些虚拟主机服务于不同的客户端。请求队列和响应队列的名称在虚拟主机中保持相同。
我创建了一个 SimpleRoutingConnectionFactory,其中 clientName()+"ConnectionFactory" 作为查找键,相应的 CachingConnectionFactory 作为映射中的值。我可以通过在 convertAndSend 之前将 RabbitTemplate 绑定到虚拟主机然后解除绑定来将消息发布到请求队列。
我无法使用来自不同虚拟主机的响应队列中的消息。我为每个客户端创建了一个 SimpleRabbitListenerContainerFactory。我实现了 RabbitListenerConfigurer 并为每个 SimpleRabbitListenerContainerFactory 注册了一个 SimpleRabbitListenerEndpoint。我还在每个 SimpleRabbitListenerContainerFactory 上设置了 connectionFactory 作为客户端的 CachingConnectionFactory。
@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ClientList clients;
@Bean
@Primary
public SimpleRoutingConnectionFactory routingConnectionFactory() {
final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
applicationContext.getBeansOfType(ConnectionFactory.class)
.forEach((beanName, bean) -> {
routeMap.put(beanName, bean);
});
routingConnectionFactory.setTargetConnectionFactories(routeMap);
return routingConnectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(routingConnectionFactory());
}
@Bean
public DirectExchange orbitExchange() {
return new DirectExchange("orbit-exchange");
}
@Bean
public Queue requestQueue() {
return QueueBuilder
.durable("request-queue")
.lazy()
.build();
}
@Bean
public Queue responseQueue() {
return QueueBuilder
.durable("response-queue")
.lazy()
.build();
}
@Bean
public Binding requestBinding() {
return BindingBuilder.bind(requestQueue())
.to(orbitExchange())
.with("orbit-request");
}
@Bean
public Binding responseBinding() {
return BindingBuilder.bind(responseQueue())
.to(orbitExchange())
.with("orbit-response");
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
clients.get()
.stream()
.forEach(client -> {
var endpoint = createEndpoint(client);
var listenerContainerFactory = applicationContext.getBean(client.getName() + "ListenerContainerFactory");
listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
registrar.registerEndpoint(endpoint, listenerContainerFactory);
});
}
}
private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
var endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(client.getName());
endpoint.setQueueNames("response-queue");
endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
return endpoint;
}
}
但是,我得到 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s). java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
我无法弄清楚是什么原因造成的,因为我根本没有使用 SimpleRoutingConnectionFactory 来处理消息。
编辑: 下面的完整堆栈跟踪 -
ERROR [2020-07-09T04:12:38,028] [amdoListenerEndpoint-1] [TraceId:] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s).
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2089)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2042)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1836)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1349)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.base/java.lang.Thread.run(Thread.java:834)
编辑2: 我对每个侦听器都使用了 routingConnectionFactory,并使用了 setLookUpKeyQualifier。没有更多例外,但是,听众似乎没有做任何事情,即没有听队列。
@Import(MqConfig.class)
//This is to import CachingConnectinFactory beans and SimpleRabbitListenerContainerFactory beans for all clients
@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ClientList clients;
@Bean
@Primary
public SimpleRoutingConnectionFactory routingConnectionFactory() {
final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
applicationContext.getBeansOfType(ConnectionFactory.class)
.forEach((beanName, bean) -> {
routeMap.put(beanName+"[response-queue]", bean);
});
routingConnectionFactory.setTargetConnectionFactories(routeMap);
return routingConnectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(routingConnectionFactory());
}
@Bean
public DirectExchange orbitExchange() {
return new DirectExchange("orbit-exchange");
}
@Bean
public Queue requestQueue() {
return QueueBuilder
.durable("request-queue")
.lazy()
.build();
}
@Bean
public Queue responseQueue() {
return QueueBuilder
.durable("response-queue")
.lazy()
.build();
}
@Bean
public Binding requestBinding() {
return BindingBuilder.bind(requestQueue())
.to(orbitExchange())
.with("orbit-request");
}
@Bean
public Binding responseBinding() {
return BindingBuilder.bind(responseQueue())
.to(orbitExchange())
.with("orbit-response");
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
clients.get()
.stream()
.forEach(client -> {
var endpoint = createEndpoint(client);
var listenerContainerFactory = getListenerContainerFactory(Client client);
listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
registrar.registerEndpoint(endpoint, listenerContainerFactory);
});
}
}
private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
var endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(client.getName());
endpoint.setQueueNames("response-queue");
endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
return endpoint;
}
private SimpleRabbitListenerContainerFactory getListenerContainerFactory(Client client) {
var listenerContainerFactory = (SimpleRabbitListenerContainerFactory) applicationContext.getBean(client.getName() + "ListenerContainerFactory");
listenerContainerFactory.setConnectionFactory(routingConnectionFactory());
listenerContainerFactory.setContainerCustomizer(container -> {
container.setQueueNames("response-queue");
container.setLookupKeyQualifier(client.getName());
container.setMessageListener(message -> log.info("Received message"));
});
return listenerContainerFactory;
}
}
发生了一些非常奇怪的事情; [null]
意味着当我们调用 getRoutingLookupKey()
时 cf 不是路由 cf 但当我们调用 getConnectionFactory()
时它是。
不清楚这是怎么发生的。也许您可以在调试器中找出原因?
一种解决方案是注入路由 cf 并使用 setLookupKeyQualifier(...)
.
查找键将是 clientId[queueName]
。