Spring - 普通 RabbitMQ 比普通 RabbitMQ + JMS 快很多?

Spring - plain RabbitMQ a lot faster than plain RabbitMQ + JMS?

我有 2 个 Spring RabbitMq 配置,一个使用 RabbitTemplate,一个使用 JmsTemplate。


使用 RabbitTemplate 的配置:

Class AmqpMailIntegrationPerfTestConfig:

@Configuration
@ComponentScan(basePackages = {
    "com.test.perf.amqp.receiver",
    "com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {

    @Bean
    public DefaultClassMapper classMapper() {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("mail", MailMessage.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory createConnectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    Queue queue() {
        return new Queue(AmqpMailSenderImpl.QUEUE_NAME, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(AmqpMailSenderImpl.ROUTING_KEY);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(createConnectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(createConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

}

com.test.perf.amqp.sender包中的AmqpMailSenderPerfImplclass:

@Component
public class AmqpMailSenderPerfImpl implements MailSender {

    public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";
    public static final String ROUTING_KEY = "mails";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public boolean sendMail(MailMessage message) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY, message);
        return true;
    }
}

com.test.perf.amqp.receiver包中的AmqpMailReceiverPerfImplclass:

@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Map<String,Date> datesReceived = new HashMap<String, Date>();

    @RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
    public void receiveMessage(MailMessage message) {
        logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
        datesReceived.put(message.getSubject(), new Date());
    }

    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }

}

JmsTemplate 的配置:

Class JmsMailIntegrationPerfTestConfig:

@Configuration
@EnableJms
@ComponentScan(basePackages = {
        "com.test.perf.jms.receiver",
        "com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

        Map<String,Class<?>> typeIdMappings = new HashMap<String,Class<?>>();
        typeIdMappings.put("mail", MailMessage.class);
        converter.setTypeIdMappings(typeIdMappings);

        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");

        return converter;
    }

    @Bean
    public ConnectionFactory createConnectionFactory(){
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);

        return connectionFactory;
    }

    @Bean(name = "myJmsFactory")
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("10-50");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public Destination jmsDestination() {
        RMQDestination jmsDestination = new RMQDestination();
        jmsDestination.setDestinationName("myQueue");
        jmsDestination.setAmqp(false);
        jmsDestination.setAmqpQueueName("mails");
        return jmsDestination;
    }

    @Bean
    public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
        final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        return jmsTemplate;
    }

}

包com.test.jms.sender中的JmsMailSenderImpl class:

@Component
public class JmsMailSenderImpl implements MailSender {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public boolean sendMail(MailMessage message) {
        logger.info("Sending message!");
        jmsTemplate.convertAndSend("mailbox", message);

        return false;
    }

}

包com.test.perf.jms.receiver:

中的JmsMailReceiverPerfImpl class
@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Map<String,Date> datesReceived = new HashMap<String, Date>();

    @JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
    public void receiveMail(MailMessage message) {
        datesReceived.put(message.getSubject(), new Date());
        logger.info("Received <" + message.getSubject() + ">");
    }

    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }

}

我通过启动 10 个线程并使相应的 MailSenders 每个发送 1000 封邮件来测试上述配置。

对于使用 RabbitTemplate 的配置,我得到: * 所有消息的总吞吐时间:3687ms * 处理一条消息的时间:817ms

对于 JmsTemplate 的配置,我得到: * 所有消息的总吞吐时间:41653ms * 处理一条消息的时间:67ms

这似乎表明带有 JmsTemplate 的版本无法并行工作,或者至少没有以最佳方式使用资源。

有人知道是什么原因造成的吗?我试过不同的事务和并发参数,但无济于事。

我们想要的是使用 JmsTemplate 获得与 RabbitTemplate 相同的吞吐时间,因此我们可以使用 JMS 作为抽象层。

我明白为什么消费者端速度较慢 - Consumer.receive() 对每条消息使用同步 basicGet()@RabbitListener 容器使用 basicConsume 和预取计数250.

在JMS发送端,你需要使用一个CachingConnectionFactory,否则每次发送都会创建一个新的session/producer/channel。

尽管如此,它还是慢了很多;我建议您在 RabbitMQ 工程师常去的 rabbitmq-users Google 组询问。他们维护 JMS 客户端。