Spring 集成 JMS 创建 ActiveMQ 队列而不是主题

Spring Integration JMS creating ActiveMQ queue instead of topic

我正在尝试使用 ActiveMQ 代理向两个收听自动主题的消费者发送消息,使用 Spring 集成工具。

这是我的配置 bean(在发布者和订阅者之间是通用的):

@Value("${spring.activemq.broker-url}")
String brokerUrl;

@Value("${spring.activemq.user}")
String userName;

@Value("${spring.activemq.password}")
String password;

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(brokerUrl);
    connectionFactory.setUserName(userName);
    connectionFactory.setPassword(password);
    return connectionFactory;
}

@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true); //!!
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    template.setPubSubDomain(true); //!!
    return template;
}

以下是消费者的 bean:

@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() {
    return new PublishSubscribeChannel();
}

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
            .channel("jmsInputChannel").get();
}

//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg){
    System.out.println("Received Message: " + msg);
}

这些是生产者的豆子:

@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() {
    return new PublishSubscribeChannel();
}

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
            .destination("myTopic")
    ).get();
}



private static int counter = 1;

@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() {
     String s = "Message number " + counter;
     counter++;
     jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
}

我没有使用嵌入式 ActiveMQ 代理。我在各自的 (docker) 容器中使用一个代理、一个生产者和两个消费者。

我的问题是,虽然我在 JmsListenerContainerFactoryJmsTemplate 上都调用了 setPubSubDomain(true),但我的 "topics" 表现为队列:一个消费者打印所有偶数消息,而另一个打印所有奇数消息。

实际上,通过访问ActiveMQ web界面,我看到我的"topics"(即在/topics.jsp页面下)被命名为ActiveMQ.Advisory.Consumer.Queue.myTopicActiveMQ.Advisory.Producer.Queue.myTopic, "myTopic" 确实出现在队列页面中(即 /queues.jsp)。

节点按以下顺序启动:

创建的第一个 "topic" 是 ActiveMQ.Advisory.Consumer.Queue.myTopic,而生产者显然只有在生产者启动后才会出现。

我不是 ActiveMQ 方面的专家,所以我的 producer/consumer "topics" 被命名为“.Queue”的事实可能只是一种误导。但是,我确实得到了 official ActiveMQ documentation 中描述的队列语义,而不是主题。

我也看过 ,但是我使用的所有频道都已经是 PublishSubscribeChannel 类型了。

我需要实现的是将所有消息传递给我的所有(可能> 2 个)消费者。

更新: 我忘了说,我的 application.properties 文件已经包含 spring.jms.pub-sub-domain=true,以及其他设置。

另外,我使用的Spring集成版本是4.3.12.RELEASE.

问题是,我得到的仍然是 RR 负载平衡语义,而不是发布-订阅语义。 至于我在@Hassen Bennour 提供的 link 中看到的内容,我希望在所有主题的列表中得到一个 ActiveMQ.Advisory.Producer.Topic.myTopic 和一个 ActiveMQ.Advisory.Consumer.Topic.myTopic 行。不知何故,我认为我没有很好地使用 Spring 集成库,因此当我想设置一个主题时我正在设置一个队列。

更新 2:对于造成的混乱,我们深表歉意。 jmsOutputChannel2这里其实是jmsOutputChannel,我把主要的部分编辑了。我在我的代码中使用辅助 "topic" 作为检查,生产者可以向其发送消息并自己接收回复。 "topic" 名称也不同,所以...它完全在一个单独的流程中。

我通过这种方式更改接收流程确实取得了一点进展:

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    //return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
            //.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
            .channel("jmsInputChannel").get();
}

这会在代理上生成一个类型为 Consumer.Topic.myTopic 而不是 Consumer.Queue.myTopic 的咨询主题,并且实际上是一个名为 myTopic 的主题(正如我在主题选项卡中看到的那样)。但是,一旦生产者启动,就会创建一个 Producer.Queue 咨询主题,并且消息会在未传送的情况下发送到那里。

输入流中适配器的选择似乎决定了创建哪种咨询消费者主题(从 Jms.messageDrivenChannelAdapter() 切换到 Jms.publishSubscribeChannel() 时的主题与队列)。但是,我一直无法找到类似于输出流的内容。

更新 3:问题已解决,感谢@Hassen Bennour。回顾:

我在制作人的 Jms.outboundAdapter()

中连接了 jmsTemplate()
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
            .destination("myTopic")
    ).get();
}

消费者的配置更复杂 Jms.messageDrivenChannelAdapter():

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
        Jms.container(connectionFactory(),"myTopic")
        .pubSubDomain(true).get()) )
        .channel("jmsInputChannel").get();
}

虽然这个可能是最流畅最灵活的方法了,有这么一个bean...

@Bean
public Topic topic() {
    return new ActiveMQTopic("myTopic");
}

连接作为适配器的目的地,而不仅仅是一个字符串。

再次感谢。

将 spring.jms.pub-sub-domain=true 添加到 application.properties

@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // the configurer will use PubSubDomain from application.properties if defined or false if not
    //so setting it on the factory level need to be set after this
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(true);
    return factory;
}

ActiveMQ.Advisory.Consumer.Queue.myTopic 是名为 myTopic 的队列的咨询主题 看看这里阅读咨询 http://activemq.apache.org/advisory-message.html

更新:

更新您的定义如下

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
            .destination("myTopic")
    ).get();
}

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
            Jms.container(connectionFactory(),"myTopic")
            .pubSubDomain(true).get()) )
            .channel("jmsInputChannel").get();
}

或将 Destination 定义为主题并将 destination("myTopic") 替换为 destination(topic())

@Bean
public Topic topic() {
    return new ActiveMQTopic("myTopic");
}