spring-jms - 通过 jms 配置进行侦听器交换和绑定队列

spring-jms - listener exchange and bind queue by jms configuration

我有一个项目 spring-jms

我正在尝试与交易所合作。我创建了 4 个监听器,它们都默认绑定到名为 'jms.durable.queues' 的交换中。甚至以为我在 Rabbit 控制台上创建自己的交换并手动绑定队列,spring 正在创建默认交换。

我如何创建自己的队列并绑定到我的交换器或禁用 [​​=35=] 创建队列并绑定到默认交换器 'jms.durable.queues'?

我的配置class


import javax.jms.ConnectionFactory;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@EnableJms
@Configuration
public class ConnectionQueueConfig {

    @Bean
    public ConnectionFactory jmsConnectionRabbitFactory(@Autowired RabbitProperties rabbitProperties) {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername(rabbitProperties.getUser());
        connectionFactory.setPassword(rabbitProperties.getPass());
        connectionFactory.setVirtualHost(rabbitProperties.getVirtualhost());
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        return connectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Autowired ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public JmsTemplate defaultJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }


}

我的听众


    @JmsListener(destination = "queue-1-from-exchange-A" )
    public void messageConsumer1(@Payload Message message,  @Headers MessageHeaders headers){
    }

    @JmsListener(destination = "queue-2-from-exchange-A" )
    public void messageConsumer2(@Payload Message message,  @Headers MessageHeaders headers){
    }

    @JmsListener(destination = "queue-1-from-exchange-B" )
    public void messageConsumer3(@Payload Message message,  @Headers MessageHeaders headers){
    }

    @JmsListener(destination = "queue-2-from-exchange-B" )
    public void messageConsumer4(@Payload Message message,  @Headers MessageHeaders headers){
    }

依赖关系

    implementation 'org.springframework:spring-jms:5.3.13'
    implementation 'com.rabbitmq.jms:rabbitmq-jms:2.3.0'

我看到了关于 RMQDestination.class,我可以用它来创建我的两个交换器和队列吗? 在 spring-jms 配置上是否有任何 spring 解析器以编程方式管理目标?

伪代码示例

someSpringResolverDestination.setDestination(new RMQDestination());
someSpringResolverDestination.setDestination(new RMQDestination());

在 RabbitMQ JMS 客户端中查看此 class 的源代码:https://github.com/rabbitmq/rabbitmq-jms-client/blob/main/src/main/java/com/rabbitmq/jms/admin/RMQDestination.java

由于您没有明确提供交换器,因此队列名称实际上已绑定到该默认交换器。

查看有关此 JMS 客户端的更多文档以及如何为特定交换和绑定手动声明目的地:https://www.rabbitmq.com/jms-client.html

更新

仔细查看该文档:https://www.rabbitmq.com/jms-client.html#destination-interoperability

@Bean
public Destination jmsDestination() {
    RMQDestination jmsDestination = new RMQDestination();
    jmsDestination.setDestinationName("myQueue");
    jmsDestination.setAmqp(true);
    jmsDestination.setAmqpQueueName("rabbitQueueName");
    return jmsDestination;
}

然后查看 @JmsListener JavaDocs:

/**
 * The destination name for this listener, resolved through the container-wide
 * {@link org.springframework.jms.support.destination.DestinationResolver} strategy.
 */
String destination();

JmsDestinationAccessor 默认使用 DynamicDestinationResolver,这可能不会在这里为我们工作,因为它会做你目前所做的一切 jms.durable.queues交换.

因此,您的解决方案要考虑的另一个选项是自定义:

/**
 * The bean name of the {@link org.springframework.jms.config.JmsListenerContainerFactory}
 * to use to create the message listener container responsible for serving this endpoint.
 * <p>If not specified, the default container factory is used, if any.
 */
String containerFactory() default "";

因此,您需要为 DefaultJmsListenerContainerFactory 添加一个 bean 并向其 setDestinationResolver(DestinationResolver) 注入一个 BeanFactoryDestinationResolver。然后,您在 @JmsListener 方法的 destination 选项中提供一个 RMQDestination bean 名称,BeanFactory 将为您提供一个具有所需交换和绑定的适当 RMQDestination .

只是发布解决方案(@Artem Bilan 帮助了我)

我添加了一个 DestinationResolver() 用于将侦听器连接到我的队列,并且成功了。


@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Autowired ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    factory.setDestinationResolver(new DestinationResolver() {
        @Override
        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)  throws JMSException {
            RMQDestination jmsDestination = new RMQDestination();
            jmsDestination.setDestinationName(destinationName);
            jmsDestination.setAmqpQueueName(destinationName);
            jmsDestination.setAmqp(true);
            return jmsDestination;
        }
    });
    
    factory.setAutoStartup(true);
    return factory;
}