Spring 集成 JMS 创建 ActiveMQ 队列而不是主题
Spring Integration JMS creating ActiveMQ queue instead of topic
我正在尝试使用 ActiveMQ 代理向两个收听自动主题的消费者发送消息,使用 Spring 集成工具。
这是我的配置 bean(在发布者和订阅者之间是通用的):
@Value("${spring.activemq.broker-url}")
String brokerUrl;
@Value("${spring.activemq.user}")
String userName;
@Value("${spring.activemq.password}")
String password;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true); //!!
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true); //!!
return template;
}
以下是消费者的 bean:
@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
.channel("jmsInputChannel").get();
}
//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg){
System.out.println("Received Message: " + msg);
}
这些是生产者的豆子:
@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
.destination("myTopic")
).get();
}
private static int counter = 1;
@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() {
String s = "Message number " + counter;
counter++;
jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
}
我没有使用嵌入式 ActiveMQ 代理。我在各自的 (docker) 容器中使用一个代理、一个生产者和两个消费者。
我的问题是,虽然我在 JmsListenerContainerFactory
和 JmsTemplate
上都调用了 setPubSubDomain(true)
,但我的 "topics" 表现为队列:一个消费者打印所有偶数消息,而另一个打印所有奇数消息。
实际上,通过访问ActiveMQ web界面,我看到我的"topics"(即在/topics.jsp页面下)被命名为ActiveMQ.Advisory.Consumer.Queue.myTopic
和ActiveMQ.Advisory.Producer.Queue.myTopic
, "myTopic" 确实出现在队列页面中(即 /queues.jsp)。
节点按以下顺序启动:
- AMQ 经纪人
- 消费者 1
- 消费者 2
- 制作人
创建的第一个 "topic" 是 ActiveMQ.Advisory.Consumer.Queue.myTopic
,而生产者显然只有在生产者启动后才会出现。
我不是 ActiveMQ 方面的专家,所以我的 producer/consumer "topics" 被命名为“.Queue”的事实可能只是一种误导。但是,我确实得到了 official ActiveMQ documentation 中描述的队列语义,而不是主题。
我也看过 ,但是我使用的所有频道都已经是 PublishSubscribeChannel 类型了。
我需要实现的是将所有消息传递给我的所有(可能> 2 个)消费者。
更新: 我忘了说,我的 application.properties
文件已经包含 spring.jms.pub-sub-domain=true
,以及其他设置。
另外,我使用的Spring集成版本是4.3.12.RELEASE.
问题是,我得到的仍然是 RR 负载平衡语义,而不是发布-订阅语义。
至于我在@Hassen Bennour 提供的 link 中看到的内容,我希望在所有主题的列表中得到一个 ActiveMQ.Advisory.Producer.Topic.myTopic
和一个 ActiveMQ.Advisory.Consumer.Topic.myTopic
行。不知何故,我认为我没有很好地使用 Spring 集成库,因此当我想设置一个主题时我正在设置一个队列。
更新 2:对于造成的混乱,我们深表歉意。 jmsOutputChannel2
这里其实是jmsOutputChannel
,我把主要的部分编辑了。我在我的代码中使用辅助 "topic" 作为检查,生产者可以向其发送消息并自己接收回复。 "topic" 名称也不同,所以...它完全在一个单独的流程中。
我通过这种方式更改接收流程确实取得了一点进展:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
//return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
//.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
.channel("jmsInputChannel").get();
}
这会在代理上生成一个类型为 Consumer.Topic.myTopic
而不是 Consumer.Queue.myTopic
的咨询主题,并且实际上是一个名为 myTopic
的主题(正如我在主题选项卡中看到的那样)。但是,一旦生产者启动,就会创建一个 Producer.Queue
咨询主题,并且消息会在未传送的情况下发送到那里。
输入流中适配器的选择似乎决定了创建哪种咨询消费者主题(从 Jms.messageDrivenChannelAdapter()
切换到 Jms.publishSubscribeChannel()
时的主题与队列)。但是,我一直无法找到类似于输出流的内容。
更新 3:问题已解决,感谢@Hassen Bennour。回顾:
我在制作人的 Jms.outboundAdapter()
中连接了 jmsTemplate()
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
.destination("myTopic")
).get();
}
消费者的配置更复杂 Jms.messageDrivenChannelAdapter()
:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
虽然这个可能是最流畅最灵活的方法了,有这么一个bean...
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}
连接作为适配器的目的地,而不仅仅是一个字符串。
再次感谢。
将 spring.jms.pub-sub-domain=true 添加到 application.properties
或
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// the configurer will use PubSubDomain from application.properties if defined or false if not
//so setting it on the factory level need to be set after this
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
ActiveMQ.Advisory.Consumer.Queue.myTopic
是名为 myTopic 的队列的咨询主题
看看这里阅读咨询
http://activemq.apache.org/advisory-message.html
更新:
更新您的定义如下
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
.destination("myTopic")
).get();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
或将 Destination 定义为主题并将 destination("myTopic") 替换为 destination(topic())
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}
我正在尝试使用 ActiveMQ 代理向两个收听自动主题的消费者发送消息,使用 Spring 集成工具。
这是我的配置 bean(在发布者和订阅者之间是通用的):
@Value("${spring.activemq.broker-url}")
String brokerUrl;
@Value("${spring.activemq.user}")
String userName;
@Value("${spring.activemq.password}")
String password;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true); //!!
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true); //!!
return template;
}
以下是消费者的 bean:
@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
.channel("jmsInputChannel").get();
}
//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg){
System.out.println("Received Message: " + msg);
}
这些是生产者的豆子:
@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
.destination("myTopic")
).get();
}
private static int counter = 1;
@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() {
String s = "Message number " + counter;
counter++;
jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
}
我没有使用嵌入式 ActiveMQ 代理。我在各自的 (docker) 容器中使用一个代理、一个生产者和两个消费者。
我的问题是,虽然我在 JmsListenerContainerFactory
和 JmsTemplate
上都调用了 setPubSubDomain(true)
,但我的 "topics" 表现为队列:一个消费者打印所有偶数消息,而另一个打印所有奇数消息。
实际上,通过访问ActiveMQ web界面,我看到我的"topics"(即在/topics.jsp页面下)被命名为ActiveMQ.Advisory.Consumer.Queue.myTopic
和ActiveMQ.Advisory.Producer.Queue.myTopic
, "myTopic" 确实出现在队列页面中(即 /queues.jsp)。
节点按以下顺序启动:
- AMQ 经纪人
- 消费者 1
- 消费者 2
- 制作人
创建的第一个 "topic" 是 ActiveMQ.Advisory.Consumer.Queue.myTopic
,而生产者显然只有在生产者启动后才会出现。
我不是 ActiveMQ 方面的专家,所以我的 producer/consumer "topics" 被命名为“.Queue”的事实可能只是一种误导。但是,我确实得到了 official ActiveMQ documentation 中描述的队列语义,而不是主题。
我也看过
我需要实现的是将所有消息传递给我的所有(可能> 2 个)消费者。
更新: 我忘了说,我的 application.properties
文件已经包含 spring.jms.pub-sub-domain=true
,以及其他设置。
另外,我使用的Spring集成版本是4.3.12.RELEASE.
问题是,我得到的仍然是 RR 负载平衡语义,而不是发布-订阅语义。
至于我在@Hassen Bennour 提供的 link 中看到的内容,我希望在所有主题的列表中得到一个 ActiveMQ.Advisory.Producer.Topic.myTopic
和一个 ActiveMQ.Advisory.Consumer.Topic.myTopic
行。不知何故,我认为我没有很好地使用 Spring 集成库,因此当我想设置一个主题时我正在设置一个队列。
更新 2:对于造成的混乱,我们深表歉意。 jmsOutputChannel2
这里其实是jmsOutputChannel
,我把主要的部分编辑了。我在我的代码中使用辅助 "topic" 作为检查,生产者可以向其发送消息并自己接收回复。 "topic" 名称也不同,所以...它完全在一个单独的流程中。
我通过这种方式更改接收流程确实取得了一点进展:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
//return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
//.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
.channel("jmsInputChannel").get();
}
这会在代理上生成一个类型为 Consumer.Topic.myTopic
而不是 Consumer.Queue.myTopic
的咨询主题,并且实际上是一个名为 myTopic
的主题(正如我在主题选项卡中看到的那样)。但是,一旦生产者启动,就会创建一个 Producer.Queue
咨询主题,并且消息会在未传送的情况下发送到那里。
输入流中适配器的选择似乎决定了创建哪种咨询消费者主题(从 Jms.messageDrivenChannelAdapter()
切换到 Jms.publishSubscribeChannel()
时的主题与队列)。但是,我一直无法找到类似于输出流的内容。
更新 3:问题已解决,感谢@Hassen Bennour。回顾:
我在制作人的 Jms.outboundAdapter()
jmsTemplate()
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
.destination("myTopic")
).get();
}
消费者的配置更复杂 Jms.messageDrivenChannelAdapter()
:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
虽然这个可能是最流畅最灵活的方法了,有这么一个bean...
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}
连接作为适配器的目的地,而不仅仅是一个字符串。
再次感谢。
将 spring.jms.pub-sub-domain=true 添加到 application.properties
或
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// the configurer will use PubSubDomain from application.properties if defined or false if not
//so setting it on the factory level need to be set after this
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
ActiveMQ.Advisory.Consumer.Queue.myTopic
是名为 myTopic 的队列的咨询主题
看看这里阅读咨询
http://activemq.apache.org/advisory-message.html
更新:
更新您的定义如下
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
.destination("myTopic")
).get();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
或将 Destination 定义为主题并将 destination("myTopic") 替换为 destination(topic())
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}