Spring Rabbit - 信号量许可泄漏导致 "No available channels" 异常
Spring Rabbit - Semaphore permit leak leads to "No available channels" exception
我们为消费者使用 CachingConnectionFactory。随着每次连接断开,我都看到一个 checkoutPermit 被获取并且它从未被释放。所以假设如果我们使用默认缓存通道大小 25,下一次,当连接在断开后恢复时,可用许可数将为 24。一段时间后导致许可数为 0,从而导致异常 AmqpTimeoutException("No available channels").
我在 1.6.10-RELEASE、1.7.3-RELEASE 和 2.0.0-BUILD-SNAPSHOT 版本中观察到这种行为。
我们是否有可能以错误的方式使用该库,我们应该注意手动释放 checkoutPermit,可能是通过我们自己关闭通道? (在连接断开后永远不会调用 releasePermitIfNecessary)
提前致谢。
示例(使用 1.7.3-RELEASE)
配置
@Configuration
public class Config {
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("username");
connectionFactory.setPassword("password");
connectionFactory.setVirtualHost("vhost");
connectionFactory.setChannelCheckoutTimeout(1200);
connectionFactory.setConnectionTimeout(1000);
connectionFactory.setPort(5672);
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(CachingConnectionFactory cachingConnectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cachingConnectionFactory);
container.setQueueNames("test.queue");
container.setMessageListener(new MessageListenerAdapter(new TestHandler()));
container.start();
return container;
}
}
处理程序(仅供测试)
public class TestHandler {
public String handleMessage(byte[] textBytes) {
String text = new String(textBytes);
System.out.println("Received: " + text);
return text;
}
}
我通过在 RabbitMQ 和我的应用程序之间使用代理来测试连接断开,我在其中手动断开与 RabbitMQ 的连接。
已确认。
这绝对是错误。当我们失去连接时,我们也会失去所有频道。因此我们必须重新设置关联的许可证。
请用正确的描述提出JIRA票。
同时我想作为一种解决方法,您不应该使用 etChannelCheckoutTimeout(1200)
并将其保留为 0
,默认值是什么:
/**
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
* in that the {@link #channelCacheSize} becomes the total number of available channels per
* connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
* does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
* new connection to be created with the new limit.
* <p>
* Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
* @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
* @since 1.4.2
* @see #setConnectionLimit(int)
*/
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
我们为消费者使用 CachingConnectionFactory。随着每次连接断开,我都看到一个 checkoutPermit 被获取并且它从未被释放。所以假设如果我们使用默认缓存通道大小 25,下一次,当连接在断开后恢复时,可用许可数将为 24。一段时间后导致许可数为 0,从而导致异常 AmqpTimeoutException("No available channels").
我在 1.6.10-RELEASE、1.7.3-RELEASE 和 2.0.0-BUILD-SNAPSHOT 版本中观察到这种行为。
我们是否有可能以错误的方式使用该库,我们应该注意手动释放 checkoutPermit,可能是通过我们自己关闭通道? (在连接断开后永远不会调用 releasePermitIfNecessary)
提前致谢。
示例(使用 1.7.3-RELEASE)
配置
@Configuration
public class Config {
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("username");
connectionFactory.setPassword("password");
connectionFactory.setVirtualHost("vhost");
connectionFactory.setChannelCheckoutTimeout(1200);
connectionFactory.setConnectionTimeout(1000);
connectionFactory.setPort(5672);
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(CachingConnectionFactory cachingConnectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cachingConnectionFactory);
container.setQueueNames("test.queue");
container.setMessageListener(new MessageListenerAdapter(new TestHandler()));
container.start();
return container;
}
}
处理程序(仅供测试)
public class TestHandler {
public String handleMessage(byte[] textBytes) {
String text = new String(textBytes);
System.out.println("Received: " + text);
return text;
}
}
我通过在 RabbitMQ 和我的应用程序之间使用代理来测试连接断开,我在其中手动断开与 RabbitMQ 的连接。
已确认。
这绝对是错误。当我们失去连接时,我们也会失去所有频道。因此我们必须重新设置关联的许可证。
请用正确的描述提出JIRA票。
同时我想作为一种解决方法,您不应该使用 etChannelCheckoutTimeout(1200)
并将其保留为 0
,默认值是什么:
/**
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
* in that the {@link #channelCacheSize} becomes the total number of available channels per
* connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
* does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
* new connection to be created with the new limit.
* <p>
* Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
* @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
* @since 1.4.2
* @see #setConnectionLimit(int)
*/
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {