在 Spring AMQP 中检测凭据删除并从中恢复

Detect and recover from credential deletion in Spring AMQP

我们有一个 Spring Cloud Config 设置,使用 Vault 数据库后端(MySQL 和 RabbitMQ),这使我们能够将生成的凭据注入属性,例如:

当我们的应用程序启动时,我们有一组新的 Rabbit 凭据,并且我们能够按需请求一组新的凭据。

由于我们的 Rabbit 凭据由 Vault 在外部管理,因此它们可能会在应用程序生命周期内随时过期/删除(这也是一个弹性测试场景)。

我的问题是我们如何(有效、可靠):

我们的工作基础是,作为弹性问题,这需要完全在客户端处理,即使服务器愿意或能够发送到期通知也是如此。

我们正在努力解决的问题是如何检测凭据过期,以便我们可以重新配置 CachingConnectionFactory

可能性包括:

  1. 我们现在拥有的是:ChannelListener 构建所有新创建的列表 Channels,并尝试 creating/deleting 匿名 Queue 每 x 秒,通过 ShutdownListener 监听任何 ShutdownSignalExceptions 可能有一个 403 状态码。这似乎可行,但有点复杂,我们已经看到并发问题在关闭处理程序中做任何重要的事情。
  2. 以某种方式连接到 CachingConnectionFactory。我们尝试使用 class 的克隆,但除了它的复杂性之外,我们最终遇到了 RESOURCE_LOCKED 创建队列的错误。
  3. 更简单、更轻量级的东西,例如只需每 x 秒轮询一次代理以验证当前凭据是否仍然存在。

部分问题是 ACCESS_REFUSED - 当 CachingConnectionFactory 尝试使用已删除的凭据时得到的结果 - 通常被视为致命的错误配置错误,而不是任何实际工作流程的一部分,或者可以从中恢复。

这里有优雅的解决方案吗?


使用:Spring Boot 1.5.10-RELEASE,Spring Cloud Dalston SR4


更新:

RabbitTemplate 方面,无论是否有 RetryTemplate,都不会抛出异常 - 即使 CachingConnectionFactory 正确检测到 ACCESS_REFUSED 到我正在发送的交换到.

配置为:

spring
  rabbitmq:
    host: rabbitmq.service.consul
    port: 5672
    virtualHost: /
    template:
      retry:
        enabled: true

代码是:

@Autowired private RabbitTemplate rt;  // From RabbitAutoConfiguration

@Bean
public DirectExchange emailExchange() {
    return new DirectExchange("email");
}

public void sendEmail() {
    this.rt.send("email", "email.send", "test payload");
}

应用程序启动,声明 email 交换。 RabbitMQ UI 显示我的(生成的)用户和与交换的连接,这在启动时很好。然后,我通过在 运行 本地测试之前使用 UI 手动删除该用户来模拟凭据过期,以调用上面的 sendEmail() 电子邮件。

RabbitTemplate 调用没有引发异常或记录错误,但记录了以下(预期的)错误:

[AMQP Connection 127.0.0.1:5672] ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method(reply-code=403, reply-text=ACCESS_REFUSED - access to exchange 'email' in vhost '/' refused for user 'cert-configserver-75c3ae60-da76-3058-e7df-a7b90ef72171', class-id=60, method-id=40)

在所有 RabbitTemplate.send() 调用之前没有检查凭据,我想知道是否有任何方法可以在发送期间捕获 ACCESS_REFUSED 错误,以便我可以像为听众一样刷新凭据, 并给 RetryTemplate 一个重试的机会。

对于这种情况,侦听器容器会发出 ListenerContainerConsumerFailedEvent。你可以听这个,检查它的 reason 和异常,然后决定 stop() 容器并做你需要的其他事情。然后 start() 它再次使用新凭据使用代理。

RabbitTemplate 方面,出于同样的原因,只需要 try...catch 调用并分析异常。

这不是我到目前为止尝试过的方法,但这是我对如何处理 ACCESS_REFUSED 状态的最佳感觉。从CachingConnectionFactory的角度来看,你真的无能为力。

更新

我的申请是这样的:

spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1ms
logging.level.org.springframework.retry=DEBUG

@SpringBootApplication
public class So49155945Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(So49155945Application.class, args);
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        try {
            rabbitTemplate.convertAndSend("foo", "foo");
        }
        catch (AmqpException e) {
            System.err.println("Error during sending: " + e.getCause().getCause().getMessage());
        }
    }

}

这就是我 运行 这个 non-existing 用户的应用程序时我在控制台中看到的内容:

Error during sending: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

更新 2

我还发现了我们可以制作这些道具:

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true

然后添加一个rabbitTemplate.setConfirmCallback(),我们的异步发送拒绝消息将被拒绝。然而它仍然是一个异步回调,类似于提到的 ChannelListener。从 Spring AMQP 的角度来看,确实没有什么可做的。一切都是 AMQP 协议的异步性质,可能真的需要来自 Rabbit Client 库的一些 "fail fast" 钩子。

请在rabbitmq-usersGoogle群里提问。那是 RabbitMQ 工程师常去的地方。

更新 3

作为 Broker 上此类事件的解决方案,可以使用 Event Exchange Plugin。特定的 user.deleteduser.password.changed 事件由 Broker 发出。

经过多次实验和调试,我选择了 and adopted the RabbitMQ Event Exchange Plugin

所以现在,与其尝试在 Spring 和 Rabbit 代码之间跟踪 ShutdownSignalExceptionListenerContainerConsumerFailedEvent 事件,一方面在 SimpleMessageListenerContainer 和 [=14= 之间] 另一方面,我只是订阅了一个交易所,让我的新 @RabbitListener 通知我凭证问题。这没有任何其他活动部件或 bean 声明,没有任何同步问题或线程阻塞,并且通常会顺应自动配置的流程而不是与之抗争。

我现在需要的是:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.cloud.endpoint.RefreshEndpoint;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import static org.springframework.amqp.core.ExchangeTypes.TOPIC;

@Component
public class ReuathenticationListener {

    private static Logger log = LoggerFactory.getLogger(ReuathenticationListener.class);

    @Autowired private RabbitProperties rabbitProperties;
    @Autowired private RefreshEndpoint refreshEndpoint;
    @Autowired private CachingConnectionFactory connectionFactory;

    @RabbitListener(
        id = "credential_expiry_listener",
        bindings = @QueueBinding(value = @Queue(value="credentials.expiry", autoDelete="true", durable="false"),
            exchange = @Exchange(value="amq.rabbitmq.event", type=TOPIC, internal="true", durable="true"),
            key = "user.#")
    )
    public void expiryHandler(final MessageHeaders headers) {
        final String key = (String) headers.get("amqp_receivedRoutingKey");
        // See: https://www.rabbitmq.com/event-exchange.html
        if (!key.equals("user.deleted") &&
            !key.equals("user.authentication.failure")) {
            return;
        }

        final String failedName = (String) headers.get("name");
        final String prevUsername = rabbitProperties.getUsername();

        if (!failedName.equals(prevUsername)) {
            log.debug("Ignore expiry of unrelated user: " + failedName);
            return;
        }

        log.info("Refreshing Rabbit credentials...");
        refreshEndpoint.refresh();
        log.info("Refreshed username: '" + prevUsername + "' => '" + rabbitProperties.getUsername() + "'");

        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.resetConnection();

        log.info("CachingConnectionFactory reset, reconnection should now begin.");
    }
}