通过 named:channels 处理消息,预取 >= 1

Processing messages through named:channels with prefetch >= 1

大家晚上好

我 运行 使用 SpringXD-1.3.0.RELEASE(配置:3 个管理员和 3 个容器)和 RabbitMQ 3.5.7(3 个节点)进入以下场景:

stream create --name fire  --definition "time --timeUnit=MILLISECONDS  > topic:fire" --deploy
stream create --name fireeater_1  --definition "topic:fire > null"
stream create --name fireeater_2  --definition "topic:fire > null"
stream create --name fireeater_3  --definition "topic:fire > null"
stream create --name fireeater_4  --definition "topic:fire > null"
job create test123 --definition "timestampfile --directory=/tmp"

stream deploy fireeater_1 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
stream deploy fireeater_2 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
stream deploy fireeater_3 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
stream deploy fireeater_4 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"

job deploy test123

几分钟后在一个容器上抛出此异常。

2016-01-13T18:08:47+0100 1.3.0.RELEASE WARN xdbus.job:test123-6 listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:721) ~[amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:711) ~[amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:569) ~[amqp-client-3.5.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 6
    at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80) ~[amqp-client-3.5.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552) ~[amqp-client-3.5.2.jar:na]
    ... 1 common frames omitted

最后容器处理来自rabbitmq的消息非常慢。只有取消部署 "fire"-stream 或重新启动容器才会有所帮助。

作业使用的消息通道似乎被流关闭了。

-- 更新-- 今天我从日志中看到了更多的输出,似乎通道被消息总线本身关闭了。这里是异常日志

2016-02-23T15:05:18+0100 1.3.0.RELEASE ERROR AMQP Connection 10.0.3.210:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-02-23T15:05:18+0100 1.3.0.RELEASE WARN SimpleAsyncTaskExecutor-2 listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access[=13=]1(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1352) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1096) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1080) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]

Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@69022500]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891) ~[na:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access0(AmqpInboundChannelAdapter.java:45) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.onMessage(AmqpInboundChannelAdapter.java:93) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
... 10 common frames omitted

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:67) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:833) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access0(CachingConnectionFactory.java:822) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:474) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:450) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:419) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:412) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access00(CachingConnectionFactory.java:86) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:838) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createChannel(ConnectionFactoryUtils.java:90) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:139) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:71) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1278) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1271) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:619) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:717) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:329) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:321) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
... 25 common frames omitted
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:143) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503) ~[amqp-client-3.5.2.jar!/:na]
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
... 44 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.5.2.jar!/:na]
... 48 common frames omitted
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 3289
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:82) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) ~[na:na]
... 1 common frames omitted

 2016-02-23T15:05:18+0100 1.3.0.RELEASE ERROR AMQP Connection 10.0.3.210:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
 2016-02-23T15:05:18+0100 1.3.0.RELEASE WARN xdbus.test123-6 listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
 com.rabbitmq.client.ShutdownSignalException: connection error
at      com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:739) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:729) ~[amqp-client-3.5.2.jar:na]
at   com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573) ~[amqp-client-3.5.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
 Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 3289
at  com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:82) ~[amqp-client-3.5.2.jar:na]
at  com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) ~[amqp-client-3.5.2.jar:na]
... 1 common frames omitted

2016-02-23T15:05:18+0100 1.3.0.RELEASE WARN xdbus.fireeater_1-1 listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:739) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:729) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573) ~[amqp-client-3.5.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
 Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 3289
at   com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:82) ~[amqp-client-3.5.2.jar:na]
at   com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) ~[amqp-client-3.5.2.jar:na]
... 1 common frames omitted

有人知道为什么会抛出异常吗?如何避免?

问题似乎是在一个已经关闭的频道上返回回复。发生这种情况时,连接将被断开,所有用户都会收到该异常。据我从兔子客户端代码中可以看出,这不应该发生。

虽然一切都应该恢复。

Spring AMQP有一个通道缓存来避免关闭通道并允许它们被重用;增加通道缓存大小(默认为 1)可能会解决问题。你似乎在搅动很多频道。

