Rabbit SimpleMessageListenerContainer 不会关闭
Rabbit SimpleMessageListenerContainer won't shut down
从 开始,我们遇到了 Rabbit 凭证失效的情况,我们需要在 CachingConnectionFactory
上调用 resetConnection()
来获取一组新的凭证。
我们在 ShutdownSignalException
处理程序中执行此操作,它基本上可以正常工作。不起作用的是我们还需要重新启动我们的侦听器。我们有一些:
@RabbitListener(
id = ABC,
bindings = @QueueBinding(value = @Queue(value="myQ", durable="true"),
exchange = @Exchange(value="myExchange", durable="true"),
key = "myKey"),
containerFactory = "customQueueContainerFactory"
)
public void process(...) {
...
}
this answer (also )给人的印象是我们只需要做:
@Autowired RabbitListenerEndpointRegistry registry;
@Autowired CachingConnectionFactory connectionFactory;
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
refreshRabbitMQCredentials();
}
public void refreshRabbitMQCredentials() {
registry.stop(); // do this first
// Fetch credentials, update username/pass
connectionFactory.resetConnection(); // then this
registry.start(); // finally restart
}
问题是我调试了 SimpleMessageListenerContainer
,当这些容器中的第一个 doShutdown()
被调用时,Spring 试图取消 BlockingQueueConsumer
。
因为底层 Channel
仍然报告为打开 - 即使 RabbitMQ UI 没有报告任何连接或通道正在打开 - 一个取消事件被发送到内部的代理 ChannelN.basicCancel()
,但通道随后会永远阻塞以获取回复,结果容器关闭被完全阻塞。
我试过将 TaskExecutor
(Executors.newCachedThreadPool()
)注入容器并调用 shutdownNow()
或中断它们,但是 none 这会影响通道的阻塞等等。
看来我解锁频道的唯一选择是在取消期间触发额外的 ShutdownSignalException
,但是 (a) 我不知道该怎么做,并且 (b) 它看起来像在尝试再次关闭之前,我必须同时取消所有侦听器。
// com.rabbitmq.client.impl.ChannelN
@Override
public void basicCancel(final String consumerTag) throws IOException
{
// [snip]
rpc(new Basic.Cancel(consumerTag, false), k);
try {
k.getReply(); // <== BLOCKS HERE
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
metricsCollector.basicCancel(this, consumerTag);
}
我不确定为什么这证明如此困难。有没有更简单的方法强制 SimpleMessageListenerContainer
关机?
使用Spring Rabbit 1.7.6; AMQP 客户端 4.0.3; Spring 启动 1.5.10-RELEASE
更新
一些日志证明了消息容器在连接刷新完成之前重新启动的理论,这可能就是它们不重新连接的原因:
ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.ReauthenticatingChannelListener - Channel shutdown: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.ReauthenticatingChannelListener - Channel closed with reply code 403. Assuming credentials have been revoked and refreshing config server properties to get new credentials. Cause: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
WARN u.c.c.c.r.ReauthenticatingChannelListener - Shutdown signalled: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.RabbitMQReauthenticator - Refreshing Rabbit credentials for XXXXXXXX
INFO o.s.c.c.c.ConfigServicePropertySourceLocator - Fetching config from server at: http://localhost:8888/configuration
INFO u.c.c.c.r.ReauthenticatingChannelListener - Got ListenerContainerConsumerFailedEvent: Consumer raised exception, attempting restart
INFO o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer@2db55dec: tags=[{amq.ctag-ebAfSnXLbw_W1hlZ5ag7sQ=consumer.myQ}], channel=Cached Rabbit Channel: AMQChannel(amqp://cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4@127.0.0.1:5672/,2), conn: Proxy@12de62aa Shared Rabbit Connection: SimpleConnection@56c95789 [delegate=amqp://cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4@127.0.0.1:5672/, localPort= 50052], acknowledgeMode=AUTO local queue size=0
INFO o.s.c.c.c.ConfigServicePropertySourceLocator - Located environment: name=myApp, profiles=[default], label=null, version=null, state=null
INFO com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown initiated...
INFO com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown completed.
INFO u.c.c.c.r.RabbitMQReauthenticator - Refreshed username: 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4' => 'cert-configserver-d7b54af2-0735-a9ed-7cc4-394803bf5e58'
INFO u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset, proceeding...
更新 2:
这似乎是某种竞争条件。删除容器停止/启动后,如果我向 SimpleMessageListenerContainer.restart()
添加一个线程断点让 resetConnection()
比赛过去,然后释放断点,那么我可以看到事情开始恢复:
16:18:47,208 INFO u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset
// Get ready to release the SMLC.restart() breakpoint...
16:19:02,072 INFO o.s.a.r.c.CachingConnectionFactory - Attempting to connect to: rabbitmq.service.consul:5672
16:19:02,083 INFO o.s.a.r.c.CachingConnectionFactory - Created new connection: connectionFactory#7489bca4:1/SimpleConnection@68546c13 [delegate=amqp://cert-configserver-132a07c2-94f3-0099-4de1-f0b1a9875d5a@127.0.0.1:5672/, localPort= 33350]
16:19:02,086 INFO o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue ...
16:19:02,095 DEBUG u.c.c.c.r.ReauthenticatingChannelListener - Active connection check succeeded for channel AMQChannel(amqp://cert-configserver-132a07c2-94f3-0099-4de1-f0b1a9875d5a@127.0.0.1:5672/,1)
16:19:02,120 INFO o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue (springCloudBus...
既然如此,我现在必须弄清楚如何延迟容器重新启动直到刷新完成(即我的 ShutdownSignalException
处理程序完成),或者以某种方式阻止刷新...
更新 3:
我的整体问题,这是一个症状,已通过以下方式解决:
完全不清楚为什么频道会报告为打开;这对我来说很好;它在删除用户 foo
...
后恢复
@SpringBootApplication
public class So49323291Application {
public static void main(String[] args) {
SpringApplication.run(So49323291Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, CachingConnectionFactory cf,
RabbitTemplate template) {
return args -> {
cf.setUsername("foo");
cf.setPassword("bar");
registry.start();
doSends(template);
registry.stop();
cf.resetConnection();
cf.setUsername("baz");
cf.setPassword("qux");
registry.start();
doSends(template);
};
}
public void doSends(RabbitTemplate template) {
while (true) {
try {
template.convertAndSend("foo", "Hello");
Thread.sleep(5_000);
}
catch (Exception e) {
e.printStackTrace();
break;
}
}
}
@RabbitListener(queues = "foo", autoStartup = "false")
public void in(Message in) {
System.out.println(in);
}
}
(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=4, consumerTag=amq.ctag-9zt3wUGYSJmoON3zw03wUw, consumerQueue=foo])
2018-03-16 11:24:01.451 ERROR 11867 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=320, reply-text=CONNECTION_FORCED - user 'foo' is deleted, class-id=0, method-id=0)
...
Caused by: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
2018-03-16 11:24:01.745 ERROR 11867 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2018-03-16 11:24:03.740 INFO 11867 --- [cTaskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2c4d1ac:3/SimpleConnection@5e9c036b [delegate=amqp://baz@127.0.0.1:5672/, localPort= 59346]
(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=1, consumerTag=amq.ctag-ljnY00TBuvy5cCAkpD3r4A, consumerQueue=foo])
但是,您确实不需要 stop/start 注册表,只需使用新凭据重新配置连接工厂并调用 resetConnection()
;容器将恢复。
从 CachingConnectionFactory
上调用 resetConnection()
来获取一组新的凭证。
我们在 ShutdownSignalException
处理程序中执行此操作,它基本上可以正常工作。不起作用的是我们还需要重新启动我们的侦听器。我们有一些:
@RabbitListener(
id = ABC,
bindings = @QueueBinding(value = @Queue(value="myQ", durable="true"),
exchange = @Exchange(value="myExchange", durable="true"),
key = "myKey"),
containerFactory = "customQueueContainerFactory"
)
public void process(...) {
...
}
this answer (also
@Autowired RabbitListenerEndpointRegistry registry;
@Autowired CachingConnectionFactory connectionFactory;
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
refreshRabbitMQCredentials();
}
public void refreshRabbitMQCredentials() {
registry.stop(); // do this first
// Fetch credentials, update username/pass
connectionFactory.resetConnection(); // then this
registry.start(); // finally restart
}
问题是我调试了 SimpleMessageListenerContainer
,当这些容器中的第一个 doShutdown()
被调用时,Spring 试图取消 BlockingQueueConsumer
。
因为底层 Channel
仍然报告为打开 - 即使 RabbitMQ UI 没有报告任何连接或通道正在打开 - 一个取消事件被发送到内部的代理 ChannelN.basicCancel()
,但通道随后会永远阻塞以获取回复,结果容器关闭被完全阻塞。
我试过将 TaskExecutor
(Executors.newCachedThreadPool()
)注入容器并调用 shutdownNow()
或中断它们,但是 none 这会影响通道的阻塞等等。
看来我解锁频道的唯一选择是在取消期间触发额外的 ShutdownSignalException
,但是 (a) 我不知道该怎么做,并且 (b) 它看起来像在尝试再次关闭之前,我必须同时取消所有侦听器。
// com.rabbitmq.client.impl.ChannelN
@Override
public void basicCancel(final String consumerTag) throws IOException
{
// [snip]
rpc(new Basic.Cancel(consumerTag, false), k);
try {
k.getReply(); // <== BLOCKS HERE
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
metricsCollector.basicCancel(this, consumerTag);
}
我不确定为什么这证明如此困难。有没有更简单的方法强制 SimpleMessageListenerContainer
关机?
使用Spring Rabbit 1.7.6; AMQP 客户端 4.0.3; Spring 启动 1.5.10-RELEASE
更新
一些日志证明了消息容器在连接刷新完成之前重新启动的理论,这可能就是它们不重新连接的原因:
ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.ReauthenticatingChannelListener - Channel shutdown: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.ReauthenticatingChannelListener - Channel closed with reply code 403. Assuming credentials have been revoked and refreshing config server properties to get new credentials. Cause: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
WARN u.c.c.c.r.ReauthenticatingChannelListener - Shutdown signalled: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.RabbitMQReauthenticator - Refreshing Rabbit credentials for XXXXXXXX
INFO o.s.c.c.c.ConfigServicePropertySourceLocator - Fetching config from server at: http://localhost:8888/configuration
INFO u.c.c.c.r.ReauthenticatingChannelListener - Got ListenerContainerConsumerFailedEvent: Consumer raised exception, attempting restart
INFO o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer@2db55dec: tags=[{amq.ctag-ebAfSnXLbw_W1hlZ5ag7sQ=consumer.myQ}], channel=Cached Rabbit Channel: AMQChannel(amqp://cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4@127.0.0.1:5672/,2), conn: Proxy@12de62aa Shared Rabbit Connection: SimpleConnection@56c95789 [delegate=amqp://cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4@127.0.0.1:5672/, localPort= 50052], acknowledgeMode=AUTO local queue size=0
INFO o.s.c.c.c.ConfigServicePropertySourceLocator - Located environment: name=myApp, profiles=[default], label=null, version=null, state=null
INFO com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown initiated...
INFO com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown completed.
INFO u.c.c.c.r.RabbitMQReauthenticator - Refreshed username: 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4' => 'cert-configserver-d7b54af2-0735-a9ed-7cc4-394803bf5e58'
INFO u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset, proceeding...
更新 2:
这似乎是某种竞争条件。删除容器停止/启动后,如果我向 SimpleMessageListenerContainer.restart()
添加一个线程断点让 resetConnection()
比赛过去,然后释放断点,那么我可以看到事情开始恢复:
16:18:47,208 INFO u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset
// Get ready to release the SMLC.restart() breakpoint...
16:19:02,072 INFO o.s.a.r.c.CachingConnectionFactory - Attempting to connect to: rabbitmq.service.consul:5672
16:19:02,083 INFO o.s.a.r.c.CachingConnectionFactory - Created new connection: connectionFactory#7489bca4:1/SimpleConnection@68546c13 [delegate=amqp://cert-configserver-132a07c2-94f3-0099-4de1-f0b1a9875d5a@127.0.0.1:5672/, localPort= 33350]
16:19:02,086 INFO o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue ...
16:19:02,095 DEBUG u.c.c.c.r.ReauthenticatingChannelListener - Active connection check succeeded for channel AMQChannel(amqp://cert-configserver-132a07c2-94f3-0099-4de1-f0b1a9875d5a@127.0.0.1:5672/,1)
16:19:02,120 INFO o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue (springCloudBus...
既然如此,我现在必须弄清楚如何延迟容器重新启动直到刷新完成(即我的 ShutdownSignalException
处理程序完成),或者以某种方式阻止刷新...
更新 3:
我的整体问题,这是一个症状,已通过以下方式解决:
完全不清楚为什么频道会报告为打开;这对我来说很好;它在删除用户 foo
...
@SpringBootApplication
public class So49323291Application {
public static void main(String[] args) {
SpringApplication.run(So49323291Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, CachingConnectionFactory cf,
RabbitTemplate template) {
return args -> {
cf.setUsername("foo");
cf.setPassword("bar");
registry.start();
doSends(template);
registry.stop();
cf.resetConnection();
cf.setUsername("baz");
cf.setPassword("qux");
registry.start();
doSends(template);
};
}
public void doSends(RabbitTemplate template) {
while (true) {
try {
template.convertAndSend("foo", "Hello");
Thread.sleep(5_000);
}
catch (Exception e) {
e.printStackTrace();
break;
}
}
}
@RabbitListener(queues = "foo", autoStartup = "false")
public void in(Message in) {
System.out.println(in);
}
}
(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=4, consumerTag=amq.ctag-9zt3wUGYSJmoON3zw03wUw, consumerQueue=foo])
2018-03-16 11:24:01.451 ERROR 11867 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=320, reply-text=CONNECTION_FORCED - user 'foo' is deleted, class-id=0, method-id=0)
...
Caused by: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
2018-03-16 11:24:01.745 ERROR 11867 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2018-03-16 11:24:03.740 INFO 11867 --- [cTaskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2c4d1ac:3/SimpleConnection@5e9c036b [delegate=amqp://baz@127.0.0.1:5672/, localPort= 59346]
(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=1, consumerTag=amq.ctag-ljnY00TBuvy5cCAkpD3r4A, consumerQueue=foo])
但是,您确实不需要 stop/start 注册表,只需使用新凭据重新配置连接工厂并调用 resetConnection()
;容器将恢复。