Spring - 普通 RabbitMQ 比普通 RabbitMQ + JMS 快很多?
Spring - plain RabbitMQ a lot faster than plain RabbitMQ + JMS?
我有 2 个 Spring RabbitMq 配置,一个使用 RabbitTemplate,一个使用 JmsTemplate。
使用 RabbitTemplate 的配置:
Class AmqpMailIntegrationPerfTestConfig:
/**
 * Spring configuration for the plain AMQP ({@link RabbitTemplate}) variant of the mail
 * performance test.
 *
 * <p>Declares JSON message conversion, the broker connection, the queue/exchange/binding
 * topology and the listener container factory named {@code myRabbitListenerContainerFactory}.
 */
@Configuration
@ComponentScan(basePackages = {
    "com.test.perf.amqp.receiver",
    "com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {

    /** Class mapper that resolves the type id {@code "mail"} to {@link MailMessage}. */
    @Bean
    public DefaultClassMapper classMapper() {
        Map<String, Class<?>> typeIds = new HashMap<>();
        typeIds.put("mail", MailMessage.class);

        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setIdClassMapping(typeIds);
        return mapper;
    }

    /** JSON converter that uses {@link #classMapper()} for type resolution. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper());
        return converter;
    }

    /**
     * Template used for publishing; payloads are converted with {@link #jsonMessageConverter()}.
     *
     * @param connectionFactory the connection factory the template sends through
     */
    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /** Caching connection factory pointing at a broker on {@code localhost}. */
    @Bean
    public ConnectionFactory createConnectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** The queue mails are delivered to; the second constructor argument is {@code false}. */
    @Bean
    Queue queue() {
        return new Queue(AmqpMailSenderImpl.QUEUE_NAME, false);
    }

    /** The topic exchange mails are published to. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
    }

    /** Binds {@code queue} to {@code exchange} using the sender's routing key. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(AmqpMailSenderImpl.ROUTING_KEY);
    }

    /** Admin built on {@link #createConnectionFactory()}. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(createConnectionFactory());
    }

    /**
     * Listener container factory: JSON conversion, at most 5 concurrent consumers.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory =
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(createConnectionFactory());
        containerFactory.setMaxConcurrentConsumers(5);
        containerFactory.setMessageConverter(jsonMessageConverter());
        return containerFactory;
    }
}
com.test.perf.amqp.sender 包中的 AmqpMailSenderPerfImpl class:
/**
 * {@link MailSender} that publishes each mail through a {@link RabbitTemplate}.
 */
@Component
public class AmqpMailSenderPerfImpl implements MailSender {

    public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";
    public static final String ROUTING_KEY = "mails";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts {@code message} and sends it to {@link #TOPIC_EXCHANGE_NAME} with
     * {@link #ROUTING_KEY}.
     *
     * @param message the mail to publish
     * @return always {@code true}
     */
    @Override
    public boolean sendMail(MailMessage message) {
        publish(message);
        return true;
    }

    /** Hands the payload to the template for conversion and delivery. */
    private void publish(MailMessage payload) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY, payload);
    }
}
com.test.perf.amqp.receiver 包中的 AmqpMailReceiverPerfImpl class:
@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String,Date> datesReceived = new HashMap<String, Date>();
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
public void receiveMessage(MailMessage message) {
logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
datesReceived.put(message.getSubject(), new Date());
}
public Map<String, Date> getDatesReceived() {
return datesReceived;
}
}
JmsTemplate 的配置:
Class JmsMailIntegrationPerfTestConfig:
@Configuration
@EnableJms
@ComponentScan(basePackages = {
"com.test.perf.jms.receiver",
"com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
Map<String,Class<?>> typeIdMappings = new HashMap<String,Class<?>>();
typeIdMappings.put("mail", MailMessage.class);
converter.setTypeIdMappings(typeIdMappings);
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public ConnectionFactory createConnectionFactory(){
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
return connectionFactory;
}
@Bean(name = "myJmsFactory")
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("10-50");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public Destination jmsDestination() {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName("myQueue");
jmsDestination.setAmqp(false);
jmsDestination.setAmqpQueueName("mails");
return jmsDestination;
}
@Bean
public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
return jmsTemplate;
}
}
包 com.test.jms.sender 中的 JmsMailSenderImpl class:
/**
 * {@link MailSender} that sends each mail to the {@code mailbox} destination through a
 * {@link JmsTemplate}.
 */
@Component
public class JmsMailSenderImpl implements MailSender {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Converts {@code message} and sends it to the {@code mailbox} destination.
     *
     * @param message the mail to send
     * @return {@code true} once the send call has completed without throwing
     */
    @Override
    public boolean sendMail(MailMessage message) {
        logger.info("Sending message!");
        jmsTemplate.convertAndSend("mailbox", message);
        // Previously returned false even after a successful send, which disagreed with the
        // AMQP sender implementation that reports true.
        return true;
    }
}
包 com.test.perf.jms.receiver 中的 JmsMailReceiverPerfImpl class:
@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String,Date> datesReceived = new HashMap<String, Date>();
@JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
public void receiveMail(MailMessage message) {
datesReceived.put(message.getSubject(), new Date());
logger.info("Received <" + message.getSubject() + ">");
}
public Map<String, Date> getDatesReceived() {
return datesReceived;
}
}
我通过启动 10 个线程并使相应的 MailSenders 每个发送 1000 封邮件来测试上述配置。
对于使用 RabbitTemplate 的配置,我得到:
* 所有消息的总吞吐时间:3687ms
* 处理一条消息的时间:817ms
对于 JmsTemplate 的配置,我得到:
* 所有消息的总吞吐时间:41653ms
* 处理一条消息的时间:67ms
这似乎表明带有 JmsTemplate 的版本无法并行工作,或者至少没有以最佳方式使用资源。
有人知道是什么原因造成的吗?我试过不同的事务和并发参数,但无济于事。
我们想要的是使用 JmsTemplate 获得与 RabbitTemplate 相同的吞吐时间,因此我们可以使用 JMS 作为抽象层。
我明白为什么消费者端速度较慢:Consumer.receive() 对每条消息都使用同步的 basicGet(),而 @RabbitListener 容器使用 basicConsume,且预取计数(prefetch count)为 250。
在 JMS 发送端,你需要使用 CachingConnectionFactory,否则每次发送都会创建一个新的 session/producer/channel。
尽管如此,它还是慢了很多;我建议您在 RabbitMQ 工程师常去的 rabbitmq-users Google 组询问。他们维护 JMS 客户端。
我有 2 个 Spring RabbitMq 配置,一个使用 RabbitTemplate,一个使用 JmsTemplate。
使用 RabbitTemplate 的配置:
Class AmqpMailIntegrationPerfTestConfig:
/**
 * Spring configuration for the plain AMQP ({@link RabbitTemplate}) variant of the mail
 * performance test.
 *
 * <p>Declares JSON message conversion, the broker connection, the queue/exchange/binding
 * topology and the listener container factory named {@code myRabbitListenerContainerFactory}.
 */
@Configuration
@ComponentScan(basePackages = {
    "com.test.perf.amqp.receiver",
    "com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {

    /** Class mapper that resolves the type id {@code "mail"} to {@link MailMessage}. */
    @Bean
    public DefaultClassMapper classMapper() {
        Map<String, Class<?>> typeIds = new HashMap<>();
        typeIds.put("mail", MailMessage.class);

        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setIdClassMapping(typeIds);
        return mapper;
    }

    /** JSON converter that uses {@link #classMapper()} for type resolution. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper());
        return converter;
    }

    /**
     * Template used for publishing; payloads are converted with {@link #jsonMessageConverter()}.
     *
     * @param connectionFactory the connection factory the template sends through
     */
    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /** Caching connection factory pointing at a broker on {@code localhost}. */
    @Bean
    public ConnectionFactory createConnectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** The queue mails are delivered to; the second constructor argument is {@code false}. */
    @Bean
    Queue queue() {
        return new Queue(AmqpMailSenderImpl.QUEUE_NAME, false);
    }

    /** The topic exchange mails are published to. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
    }

    /** Binds {@code queue} to {@code exchange} using the sender's routing key. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(AmqpMailSenderImpl.ROUTING_KEY);
    }

    /** Admin built on {@link #createConnectionFactory()}. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(createConnectionFactory());
    }

    /**
     * Listener container factory: JSON conversion, at most 5 concurrent consumers.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory =
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(createConnectionFactory());
        containerFactory.setMaxConcurrentConsumers(5);
        containerFactory.setMessageConverter(jsonMessageConverter());
        return containerFactory;
    }
}
com.test.perf.amqp.sender 包中的 AmqpMailSenderPerfImpl class:
/**
 * {@link MailSender} that publishes each mail through a {@link RabbitTemplate}.
 */
@Component
public class AmqpMailSenderPerfImpl implements MailSender {

    public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";
    public static final String ROUTING_KEY = "mails";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts {@code message} and sends it to {@link #TOPIC_EXCHANGE_NAME} with
     * {@link #ROUTING_KEY}.
     *
     * @param message the mail to publish
     * @return always {@code true}
     */
    @Override
    public boolean sendMail(MailMessage message) {
        publish(message);
        return true;
    }

    /** Hands the payload to the template for conversion and delivery. */
    private void publish(MailMessage payload) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY, payload);
    }
}
com.test.perf.amqp.receiver 包中的 AmqpMailReceiverPerfImpl class:
@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String,Date> datesReceived = new HashMap<String, Date>();
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
public void receiveMessage(MailMessage message) {
logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
datesReceived.put(message.getSubject(), new Date());
}
public Map<String, Date> getDatesReceived() {
return datesReceived;
}
}
JmsTemplate 的配置:
Class JmsMailIntegrationPerfTestConfig:
@Configuration
@EnableJms
@ComponentScan(basePackages = {
"com.test.perf.jms.receiver",
"com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
Map<String,Class<?>> typeIdMappings = new HashMap<String,Class<?>>();
typeIdMappings.put("mail", MailMessage.class);
converter.setTypeIdMappings(typeIdMappings);
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public ConnectionFactory createConnectionFactory(){
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
return connectionFactory;
}
@Bean(name = "myJmsFactory")
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("10-50");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public Destination jmsDestination() {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName("myQueue");
jmsDestination.setAmqp(false);
jmsDestination.setAmqpQueueName("mails");
return jmsDestination;
}
@Bean
public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
return jmsTemplate;
}
}
包 com.test.jms.sender 中的 JmsMailSenderImpl class:
/**
 * {@link MailSender} that sends each mail to the {@code mailbox} destination through a
 * {@link JmsTemplate}.
 */
@Component
public class JmsMailSenderImpl implements MailSender {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Converts {@code message} and sends it to the {@code mailbox} destination.
     *
     * @param message the mail to send
     * @return {@code true} once the send call has completed without throwing
     */
    @Override
    public boolean sendMail(MailMessage message) {
        logger.info("Sending message!");
        jmsTemplate.convertAndSend("mailbox", message);
        // Previously returned false even after a successful send, which disagreed with the
        // AMQP sender implementation that reports true.
        return true;
    }
}
包 com.test.perf.jms.receiver 中的 JmsMailReceiverPerfImpl class:
@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String,Date> datesReceived = new HashMap<String, Date>();
@JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
public void receiveMail(MailMessage message) {
datesReceived.put(message.getSubject(), new Date());
logger.info("Received <" + message.getSubject() + ">");
}
public Map<String, Date> getDatesReceived() {
return datesReceived;
}
}
我通过启动 10 个线程并使相应的 MailSenders 每个发送 1000 封邮件来测试上述配置。
对于使用 RabbitTemplate 的配置,我得到: * 所有消息的总吞吐时间:3687ms * 处理一条消息的时间:817ms
对于 JmsTemplate 的配置,我得到: * 所有消息的总吞吐时间:41653ms * 处理一条消息的时间:67ms
这似乎表明带有 JmsTemplate 的版本无法并行工作,或者至少没有以最佳方式使用资源。
有人知道是什么原因造成的吗?我试过不同的事务和并发参数,但无济于事。
我们想要的是使用 JmsTemplate 获得与 RabbitTemplate 相同的吞吐时间,因此我们可以使用 JMS 作为抽象层。
我明白为什么消费者端速度较慢:Consumer.receive() 对每条消息都使用同步的 basicGet(),而 @RabbitListener 容器使用 basicConsume,且预取计数(prefetch count)为 250。
在 JMS 发送端,你需要使用 CachingConnectionFactory,否则每次发送都会创建一个新的 session/producer/channel。
尽管如此,它还是慢了很多;我建议您在 RabbitMQ 工程师常去的 rabbitmq-users Google 组询问。他们维护 JMS 客户端。