如何从多个消费者的主题中读取消息?
How to read messages from topic with multiple consumer?
我从一个主题中读取了 10 个消费者并将这些消息发送到一个队列。当我通过 jmeter 向主题发送 50 条消息时,队列有 500 条消息。因此,每个消费者都从主题中读取相同的消息,然后发送到队列。每个消费者是否可以从主题中读取不同的消息?
非常感谢。
JmsConfig.java
@Configuration
@EnableJms
@ComponentScan(basePackages = "com.jms.config")
public class JmsConfig {
String BROKER_URL = "tcp://localhost:61616";
String BROKER_USERNAME = "admin";
String BROKER_PASSWORD = "admin";
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
connectionFactory.setPassword(BROKER_USERNAME);
connectionFactory.setUserName(BROKER_PASSWORD);
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true);
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-10");
factory.setPubSubDomain(true);
return factory;
}
JmsSender.java
@Service
public class JmsSender{
private JmsTemplate jmsTemplate;
@Value("#{appProperties.toQueueName}")
private String queueName;
@Autowired
private ApplicationContextUtil applicationContextUtil;
public void send(String rawData){
getJmsTemplate().send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(rawData);
}
});
}
public JmsTemplate getJmsTemplate(){
if (jmsTemplate == null){
jmsTemplate = (JmsTemplate) applicationContextUtil.getBeanFromAppContext("jmsForQueue");
}
return jmsTemplate;
}
}
Worker.java
@Component
public class Worker {
@Autowired
private JmsSender jmsSender;
@JmsListener(destination = "#{appProperties.fromTopicName}")
public String receiveMessageFromTopic(final String jsonMessage) throws JMSException {
System.out.println("Received message " + jsonMessage);
jmsSender.send(jsonMessage);
return response;
}
}
QueueConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.4.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.3.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"
default-lazy-init="false">
<bean id="brokerUrl" class="java.lang.String">
<constructor-arg value="#{appProperties.queueUrl}"/>
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>
<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="amqConnectionFactory"/>
<property name="maxConnections" value="#{appProperties.maxConnections}"/>
<property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
<property name="maximumActiveSessionPerConnection" value = "10"/>
</bean>
<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory1"/>
</bean>
<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="#{appProperties.toQueueName}"/>
</bean>
</beans>
您看到的行为是正常的。 JMS 主题遵循 publish/subscribe(即 pub/sub)语义,其中所有订阅者都获得发送到该主题的所有消息。在您的例子中,您有 10 个订阅者,并且您要发送 50 条消息。这 10 个订阅者中的每一个都接收 50 条消息中的每一条(根据 pub/sub 语义),然后将其转发到队列中。因此,队列收到 500 条消息。
如果您希望所有消费者共享所有消息,那么您不应使用 JMS 主题,而应使用 JMS 队列。
我从一个主题中读取了 10 个消费者并将这些消息发送到一个队列。当我通过 jmeter 向主题发送 50 条消息时,队列有 500 条消息。因此,每个消费者都从主题中读取相同的消息,然后发送到队列。每个消费者是否可以从主题中读取不同的消息?
非常感谢。
JmsConfig.java
@Configuration
@EnableJms
@ComponentScan(basePackages = "com.jms.config")
public class JmsConfig {
String BROKER_URL = "tcp://localhost:61616";
String BROKER_USERNAME = "admin";
String BROKER_PASSWORD = "admin";
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
connectionFactory.setPassword(BROKER_USERNAME);
connectionFactory.setUserName(BROKER_PASSWORD);
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true);
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-10");
factory.setPubSubDomain(true);
return factory;
}
JmsSender.java
@Service
public class JmsSender{
private JmsTemplate jmsTemplate;
@Value("#{appProperties.toQueueName}")
private String queueName;
@Autowired
private ApplicationContextUtil applicationContextUtil;
public void send(String rawData){
getJmsTemplate().send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(rawData);
}
});
}
public JmsTemplate getJmsTemplate(){
if (jmsTemplate == null){
jmsTemplate = (JmsTemplate) applicationContextUtil.getBeanFromAppContext("jmsForQueue");
}
return jmsTemplate;
}
}
Worker.java
@Component
public class Worker {
@Autowired
private JmsSender jmsSender;
@JmsListener(destination = "#{appProperties.fromTopicName}")
public String receiveMessageFromTopic(final String jsonMessage) throws JMSException {
System.out.println("Received message " + jsonMessage);
jmsSender.send(jsonMessage);
return response;
}
}
QueueConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.4.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.3.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"
default-lazy-init="false">
<bean id="brokerUrl" class="java.lang.String">
<constructor-arg value="#{appProperties.queueUrl}"/>
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>
<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="amqConnectionFactory"/>
<property name="maxConnections" value="#{appProperties.maxConnections}"/>
<property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
<property name="maximumActiveSessionPerConnection" value = "10"/>
</bean>
<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory1"/>
</bean>
<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="#{appProperties.toQueueName}"/>
</bean>
</beans>
您看到的行为是正常的。 JMS 主题遵循 publish/subscribe(即 pub/sub)语义,其中所有订阅者都获得发送到该主题的所有消息。在您的例子中,您有 10 个订阅者,并且您要发送 50 条消息。这 10 个订阅者中的每一个都接收 50 条消息中的每一条(根据 pub/sub 语义),然后将其转发到队列中。因此,队列收到 500 条消息。
如果您希望所有消费者共享所有消息,那么您不应使用 JMS 主题,而应使用 JMS 队列。