spring-jms - 通过 jms 配置进行侦听器交换和绑定队列
spring-jms - listener exchange and bind queue by jms configuration
我有一个项目 spring-jms
我正在尝试与交易所合作。我创建了 4 个监听器,它们都默认绑定到名为 'jms.durable.queues' 的交换中。甚至以为我在 Rabbit 控制台上创建自己的交换并手动绑定队列,spring 正在创建默认交换。
我如何创建自己的队列并绑定到我的交换器或禁用 [=35=] 创建队列并绑定到默认交换器 'jms.durable.queues'?
我的配置class
import javax.jms.ConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@EnableJms
@Configuration
public class ConnectionQueueConfig {
@Bean
public ConnectionFactory jmsConnectionRabbitFactory(@Autowired RabbitProperties rabbitProperties) {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername(rabbitProperties.getUser());
connectionFactory.setPassword(rabbitProperties.getPass());
connectionFactory.setVirtualHost(rabbitProperties.getVirtualhost());
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.getPort());
return connectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Autowired ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAutoStartup(true);
return factory;
}
@Bean
public JmsTemplate defaultJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
}
我的听众
@JmsListener(destination = "queue-1-from-exchange-A" )
public void messageConsumer1(@Payload Message message, @Headers MessageHeaders headers){
}
@JmsListener(destination = "queue-2-from-exchange-A" )
public void messageConsumer2(@Payload Message message, @Headers MessageHeaders headers){
}
@JmsListener(destination = "queue-1-from-exchange-B" )
public void messageConsumer3(@Payload Message message, @Headers MessageHeaders headers){
}
@JmsListener(destination = "queue-2-from-exchange-B" )
public void messageConsumer4(@Payload Message message, @Headers MessageHeaders headers){
}
依赖关系
implementation 'org.springframework:spring-jms:5.3.13'
implementation 'com.rabbitmq.jms:rabbitmq-jms:2.3.0'
我看到了关于 RMQDestination.class
,我可以用它来创建我的两个交换器和队列吗?
在 spring-jms 配置上是否有任何 spring 解析器以编程方式管理目标?
伪代码示例
someSpringResolverDestination.setDestination(new RMQDestination());
someSpringResolverDestination.setDestination(new RMQDestination());
在 RabbitMQ JMS 客户端中查看此 class 的源代码:https://github.com/rabbitmq/rabbitmq-jms-client/blob/main/src/main/java/com/rabbitmq/jms/admin/RMQDestination.java。
由于您没有明确提供交换器,因此队列名称实际上已绑定到该默认交换器。
查看有关此 JMS 客户端的更多文档以及如何为特定交换和绑定手动声明目的地:https://www.rabbitmq.com/jms-client.html。
更新
仔细查看该文档:https://www.rabbitmq.com/jms-client.html#destination-interoperability
@Bean
public Destination jmsDestination() {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName("myQueue");
jmsDestination.setAmqp(true);
jmsDestination.setAmqpQueueName("rabbitQueueName");
return jmsDestination;
}
然后查看 @JmsListener
JavaDocs:
/**
* The destination name for this listener, resolved through the container-wide
* {@link org.springframework.jms.support.destination.DestinationResolver} strategy.
*/
String destination();
JmsDestinationAccessor
默认使用 DynamicDestinationResolver
,这可能不会在这里为我们工作,因为它会做你目前所做的一切 jms.durable.queues
交换.
因此,您的解决方案要考虑的另一个选项是自定义:
/**
* The bean name of the {@link org.springframework.jms.config.JmsListenerContainerFactory}
* to use to create the message listener container responsible for serving this endpoint.
* <p>If not specified, the default container factory is used, if any.
*/
String containerFactory() default "";
因此,您需要为 DefaultJmsListenerContainerFactory
添加一个 bean 并向其 setDestinationResolver(DestinationResolver)
注入一个 BeanFactoryDestinationResolver
。然后,您在 @JmsListener
方法的 destination
选项中提供一个 RMQDestination
bean 名称,BeanFactory
将为您提供一个具有所需交换和绑定的适当 RMQDestination
.
只是发布解决方案(@Artem Bilan 帮助了我)
我添加了一个 DestinationResolver()
用于将侦听器连接到我的队列,并且成功了。
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Autowired ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DestinationResolver() {
@Override
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName(destinationName);
jmsDestination.setAmqpQueueName(destinationName);
jmsDestination.setAmqp(true);
return jmsDestination;
}
});
factory.setAutoStartup(true);
return factory;
}
我有一个项目 spring-jms
我正在尝试与交易所合作。我创建了 4 个监听器,它们都默认绑定到名为 'jms.durable.queues' 的交换中。甚至以为我在 Rabbit 控制台上创建自己的交换并手动绑定队列,spring 正在创建默认交换。
我如何创建自己的队列并绑定到我的交换器或禁用 [=35=] 创建队列并绑定到默认交换器 'jms.durable.queues'?
我的配置class
import javax.jms.ConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@EnableJms
@Configuration
public class ConnectionQueueConfig {
@Bean
public ConnectionFactory jmsConnectionRabbitFactory(@Autowired RabbitProperties rabbitProperties) {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername(rabbitProperties.getUser());
connectionFactory.setPassword(rabbitProperties.getPass());
connectionFactory.setVirtualHost(rabbitProperties.getVirtualhost());
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.getPort());
return connectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Autowired ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAutoStartup(true);
return factory;
}
@Bean
public JmsTemplate defaultJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
}
我的听众
@JmsListener(destination = "queue-1-from-exchange-A" )
public void messageConsumer1(@Payload Message message, @Headers MessageHeaders headers){
}
@JmsListener(destination = "queue-2-from-exchange-A" )
public void messageConsumer2(@Payload Message message, @Headers MessageHeaders headers){
}
@JmsListener(destination = "queue-1-from-exchange-B" )
public void messageConsumer3(@Payload Message message, @Headers MessageHeaders headers){
}
@JmsListener(destination = "queue-2-from-exchange-B" )
public void messageConsumer4(@Payload Message message, @Headers MessageHeaders headers){
}
依赖关系
implementation 'org.springframework:spring-jms:5.3.13'
implementation 'com.rabbitmq.jms:rabbitmq-jms:2.3.0'
我看到了关于 RMQDestination.class
,我可以用它来创建我的两个交换器和队列吗?
在 spring-jms 配置上是否有任何 spring 解析器以编程方式管理目标?
伪代码示例
someSpringResolverDestination.setDestination(new RMQDestination());
someSpringResolverDestination.setDestination(new RMQDestination());
在 RabbitMQ JMS 客户端中查看此 class 的源代码:https://github.com/rabbitmq/rabbitmq-jms-client/blob/main/src/main/java/com/rabbitmq/jms/admin/RMQDestination.java。
由于您没有明确提供交换器,因此队列名称实际上已绑定到该默认交换器。
查看有关此 JMS 客户端的更多文档以及如何为特定交换和绑定手动声明目的地:https://www.rabbitmq.com/jms-client.html。
更新
仔细查看该文档:https://www.rabbitmq.com/jms-client.html#destination-interoperability
@Bean
public Destination jmsDestination() {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName("myQueue");
jmsDestination.setAmqp(true);
jmsDestination.setAmqpQueueName("rabbitQueueName");
return jmsDestination;
}
然后查看 @JmsListener
JavaDocs:
/**
* The destination name for this listener, resolved through the container-wide
* {@link org.springframework.jms.support.destination.DestinationResolver} strategy.
*/
String destination();
JmsDestinationAccessor
默认使用 DynamicDestinationResolver
,这可能不会在这里为我们工作,因为它会做你目前所做的一切 jms.durable.queues
交换.
因此,您的解决方案要考虑的另一个选项是自定义:
/**
* The bean name of the {@link org.springframework.jms.config.JmsListenerContainerFactory}
* to use to create the message listener container responsible for serving this endpoint.
* <p>If not specified, the default container factory is used, if any.
*/
String containerFactory() default "";
因此,您需要为 DefaultJmsListenerContainerFactory
添加一个 bean 并向其 setDestinationResolver(DestinationResolver)
注入一个 BeanFactoryDestinationResolver
。然后,您在 @JmsListener
方法的 destination
选项中提供一个 RMQDestination
bean 名称,BeanFactory
将为您提供一个具有所需交换和绑定的适当 RMQDestination
.
只是发布解决方案(@Artem Bilan 帮助了我)
我添加了一个 DestinationResolver()
用于将侦听器连接到我的队列,并且成功了。
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory( @Autowired ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DestinationResolver() {
@Override
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName(destinationName);
jmsDestination.setAmqpQueueName(destinationName);
jmsDestination.setAmqp(true);
return jmsDestination;
}
});
factory.setAutoStartup(true);
return factory;
}