如何使用 spring AMQP 创建 rabbit 侦听器以侦听跨多个虚拟主机的队列

How to create rabbit listeners using spring AMQP to listen to queues across multiple vhosts

我有多个虚拟主机,每个主机都有一个请求队列和一个响应队列。这些虚拟主机服务于不同的客户端。请求队列和响应队列的名称在虚拟主机中保持相同。

我创建了一个 SimpleRoutingConnectionFactory,其中 clientName()+"ConnectionFactory" 作为查找键,相应的 CachingConnectionFactory 作为映射中的值。我可以通过在 convertAndSend 之前将 RabbitTemplate 绑定到虚拟主机然后解除绑定来将消息发布到请求队列。

我无法使用来自不同虚拟主机的响应队列中的消息。我为每个客户端创建了一个 SimpleRabbitListenerContainerFactory。我实现了 RabbitListenerConfigurer 并为每个 SimpleRabbitListenerContainerFactory 注册了一个 SimpleRabbitListenerEndpoint。我还在每个 SimpleRabbitListenerContainerFactory 上设置了 connectionFactory 作为客户端的 CachingConnectionFactory。


@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private ClientList clients;

    @Bean
    @Primary
    public SimpleRoutingConnectionFactory routingConnectionFactory() {
        final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
        final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
        applicationContext.getBeansOfType(ConnectionFactory.class)
                .forEach((beanName, bean) -> {
                    routeMap.put(beanName, bean);
                });
        routingConnectionFactory.setTargetConnectionFactories(routeMap);
        return routingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(routingConnectionFactory());
    }

    @Bean
    public DirectExchange orbitExchange() {
        return new DirectExchange("orbit-exchange");
    }

    @Bean
    public Queue requestQueue() {
        return QueueBuilder
                .durable("request-queue")
                .lazy()
                .build();
    }

    @Bean
    public Queue responseQueue() {
        return QueueBuilder
                .durable("response-queue")
                .lazy()
                .build();
    }

    @Bean
    public Binding requestBinding() {
        return BindingBuilder.bind(requestQueue())
                .to(orbitExchange())
                .with("orbit-request");
    }

    @Bean
    public Binding responseBinding() {
        return BindingBuilder.bind(responseQueue())
                .to(orbitExchange())
                .with("orbit-response");
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            clients.get()
                    .stream()
                    .forEach(client -> {
                        var endpoint = createEndpoint(client);
                        var listenerContainerFactory = applicationContext.getBean(client.getName() + "ListenerContainerFactory");
                        listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
                        registrar.registerEndpoint(endpoint, listenerContainerFactory);
                    });
        }
    }

    private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
        var endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setId(client.getName());
        endpoint.setQueueNames("response-queue");
        endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
        return endpoint;
    }

}

但是,我得到 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s). java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]

我无法弄清楚是什么原因造成的,因为我根本没有使用 SimpleRoutingConnectionFactory 来处理消息。

编辑: 下面的完整堆栈跟踪 -

ERROR [2020-07-09T04:12:38,028] [amdoListenerEndpoint-1] [TraceId:] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer: Failed to check/redeclare auto-delete queue(s).
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
        at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
        at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2089)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2042)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1836)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1817)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1349)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
        at java.base/java.lang.Thread.run(Thread.java:834)

编辑2: 我对每个侦听器都使用了 routingConnectionFactory,并使用了 setLookUpKeyQualifier。没有更多例外,但是,听众似乎没有做任何事情,即没有听队列。

@Import(MqConfig.class) 
//This is to import CachingConnectinFactory beans and SimpleRabbitListenerContainerFactory beans for all clients
@Configuration
public class RabbitConfiguration implements RabbitListenerConfigurer {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private ClientList clients;

    @Bean
    @Primary
    public SimpleRoutingConnectionFactory routingConnectionFactory() {
        final var routingConnectionFactory = new SimpleRoutingConnectionFactory();
        final Map<Object, ConnectionFactory> routeMap = new HashMap<>();
        applicationContext.getBeansOfType(ConnectionFactory.class)
                .forEach((beanName, bean) -> {
                    routeMap.put(beanName+"[response-queue]", bean);
                });
        routingConnectionFactory.setTargetConnectionFactories(routeMap);
        return routingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(routingConnectionFactory());
    }

    @Bean
    public DirectExchange orbitExchange() {
        return new DirectExchange("orbit-exchange");
    }

    @Bean
    public Queue requestQueue() {
        return QueueBuilder
                .durable("request-queue")
                .lazy()
                .build();
    }

    @Bean
    public Queue responseQueue() {
        return QueueBuilder
                .durable("response-queue")
                .lazy()
                .build();
    }

    @Bean
    public Binding requestBinding() {
        return BindingBuilder.bind(requestQueue())
                .to(orbitExchange())
                .with("orbit-request");
    }

    @Bean
    public Binding responseBinding() {
        return BindingBuilder.bind(responseQueue())
                .to(orbitExchange())
                .with("orbit-response");
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            clients.get()
                    .stream()
                    .forEach(client -> {
                        var endpoint = createEndpoint(client);
                        var listenerContainerFactory = getListenerContainerFactory(Client client);
                        listenerContainerFactory.setConnectionFactory((ConnectionFactory)applicationContext.getBean(client.getName() + "ConnectionFactory"));
                        registrar.registerEndpoint(endpoint, listenerContainerFactory);
                    });
        }
    }

    private SimpleRabbitListenerEndpoint createEndpoint(Client client) {
        var endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setId(client.getName());
        endpoint.setQueueNames("response-queue");
        endpoint.setMessageListener(new MessageListenerAdapter(new MessageReceiver(), "receive"));
        return endpoint;
    }

    private SimpleRabbitListenerContainerFactory getListenerContainerFactory(Client client) {
        var listenerContainerFactory =  (SimpleRabbitListenerContainerFactory) applicationContext.getBean(client.getName() + "ListenerContainerFactory");
        listenerContainerFactory.setConnectionFactory(routingConnectionFactory());
        listenerContainerFactory.setContainerCustomizer(container -> {
            container.setQueueNames("response-queue");
            container.setLookupKeyQualifier(client.getName());
            container.setMessageListener(message -> log.info("Received message"));
        });
        return listenerContainerFactory;
    }

}

发生了一些非常奇怪的事情; [null] 意味着当我们调用 getRoutingLookupKey() 时 cf 不是路由 cf 但当我们调用 getConnectionFactory() 时它是。

不清楚这是怎么发生的。也许您可以在调试器中找出原因?

一种解决方案是注入路由 cf 并使用 setLookupKeyQualifier(...).

查找键将是 clientId[queueName]