Spring XD 当前不会将频道缓存大小公开为 属性。要解决此问题,请将以下文件(名为 rabbit-bus.xml)放在 xd/config/META-INF/spring-xd/bus.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg ref="rabbitFactory" />
        <property name="addresses" value="${spring.rabbitmq.addresses}" />
        <property name="username" value="${spring.rabbitmq.username}" />
        <property name="password" value="${spring.rabbitmq.password}" />
        <property name="virtualHost" value="${spring.rabbitmq.virtual_host}" />
        <property name="channelCacheSize" value="${spring.rabbitmq.channelCacheSize:100}" />
    </bean>

    <bean id="rabbitFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
        <property name="useSSL" value="${spring.rabbitmq.useSSL:false}" />
        <property name="sslPropertiesLocation" value="${spring.rabbitmq.sslProperties:}"/>
    </bean>

    <bean id="messageBus" class="org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus">
        <constructor-arg ref="rabbitConnectionFactory" />
        <constructor-arg ref="codec"/>
        <property name="defaultAcknowledgeMode" value="#{T(org.springframework.amqp.core.AcknowledgeMode).${xd.messagebus.rabbit.default.ackMode}}" />
        <property name="defaultBackOffInitialInterval" value="${xd.messagebus.rabbit.default.backOffInitialInterval}" />
        <property name="defaultBackOffMaxInterval" value="${xd.messagebus.rabbit.default.backOffMaxInterval}" />
        <property name="defaultBackOffMultiplier" value="${xd.messagebus.rabbit.default.backOffMultiplier}" />
        <property name="defaultChannelTransacted" value="${xd.messagebus.rabbit.default.transacted}" />
        <property name="defaultConcurrency" value="${xd.messagebus.rabbit.default.concurrency}" />
        <property name="defaultDefaultDeliveryMode" value="#{T(org.springframework.amqp.core.MessageDeliveryMode).${xd.messagebus.rabbit.default.deliveryMode}}" />
        <property name="defaultDefaultRequeueRejected" value="${xd.messagebus.rabbit.default.requeue}" />
        <property name="defaultMaxAttempts" value="${xd.messagebus.rabbit.default.maxAttempts}" />
        <property name="defaultMaxConcurrency" value="${xd.messagebus.rabbit.default.maxConcurrency}" />
        <property name="defaultPrefetchCount" value="${xd.messagebus.rabbit.default.prefetch}" />
        <property name="defaultPrefix" value="${xd.messagebus.rabbit.default.prefix}" />
        <property name="defaultReplyHeaderPatterns" value="${xd.messagebus.rabbit.default.replyHeaderPatterns}" />
        <property name="defaultRequestHeaderPatterns" value="${xd.messagebus.rabbit.default.requestHeaderPatterns}" />
        <property name="defaultTxSize" value="${xd.messagebus.rabbit.default.txSize}" />
        <property name="defaultAutoBindDLQ" value="${xd.messagebus.rabbit.default.autoBindDLQ}" />
        <property name="defaultRepublishToDLQ" value="${xd.messagebus.rabbit.default.republishToDLQ}" />
        <property name="defaultBatchingEnabled" value="${xd.messagebus.rabbit.default.batchingEnabled}" />
        <property name="defaultBatchSize" value="${xd.messagebus.rabbit.default.batchSize}" />
        <property name="defaultBatchBufferLimit" value="${xd.messagebus.rabbit.default.batchBufferLimit}" />
        <property name="defaultBatchTimeout" value="${xd.messagebus.rabbit.default.batchTimeout}" />
        <property name="defaultCompress" value="${xd.messagebus.rabbit.default.compress}" />
        <property name="compressingPostProcessor">
            <bean class="org.springframework.amqp.support.postprocessor.GZipPostProcessor">
                <property name="level" value="${xd.messagebus.rabbit.compressionLevel:#{T(java.util.zip.Deflater).BEST_SPEED}}" />
            </bean>
        </property>
        <property name="decompressingPostProcessor">
            <bean class="org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor">
                <!-- set a map of decompressors here if using other than the default -->
            </bean>
        </property>
        <property name="defaultDurableSubscription" value="${xd.messagebus.rabbit.default.durableSubscription}" />
        <property name="addresses" value="${spring.rabbitmq.addresses:}" />
        <property name="adminAddresses" value="${spring.rabbitmq.adminAddresses:}" />
        <property name="nodes" value="${spring.rabbitmq.nodes:}" />
        <property name="username" value="${spring.rabbitmq.username:}" />
        <property name="password" value="${spring.rabbitmq.password:}" />
        <property name="vhost" value="${spring.rabbitmq.virtual_host:}" />
        <property name="useSSL" value="${spring.rabbitmq.useSSL:false}" />
        <property name="sslPropertiesLocation" value="${spring.rabbitmq.sslProperties:}" />
    </bean>

</beans>

这将替换标准总线配置(它是除了连接工厂配置之外的副本)。

默认情况下会将缓存设置为 100,但您可以在 servers.yml 中使用

覆盖它
spring:
  rabbitmq:
    channelCacheSize: 200

编辑

有一个更好的解决方法,使用总线扩展机制 - 只需将替换连接工厂 bean 放在总线扩展目录中...

$ cat xd/config/META-INF/spring-xd/bus/ext/cf.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg ref="rabbitFactory" />
        <property name="addresses" value="${spring.rabbitmq.addresses}" />
        <property name="username" value="${spring.rabbitmq.username}" />
        <property name="password" value="${spring.rabbitmq.password}" />
        <property name="virtualHost" value="${spring.rabbitmq.virtual_host}" />
        <property name="channelCacheSize" value="${spring.rabbitmq.channelCacheSize:100}" />
    </bean>

</beans>

任何以 .xml 结尾的文件名都将在该目录中找到。