Spring rabbitmq任务队列并发
Spring rabbitmq task queue concurrency
我有一个使用代理发出 http 请求的任务队列。代理限制为 10 个并发线程/连接。我无权访问代理的日志。
我正在使用以下代码,这是在名为 ntContainer#1-1
和 container1
的两个线程上发出请求。这导致许多请求由于使用过多的代理连接而出错。
侦听器是否仅使用 1 个默认线程和额外的容器线程,还是 spring/rabbitmq 在幕后进行更多操作?
另外,我该如何进一步调试?
@Configuration
public class RabbitMQConfig {
public final static String EXCHANGE_NAME = "my-tx";
public final static String MY_PRODUCT_ROUTING_KEY = "my-product-routing-key";
public final static String MY_PRODUCT_QUEUE = "my-product";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Queue myProductQueue() {
return new Queue(MY_PRODUCT_QUEUE);
}
@Bean
Binding myProductBinding() {
return BindingBuilder.bind(myProductQueue()).to(topicExchange()).with(MY_PRODUCT_ROUTING_KEY);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(MY_PRODUCT_QUEUE);
container.setMessageListener(messageListenerAdapter);
container.setPrefetchCount(1);
container.setConcurrentConsumers(1);
return container;
}
@Bean
MessageListenerAdapter messageListenerAdapter(MyListener myListener) {
return new MessageListenerAdapter(myListener, "process");
}
}
// 监听器
@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
// something like this
Jsoup.connect(message.getUrl()).proxy().execute()
}
糟糕;我正在看我的 'phone; 上的问题。我跳过了容器 bean;我以为容器 bean 是容器工厂而不是容器。
您有 2 个侦听器容器 -
@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
// something like this
Jsoup.connect(message.getUrl()).proxy().execute()
}
框架将自动为该侦听器创建一个容器(它会检测注释)并且您已经明确声明了另一个容器@Bean
。
The proxy is limited to 10 concurrent threads / connections.
即使有 2 个容器,您也只会得到 2 个线程,而不是 10 个。
我有一个使用代理发出 http 请求的任务队列。代理限制为 10 个并发线程/连接。我无权访问代理的日志。
我正在使用以下代码,这是在名为 ntContainer#1-1
和 container1
的两个线程上发出请求。这导致许多请求由于使用过多的代理连接而出错。
侦听器是否仅使用 1 个默认线程和额外的容器线程,还是 spring/rabbitmq 在幕后进行更多操作? 另外,我该如何进一步调试?
@Configuration
public class RabbitMQConfig {
public final static String EXCHANGE_NAME = "my-tx";
public final static String MY_PRODUCT_ROUTING_KEY = "my-product-routing-key";
public final static String MY_PRODUCT_QUEUE = "my-product";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Queue myProductQueue() {
return new Queue(MY_PRODUCT_QUEUE);
}
@Bean
Binding myProductBinding() {
return BindingBuilder.bind(myProductQueue()).to(topicExchange()).with(MY_PRODUCT_ROUTING_KEY);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(MY_PRODUCT_QUEUE);
container.setMessageListener(messageListenerAdapter);
container.setPrefetchCount(1);
container.setConcurrentConsumers(1);
return container;
}
@Bean
MessageListenerAdapter messageListenerAdapter(MyListener myListener) {
return new MessageListenerAdapter(myListener, "process");
}
}
// 监听器
@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
// something like this
Jsoup.connect(message.getUrl()).proxy().execute()
}
糟糕;我正在看我的 'phone; 上的问题。我跳过了容器 bean;我以为容器 bean 是容器工厂而不是容器。
您有 2 个侦听器容器 -
@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
// something like this
Jsoup.connect(message.getUrl()).proxy().execute()
}
框架将自动为该侦听器创建一个容器(它会检测注释)并且您已经明确声明了另一个容器@Bean
。
The proxy is limited to 10 concurrent threads / connections.
即使有 2 个容器,您也只会得到 2 个线程,而不是 10 个。