Spring Rabbit - 信号量许可泄漏导致 "No available channels" 异常

Spring Rabbit - Semaphore permit leak leads to "No available channels" exception

我们为消费者使用 CachingConnectionFactory。随着每次连接断开,我都看到一个 checkoutPermit 被获取并且它从未被释放。所以假设如果我们使用默认缓存通道大小 25,下一次,当连接在断开后恢复时,可用许可数将为 24。一段时间后导致许可数为 0,从而导致异常 AmqpTimeoutException("No available channels").

我在 1.6.10-RELEASE、1.7.3-RELEASE 和 2.0.0-BUILD-SNAPSHOT 版本中观察到这种行为。

我们是否有可能以错误的方式使用该库,我们应该注意手动释放 checkoutPermit,可能是通过我们自己关闭通道? (在连接断开后永远不会调用 releasePermitIfNecessary)

提前致谢。


示例(使用 1.7.3-RELEASE)

配置

@Configuration
public class Config {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("username");
        connectionFactory.setPassword("password");
        connectionFactory.setVirtualHost("vhost");
        connectionFactory.setChannelCheckoutTimeout(1200);
        connectionFactory.setConnectionTimeout(1000);
        connectionFactory.setPort(5672);
        return connectionFactory;
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(CachingConnectionFactory cachingConnectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(cachingConnectionFactory);
        container.setQueueNames("test.queue");
        container.setMessageListener(new MessageListenerAdapter(new TestHandler()));
        container.start();
        return container;
    }

}

处理程序(仅供测试)

public class TestHandler {

    public String handleMessage(byte[] textBytes) {
        String text = new String(textBytes);
        System.out.println("Received: " + text);
        return text;
    }

}

我通过在 RabbitMQ 和我的应用程序之间使用代理来测试连接断开,我在其中手动断开与 RabbitMQ 的连接。

已确认。

这绝对是错误。当我们失去连接时,我们也会失去所有频道。因此我们必须重新设置关联的许可证。

请用正确的描述提出JIRA票。

同时我想作为一种解决方法,您不应该使用 etChannelCheckoutTimeout(1200) 并将其保留为 0,默认值是什么:

/**
 * Sets the channel checkout timeout. When greater than 0, enables channel limiting
 * in that the {@link #channelCacheSize} becomes the total number of available channels per
 * connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
 * does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
 * new connection to be created with the new limit.
 * <p>
 * Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
 * @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
 * @since 1.4.2
 * @see #setConnectionLimit(int)
 */
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {