在 Spring AMQP 中检测凭据删除并从中恢复
Detect and recover from credential deletion in Spring AMQP
我们有一个 Spring Cloud Config 设置,使用 Vault 数据库后端(MySQL 和 RabbitMQ),这使我们能够将生成的凭据注入属性,例如:
spring.rabbitmq.username
spring.rabbitmq.password
当我们的应用程序启动时,我们有一组新的 Rabbit 凭据,并且我们能够按需请求一组新的凭据。
由于我们的 Rabbit 凭据由 Vault 在外部管理,因此它们可能会在应用程序生命周期内随时过期/删除(这也是一个弹性测试场景)。
我的问题是我们如何(有效、可靠):
- 检测生成的凭证是否过期
- 使用新凭据更新我们现有的 Spring AMQP
CachingConnectionFactory
。
我们的工作基础是,作为弹性问题,这需要完全在客户端处理,即使服务器愿意或能够发送到期通知也是如此。
我们正在努力解决的问题是如何检测凭据过期,以便我们可以重新配置 CachingConnectionFactory
。
可能性包括:
- 我们现在拥有的是:
ChannelListener
构建所有新创建的列表
Channel
s,并尝试 creating/deleting 匿名 Queue
每 x 秒,通过 ShutdownListener
监听任何 ShutdownSignalException
s
可能有一个 403 状态码。这似乎可行,但有点复杂,我们已经看到并发问题在关闭处理程序中做任何重要的事情。
- 以某种方式连接到
CachingConnectionFactory
。我们尝试使用 class 的克隆,但除了它的复杂性之外,我们最终遇到了 RESOURCE_LOCKED
创建队列的错误。
- 更简单、更轻量级的东西,例如只需每 x 秒轮询一次代理以验证当前凭据是否仍然存在。
部分问题是 ACCESS_REFUSED
- 当 CachingConnectionFactory
尝试使用已删除的凭据时得到的结果 - 通常被视为致命的错误配置错误,而不是任何实际工作流程的一部分,或者可以从中恢复。
这里有优雅的解决方案吗?
使用:Spring Boot 1.5.10-RELEASE,Spring Cloud Dalston SR4
更新:
在 RabbitTemplate
方面,无论是否有 RetryTemplate
,都不会抛出异常 - 即使 CachingConnectionFactory
正确检测到 ACCESS_REFUSED
到我正在发送的交换到.
配置为:
spring
rabbitmq:
host: rabbitmq.service.consul
port: 5672
virtualHost: /
template:
retry:
enabled: true
代码是:
@Autowired private RabbitTemplate rt; // From RabbitAutoConfiguration
@Bean
public DirectExchange emailExchange() {
return new DirectExchange("email");
}
public void sendEmail() {
this.rt.send("email", "email.send", "test payload");
}
应用程序启动,声明 email
交换。 RabbitMQ UI 显示我的(生成的)用户和与交换的连接,这在启动时很好。然后,我通过在 运行 本地测试之前使用 UI 手动删除该用户来模拟凭据过期,以调用上面的 sendEmail()
电子邮件。
RabbitTemplate
调用没有引发异常或记录错误,但记录了以下(预期的)错误:
[AMQP Connection 127.0.0.1:5672] ERROR
o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error;
protocol method: #method(reply-code=403,
reply-text=ACCESS_REFUSED - access to exchange 'email' in vhost '/'
refused for user
'cert-configserver-75c3ae60-da76-3058-e7df-a7b90ef72171', class-id=60,
method-id=40)
在所有 RabbitTemplate.send()
调用之前没有检查凭据,我想知道是否有任何方法可以在发送期间捕获 ACCESS_REFUSED
错误,以便我可以像为听众一样刷新凭据, 并给 RetryTemplate
一个重试的机会。
对于这种情况,侦听器容器会发出 ListenerContainerConsumerFailedEvent
。你可以听这个,检查它的 reason
和异常,然后决定 stop()
容器并做你需要的其他事情。然后 start()
它再次使用新凭据使用代理。
在 RabbitTemplate
方面,出于同样的原因,只需要 try...catch
调用并分析异常。
这不是我到目前为止尝试过的方法,但这是我对如何处理 ACCESS_REFUSED
状态的最佳感觉。从CachingConnectionFactory
的角度来看,你真的无能为力。
更新
我的申请是这样的:
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1ms
logging.level.org.springframework.retry=DEBUG
@SpringBootApplication
public class So49155945Application {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(So49155945Application.class, args);
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
try {
rabbitTemplate.convertAndSend("foo", "foo");
}
catch (AmqpException e) {
System.err.println("Error during sending: " + e.getCause().getCause().getMessage());
}
}
}
这就是我 运行 这个 non-existing 用户的应用程序时我在控制台中看到的内容:
Error during sending: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
更新 2
我还发现了我们可以制作这些道具:
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
然后添加一个rabbitTemplate.setConfirmCallback()
,我们的异步发送拒绝消息将被拒绝。然而它仍然是一个异步回调,类似于提到的 ChannelListener
。从 Spring AMQP 的角度来看,确实没有什么可做的。一切都是 AMQP 协议的异步性质,可能真的需要来自 Rabbit Client 库的一些 "fail fast" 钩子。
请在rabbitmq-users
Google群里提问。那是 RabbitMQ 工程师常去的地方。
更新 3
作为 Broker 上此类事件的解决方案,可以使用 Event Exchange Plugin。特定的 user.deleted
或 user.password.changed
事件由 Broker 发出。
经过多次实验和调试,我选择了 and adopted the RabbitMQ Event Exchange Plugin。
所以现在,与其尝试在 Spring 和 Rabbit 代码之间跟踪 ShutdownSignalException
和 ListenerContainerConsumerFailedEvent
事件,一方面在 SimpleMessageListenerContainer
和 [=14= 之间] 另一方面,我只是订阅了一个交易所,让我的新 @RabbitListener
通知我凭证问题。这没有任何其他活动部件或 bean 声明,没有任何同步问题或线程阻塞,并且通常会顺应自动配置的流程而不是与之抗争。
我现在需要的是:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.cloud.endpoint.RefreshEndpoint;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import static org.springframework.amqp.core.ExchangeTypes.TOPIC;
@Component
public class ReuathenticationListener {
private static Logger log = LoggerFactory.getLogger(ReuathenticationListener.class);
@Autowired private RabbitProperties rabbitProperties;
@Autowired private RefreshEndpoint refreshEndpoint;
@Autowired private CachingConnectionFactory connectionFactory;
@RabbitListener(
id = "credential_expiry_listener",
bindings = @QueueBinding(value = @Queue(value="credentials.expiry", autoDelete="true", durable="false"),
exchange = @Exchange(value="amq.rabbitmq.event", type=TOPIC, internal="true", durable="true"),
key = "user.#")
)
public void expiryHandler(final MessageHeaders headers) {
final String key = (String) headers.get("amqp_receivedRoutingKey");
// See: https://www.rabbitmq.com/event-exchange.html
if (!key.equals("user.deleted") &&
!key.equals("user.authentication.failure")) {
return;
}
final String failedName = (String) headers.get("name");
final String prevUsername = rabbitProperties.getUsername();
if (!failedName.equals(prevUsername)) {
log.debug("Ignore expiry of unrelated user: " + failedName);
return;
}
log.info("Refreshing Rabbit credentials...");
refreshEndpoint.refresh();
log.info("Refreshed username: '" + prevUsername + "' => '" + rabbitProperties.getUsername() + "'");
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.resetConnection();
log.info("CachingConnectionFactory reset, reconnection should now begin.");
}
}
我们有一个 Spring Cloud Config 设置,使用 Vault 数据库后端(MySQL 和 RabbitMQ),这使我们能够将生成的凭据注入属性,例如:
spring.rabbitmq.username
spring.rabbitmq.password
当我们的应用程序启动时,我们有一组新的 Rabbit 凭据,并且我们能够按需请求一组新的凭据。
由于我们的 Rabbit 凭据由 Vault 在外部管理,因此它们可能会在应用程序生命周期内随时过期/删除(这也是一个弹性测试场景)。
我的问题是我们如何(有效、可靠):
- 检测生成的凭证是否过期
- 使用新凭据更新我们现有的 Spring AMQP
CachingConnectionFactory
。
我们的工作基础是,作为弹性问题,这需要完全在客户端处理,即使服务器愿意或能够发送到期通知也是如此。
我们正在努力解决的问题是如何检测凭据过期,以便我们可以重新配置 CachingConnectionFactory
。
可能性包括:
- 我们现在拥有的是:
ChannelListener
构建所有新创建的列表Channel
s,并尝试 creating/deleting 匿名Queue
每 x 秒,通过ShutdownListener
监听任何ShutdownSignalException
s 可能有一个 403 状态码。这似乎可行,但有点复杂,我们已经看到并发问题在关闭处理程序中做任何重要的事情。 - 以某种方式连接到
CachingConnectionFactory
。我们尝试使用 class 的克隆,但除了它的复杂性之外,我们最终遇到了RESOURCE_LOCKED
创建队列的错误。 - 更简单、更轻量级的东西,例如只需每 x 秒轮询一次代理以验证当前凭据是否仍然存在。
部分问题是 ACCESS_REFUSED
- 当 CachingConnectionFactory
尝试使用已删除的凭据时得到的结果 - 通常被视为致命的错误配置错误,而不是任何实际工作流程的一部分,或者可以从中恢复。
这里有优雅的解决方案吗?
使用:Spring Boot 1.5.10-RELEASE,Spring Cloud Dalston SR4
更新:
在 RabbitTemplate
方面,无论是否有 RetryTemplate
,都不会抛出异常 - 即使 CachingConnectionFactory
正确检测到 ACCESS_REFUSED
到我正在发送的交换到.
配置为:
spring
rabbitmq:
host: rabbitmq.service.consul
port: 5672
virtualHost: /
template:
retry:
enabled: true
代码是:
@Autowired private RabbitTemplate rt; // From RabbitAutoConfiguration
@Bean
public DirectExchange emailExchange() {
return new DirectExchange("email");
}
public void sendEmail() {
this.rt.send("email", "email.send", "test payload");
}
应用程序启动,声明 email
交换。 RabbitMQ UI 显示我的(生成的)用户和与交换的连接,这在启动时很好。然后,我通过在 运行 本地测试之前使用 UI 手动删除该用户来模拟凭据过期,以调用上面的 sendEmail()
电子邮件。
RabbitTemplate
调用没有引发异常或记录错误,但记录了以下(预期的)错误:
[AMQP Connection 127.0.0.1:5672] ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method(reply-code=403, reply-text=ACCESS_REFUSED - access to exchange 'email' in vhost '/' refused for user 'cert-configserver-75c3ae60-da76-3058-e7df-a7b90ef72171', class-id=60, method-id=40)
在所有 RabbitTemplate.send()
调用之前没有检查凭据,我想知道是否有任何方法可以在发送期间捕获 ACCESS_REFUSED
错误,以便我可以像为听众一样刷新凭据, 并给 RetryTemplate
一个重试的机会。
对于这种情况,侦听器容器会发出 ListenerContainerConsumerFailedEvent
。你可以听这个,检查它的 reason
和异常,然后决定 stop()
容器并做你需要的其他事情。然后 start()
它再次使用新凭据使用代理。
在 RabbitTemplate
方面,出于同样的原因,只需要 try...catch
调用并分析异常。
这不是我到目前为止尝试过的方法,但这是我对如何处理 ACCESS_REFUSED
状态的最佳感觉。从CachingConnectionFactory
的角度来看,你真的无能为力。
更新
我的申请是这样的:
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1ms
logging.level.org.springframework.retry=DEBUG
@SpringBootApplication
public class So49155945Application {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(So49155945Application.class, args);
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
try {
rabbitTemplate.convertAndSend("foo", "foo");
}
catch (AmqpException e) {
System.err.println("Error during sending: " + e.getCause().getCause().getMessage());
}
}
}
这就是我 运行 这个 non-existing 用户的应用程序时我在控制台中看到的内容:
Error during sending: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
更新 2
我还发现了我们可以制作这些道具:
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
然后添加一个rabbitTemplate.setConfirmCallback()
,我们的异步发送拒绝消息将被拒绝。然而它仍然是一个异步回调,类似于提到的 ChannelListener
。从 Spring AMQP 的角度来看,确实没有什么可做的。一切都是 AMQP 协议的异步性质,可能真的需要来自 Rabbit Client 库的一些 "fail fast" 钩子。
请在rabbitmq-users
Google群里提问。那是 RabbitMQ 工程师常去的地方。
更新 3
作为 Broker 上此类事件的解决方案,可以使用 Event Exchange Plugin。特定的 user.deleted
或 user.password.changed
事件由 Broker 发出。
经过多次实验和调试,我选择了
所以现在,与其尝试在 Spring 和 Rabbit 代码之间跟踪 ShutdownSignalException
和 ListenerContainerConsumerFailedEvent
事件,一方面在 SimpleMessageListenerContainer
和 [=14= 之间] 另一方面,我只是订阅了一个交易所,让我的新 @RabbitListener
通知我凭证问题。这没有任何其他活动部件或 bean 声明,没有任何同步问题或线程阻塞,并且通常会顺应自动配置的流程而不是与之抗争。
我现在需要的是:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.cloud.endpoint.RefreshEndpoint;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import static org.springframework.amqp.core.ExchangeTypes.TOPIC;
@Component
public class ReuathenticationListener {
private static Logger log = LoggerFactory.getLogger(ReuathenticationListener.class);
@Autowired private RabbitProperties rabbitProperties;
@Autowired private RefreshEndpoint refreshEndpoint;
@Autowired private CachingConnectionFactory connectionFactory;
@RabbitListener(
id = "credential_expiry_listener",
bindings = @QueueBinding(value = @Queue(value="credentials.expiry", autoDelete="true", durable="false"),
exchange = @Exchange(value="amq.rabbitmq.event", type=TOPIC, internal="true", durable="true"),
key = "user.#")
)
public void expiryHandler(final MessageHeaders headers) {
final String key = (String) headers.get("amqp_receivedRoutingKey");
// See: https://www.rabbitmq.com/event-exchange.html
if (!key.equals("user.deleted") &&
!key.equals("user.authentication.failure")) {
return;
}
final String failedName = (String) headers.get("name");
final String prevUsername = rabbitProperties.getUsername();
if (!failedName.equals(prevUsername)) {
log.debug("Ignore expiry of unrelated user: " + failedName);
return;
}
log.info("Refreshing Rabbit credentials...");
refreshEndpoint.refresh();
log.info("Refreshed username: '" + prevUsername + "' => '" + rabbitProperties.getUsername() + "'");
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.resetConnection();
log.info("CachingConnectionFactory reset, reconnection should now begin.");
}
}