Spring rabbitmq任务队列并发

Spring rabbitmq task queue concurrency

我有一个使用代理发出 http 请求的任务队列。代理限制为 10 个并发线程/连接。我无权访问代理的日志。

我正在使用以下代码,这是在名为 ntContainer#1-1container1 的两个线程上发出请求。这导致许多请求由于使用过多的代理连接而出错。

侦听器是否仅使用 1 个默认线程和额外的容器线程,还是 spring/rabbitmq 在幕后进行更多操作? 另外,我该如何进一步调试?

@Configuration
public class RabbitMQConfig {

    public final static String EXCHANGE_NAME = "my-tx";
    public final static String MY_PRODUCT_ROUTING_KEY = "my-product-routing-key";
    public final static String MY_PRODUCT_QUEUE = "my-product";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue myProductQueue() {
        return new Queue(MY_PRODUCT_QUEUE);
    }

    @Bean
    Binding myProductBinding() {
        return BindingBuilder.bind(myProductQueue()).to(topicExchange()).with(MY_PRODUCT_ROUTING_KEY);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(MY_PRODUCT_QUEUE);
        container.setMessageListener(messageListenerAdapter);
        container.setPrefetchCount(1);
        container.setConcurrentConsumers(1);
        return container;
    }

    @Bean
    MessageListenerAdapter messageListenerAdapter(MyListener myListener) {
        return new MessageListenerAdapter(myListener, "process");
    }
}

// 监听器

@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
    // something like this
    Jsoup.connect(message.getUrl()).proxy().execute()
}

糟糕;我正在看我的 'phone; 上的问题。我跳过了容器 bean;我以为容器 bean 是容器工厂而不是容器。

您有 2 个侦听器容器 -

@RabbitListener(queues = RabbitMQConfig.MY_PRODUCT_QUEUE)
public void process(final Message message) {
    // something like this
    Jsoup.connect(message.getUrl()).proxy().execute()
}

框架将自动为该侦听器创建一个容器(它会检测注释)并且您已经明确声明了另一个容器@Bean

The proxy is limited to 10 concurrent threads / connections.

即使有 2 个容器,您也只会得到 2 个线程,而不是 10 个。