如何使用多个消费者从主题中读取消息?

How to read messages from a topic with multiple consumers?

我使用 10 个消费者从一个主题中读取消息,并将这些消息发送到一个队列。当我通过 JMeter 向主题发送 50 条消息时,队列中却有 500 条消息。也就是说,每个消费者都从主题中读取了相同的消息,然后各自发送到队列。能否让每个消费者从主题中读取不同的消息?

非常感谢。

JmsConfig.java

@Configuration
@EnableJms
@ComponentScan(basePackages = "com.jms.config")
public class JmsConfig {
    String BROKER_URL = "tcp://localhost:61616";
    String BROKER_USERNAME = "admin";
    String BROKER_PASSWORD = "admin";

    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        connectionFactory.setPassword(BROKER_USERNAME);
        connectionFactory.setUserName(BROKER_PASSWORD);
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setPubSubDomain(true);
        return template;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-10");
        factory.setPubSubDomain(true);
        return factory;
    }

JmsSender.java

/**
 * Forwards raw payloads to the destination named by {@code appProperties.toQueueName}.
 *
 * <p>The {@link JmsTemplate} is not injected directly; it is looked up by bean
 * name ({@code "jmsForQueue"}) on first use and then cached in a field.
 */
@Service
public class JmsSender{

    private JmsTemplate jmsTemplate;

    @Value("#{appProperties.toQueueName}")
    private String queueName;

    @Autowired
    private ApplicationContextUtil applicationContextUtil;

    /**
     * Wraps {@code rawData} in a JMS object message and sends it to {@code queueName}.
     *
     * @param rawData the payload to forward
     */
    public void send(String rawData){
        final MessageCreator objectMessageCreator =
                session -> session.createObjectMessage(rawData);
        getJmsTemplate().send(queueName, objectMessageCreator);
    }

    /**
     * Returns the cached template, resolving the {@code "jmsForQueue"} bean from
     * the application context the first time it is needed.
     *
     * @return the template used for sending
     */
    public JmsTemplate getJmsTemplate(){
        if (jmsTemplate != null){
            return jmsTemplate;
        }
        final Object resolvedBean = applicationContextUtil.getBeanFromAppContext("jmsForQueue");
        jmsTemplate = (JmsTemplate) resolvedBean;
        return jmsTemplate;
    }
}

Worker.java

/**
 * JMS listener that relays every message received on the topic named by
 * {@code appProperties.fromTopicName} to {@link JmsSender}.
 */
@Component
public class Worker {

    @Autowired
    private JmsSender jmsSender;

    /**
     * Logs the incoming payload to standard output and hands it to
     * {@link JmsSender#send(String)}.
     *
     * @param jsonMessage the message body delivered by the listener container
     * @return the value of {@code response} (see review note below)
     * @throws JMSException declared by the signature; nothing in the visible body throws it directly
     */
    @JmsListener(destination = "#{appProperties.fromTopicName}")
    public String receiveMessageFromTopic(final String jsonMessage) throws JMSException {
        System.out.println("Received message " + jsonMessage);
        jmsSender.send(jsonMessage);
        // NOTE(review): `response` is not declared anywhere in the visible snippet,
        // so this does not compile as shown; presumably the declaration was elided.
        // TODO confirm what the listener is meant to return.
        return response;
    }
}

QueueConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
   <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:aop="http://www.springframework.org/schema/aop"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:mvc="http://www.springframework.org/schema/mvc"
   xmlns:task="http://www.springframework.org/schema/task"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xmlns:cache="http://www.springframework.org/schema/cache"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
   http://www.springframework.org/schema/aop
   http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
   http://www.springframework.org/schema/context
   http://www.springframework.org/schema/context/spring-context-4.3.xsd
   http://www.springframework.org/schema/tx
   http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core-5.4.0.xsd
   http://www.springframework.org/schema/task
   http://www.springframework.org/schema/task/spring-task-4.3.xsd
   http://www.springframework.org/schema/mvc
   http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
   http://www.springframework.org/schema/cache
   http://www.springframework.org/schema/cache/spring-cache.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"
   default-lazy-init="false">     

   <!-- Exposes the broker URL as a String bean; the value is read from appProperties.queueUrl. -->
   <bean id="brokerUrl" class="java.lang.String">
       <constructor-arg value="#{appProperties.queueUrl}"/>
   </bean>

   <!--
     Raw ActiveMQ connection factory with asynchronous dispatch enabled.
     The broker URL is resolved from the brokerUrl String bean via SpEL; the
     expression needs the #{...} delimiters, otherwise the attribute is taken
     as the literal text "#brokerUrl".
   -->
   <amq:connectionFactory id="amqConnectionFactory" brokerURL="#{brokerUrl}" dispatchAsync="true"/>

   <!--
     Pooled wrapper around amqConnectionFactory. Pool size and idle timeout come
     from appProperties; each pooled connection is limited to 10 active sessions.
     The pool's stop() method is invoked when the bean is destroyed.
   -->
   <bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
       <constructor-arg ref="amqConnectionFactory"/>
       <property name="maxConnections" value="#{appProperties.maxConnections}"/>
       <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
       <property name="maximumActiveSessionPerConnection" value = "10"/> 

   </bean>

   <!--
     JmsTemplate built on the pooled connection factory. JmsSender looks this
     bean up by its id ("jmsForQueue"). pubSubDomain is not set here, so the
     template keeps its default domain (presumably point-to-point/queue; confirm
     against the Spring JmsTemplate documentation).
   -->
   <bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="connectionFactory1"/>
   </bean>

   <!--
     ActiveMQ queue destination named by appProperties.toQueueName.
     NOTE(review): no other bean in this file references jSONQueue; verify
     whether it is still used elsewhere.
   -->
   <bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="#{appProperties.toQueueName}"/>
   </bean>
</beans>

您看到的行为是正常的。 JMS 主题遵循 publish/subscribe(即 pub/sub)语义,其中所有订阅者都获得发送到该主题的所有消息。在您的例子中,您有 10 个订阅者,并且您要发送 50 条消息。这 10 个订阅者中的每一个都接收 50 条消息中的每一条(根据 pub/sub 语义),然后将其转发到队列中。因此,队列收到 500 条消息。

如果您希望每条消息只被其中一个消费者处理(即由所有消费者共同分摊这些消息,而不是各自收到一份副本),那么您不应使用 JMS 主题,而应使用 JMS 队列。