如何同时使用 Rabbit 连接缓存模式和自动声明功能?

How to use Rabbit Connection Cache Mode and Autodeclare Features Together?

Spring AMQP Reference 说:

Starting with version 1.3, the CachingConnectionFactory can be configured to cache connections as well as just channels. In this case, each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache (if the cache size has not been reached). Channels created on such connections are cached too. The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members. Set the cacheMode to CacheMode.CONNECTION.

我在负载均衡器后面有一个 HA 集群,我可以清楚地看到当我使用 CacheMode.CHANEL 我的所有通道都来自一个连接并且它们都位于同一主机中。

当我更改为 CacheMode.CONNECTION 时,我确实看到通道中的差异,即并非所有通道都位于同一连接中,因此我看到正在使用的集群中有不同的主机。

所以我认为在这种特殊情况下,第二种配置对我来说很有意义。但是,文档还说:

When the cache mode is CONNECTION, automatic declaration of queues etc. is NOT supported.

显然这也是一个重要的功能,因为我不想手动应用我的配置中的任何更改。

所以,我的问题是,有没有办法同时使用这两种东西?有两个连接工厂是否有意义,一个用于 RabbitAdmin,另一个用于 RabbitTemplate、侦听器等?

问题是我们不知道在哪个连接上声明元素;我们最终会为创建的每个新连接重新声明所有内容。

如果您同意,您可以将自己的连接侦听器添加到连接工厂并在 RabbitAdmin bean 上调用 rabbitAdmin.initialize()(如果缓存模式是通道)。

或者您可以子类化 RabbitAdmin 并覆盖 afterPropertiesSet()

我想您可以添加更多逻辑来确定您是否真的需要进行声明 - 请参阅 RabbitAdmin.afterPropertiesSet() ...

/**
 * If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the
 * {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the
 * callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,
 * queues and bindings are declared failure is not expected.
 *
 * @see InitializingBean#afterPropertiesSet()
 * @see #initialize()
 */
@Override
public void afterPropertiesSet() {

    synchronized (this.lifecycleMonitor) {

        if (this.running || !this.autoStartup) {
            return;
        }

        if (this.connectionFactory instanceof CachingConnectionFactory &&
                ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
            this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
            return;
        }

        // Prevent stack overflow...
        final AtomicBoolean initializing = new AtomicBoolean(false);

        this.connectionFactory.addConnectionListener(connection -> {

            if (!initializing.compareAndSet(false, true)) {
                // If we are already initializing, we don't need to do it again...
                return;
            }
            try {
                /*
                 * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                 * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                 * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                 * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                 */
                initialize();
            }
            finally {
                initializing.compareAndSet(true, false);
            }

        });

        this.running = true;

    }
}

我们或许可以在管理员中将其设置为一个选项 - 请随意设置 contribution