使用并发用户在 ActiveMQ 中更快地处理待处理消息
Process pending messages faster in ActiveMQ using concurrent user
我有一个队列正在被 Spring 启动应用程序消耗。由于我的 Spring 引导应用程序正在等待来自 REST 的响应 API 它无法更快地处理传入的消息,因为我的队列中待处理消息的数量正在增加。
我做了一些研发并提出了下面提到的解决方案。请帮助我查看解决方案,以便我知道我是否在正确的轨道上。
我当前的 ActiveMQ 配置:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerURL);
factory.setUserName(userName);
factory.setPassword(password);
return factory;
}
我的解决方案计划:-
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setConcurrency("1-50");
factory.setMaxMessagesPerTask(10);
factory.setReceiveTimeout(5000L); // 5 seconds
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerURL);
factory.setUserName(userName);
factory.setPassword(password);
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setQueuePrefetch(1);
factory.setPrefetchPolicy(policy);
return factory;
}
一般来说,更多的消费者(尤其是并发消费者)处理消息的速度会比较少的消费者快,所以从这个角度来看这看起来不错。
我有一个队列正在被 Spring 启动应用程序消耗。由于我的 Spring 引导应用程序正在等待来自 REST 的响应 API 它无法更快地处理传入的消息,因为我的队列中待处理消息的数量正在增加。
我做了一些研发并提出了下面提到的解决方案。请帮助我查看解决方案,以便我知道我是否在正确的轨道上。
我当前的 ActiveMQ 配置:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerURL);
factory.setUserName(userName);
factory.setPassword(password);
return factory;
}
我的解决方案计划:-
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setConcurrency("1-50");
factory.setMaxMessagesPerTask(10);
factory.setReceiveTimeout(5000L); // 5 seconds
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerURL);
factory.setUserName(userName);
factory.setPassword(password);
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setQueuePrefetch(1);
factory.setPrefetchPolicy(policy);
return factory;
}
一般来说,更多的消费者(尤其是并发消费者)处理消息的速度会比较少的消费者快,所以从这个角度来看这看起来不错。