将 Spring AMQP consumer/producer 服务迁移到 Spring 流源的问题
Issues migrating a Spring AMQP consumer/producer service to a Spring Stream source
我正在迁移一个 Spring 引导微服务,它使用服务器 A 上的 3 个 RabbitMQ 队列中的数据,将其保存到 Redis 中,最后将消息生成到服务器 B 上不同 RabbitMQ 中的交换中,这样这些消息就可以被另一个微服务消费。此流程运行良好,但我想使用 RabbitMQ 活页夹将其迁移到 Spring Cloud Stream。所有 Spring AMQP 配置都在属性文件中自定义,没有 spring 属性 用于创建连接、队列、绑定等...
我的第一个想法是在 Spring Cloud Stream 中设置两个绑定,一个连接到服务器 A(消费者),另一个连接到服务器 B(生产者),并将现有代码迁移到处理器但是我放弃了它,因为如果使用多个绑定器,似乎还无法设置连接名称,我需要添加几个绑定以从服务器 A 的队列中使用,而 bindingRoutingKey 属性 不支持值列表(我知道它可以是按照说明 here).
以编程方式完成
所以我决定只重构与生产者相关的代码部分,以便在 RabbitMQ 上使用 Spring Cloud Stream,因此相同的微服务应该通过来自服务器 A 的 Spring AMQP 使用(原始代码)并且应该通过 Spring Cloud Stream 生成到服务器 B。
我发现的第一个问题是 Spring Cloud Stream 中的 NonUniqueBeanDefinitionException,因为 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory bean 被 handlerMethodFactory 和 定义了两次integrationMessageHandlerMethodFactory 个名称。
org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory' available: expected single matching bean but found 2: handlerMethodFactory,integrationMessageHandlerMethodFactory
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveNamedBean(DefaultListableBeanFactory.java:1144)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveBean(DefaultListableBeanFactory.java:411)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:344)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:337)
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1123)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:317)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:113)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:862)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:743)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:390)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:312)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1214)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1203)
似乎前一个 bean 是由 Spring AMQP 创建的,而后者是由 Spring Cloud Stream 创建的,所以我创建了自己的主 bean:
@Bean
@Primary
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
return new DefaultMessageHandlerMethodFactory();
}
现在应用程序可以启动,但输出通道是由服务器 A 中的 Spring Cloud Stream 创建的,而不是服务器 B。似乎 Spring Cloud Stream 配置正在使用创建的连接通过 Spring AMQP 而不是使用它自己的配置。
Spring AMQP 的配置是这样的:
@Bean
public SimpleRabbitListenerContainerFactory priceRabbitListenerContainerFactory(
ConnectionFactory consumerConnectionFactory) {
return
getSimpleRabbitListenerContainerFactory(
consumerConnectionFactory,
rabbitProperties.getConsumer().getListeners().get(LISTENER_A));
}
@Bean
public SimpleRabbitListenerContainerFactory maxbetRabbitListenerContainerFactory(
ConnectionFactory consumerConnectionFactory) {
return
getSimpleRabbitListenerContainerFactory(
consumerConnectionFactory,
rabbitProperties.getConsumer().getListeners().get(LISTENER_B));
}
@Bean
public ConnectionFactory consumerConnectionFactory() throws Exception {
return
new CachingConnectionFactory(
getRabbitConnectionFactoryBean(
rabbitProperties.getConsumer()
).getObject()
);
}
private SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
RabbitProperties.ListenerProperties listenerProperties) {
//return a SimpleRabbitListenerContainerFactory set up from external properties
}
/**
* Create the AMQ Admin.
*/
@Bean
public AmqpAdmin consumerAmqpAdmin(ConnectionFactory consumerConnectionFactory) {
return new RabbitAdmin(consumerConnectionFactory);
}
/**
* Create the map of available queues and declare them in the admin.
*/
@Bean
public Map<String, Queue> queues(AmqpAdmin consumerAmqpAdmin) {
return
rabbitProperties.getConsumer().getListeners().entrySet().stream()
.map(listenerEntry -> {
Queue queue =
QueueBuilder
.nonDurable(listenerEntry.getValue().getQueueName())
.autoDelete()
.build();
consumerAmqpAdmin.declareQueue(queue);
return new AbstractMap.SimpleEntry<>(listenerEntry.getKey(), queue);
}).collect(
Collectors.toMap(
AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue
)
);
}
/**
* Create the map of available exchanges and declare them in the admin.
*/
@Bean
public Map<String, TopicExchange> exchanges(AmqpAdmin consumerAmqpAdmin) {
return
rabbitProperties.getConsumer().getListeners().entrySet().stream()
.map(listenerEntry -> {
TopicExchange exchange =
new TopicExchange(listenerEntry.getValue().getExchangeName());
consumerAmqpAdmin.declareExchange(exchange);
return new AbstractMap.SimpleEntry<>(listenerEntry.getKey(), exchange);
}).collect(
Collectors.toMap(
AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue
)
);
}
/**
* Create the list of bindings and declare them in the admin.
*/
@Bean
public List<Binding> bindings(Map<String, Queue> queues, Map<String, TopicExchange> exchanges, AmqpAdmin consumerAmqpAdmin) {
return
rabbitProperties.getConsumer().getListeners().keySet().stream()
.map(listenerName -> {
Queue queue = queues.get(listenerName);
TopicExchange exchange = exchanges.get(listenerName);
return
rabbitProperties.getConsumer().getListeners().get(listenerName).getKeys().stream()
.map(bindingKey -> {
Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingKey);
consumerAmqpAdmin.declareBinding(binding);
return binding;
}).collect(Collectors.toList());
}).flatMap(Collection::stream)
.collect(Collectors.toList());
}
消息侦听器是:
@RabbitListener(
queues="${consumer.listeners.LISTENER_A.queue-name}",
containerFactory = "priceRabbitListenerContainerFactory"
)
public void handleMessage(Message rawMessage, org.springframework.messaging.Message<ModelPayload> message) {
// call a service to process the message payload
}
@RabbitListener(
queues="${consumer.listeners.LISTENER_B.queue-name}",
containerFactory = "maxbetRabbitListenerContainerFactory"
)
public void handleMessage(Message rawMessage, org.springframework.messaging.Message<ModelPayload> message) {
// call a service to process the message payload
}
属性:
#
# Server A config (Spring AMQP)
#
consumer.host=server-a
consumer.username=
consumer.password=
consumer.port=5671
consumer.ssl.enabled=true
consumer.ssl.algorithm=TLSv1.2
consumer.ssl.validate-server-certificate=false
consumer.connection-name=local:microservice-1
consumer.thread-factory.thread-group-name=server-a-consumer
consumer.thread-factory.thread-name-prefix=server-a-consumer-
# LISTENER_A configuration
consumer.listeners.LISTENER_A.queue-name=local.listenerA
consumer.listeners.LISTENER_A.exchange-name=exchangeA
consumer.listeners.LISTENER_A.keys[0]=*.1.*.*
consumer.listeners.LISTENER_A.keys[1]=*.3.*.*
consumer.listeners.LISTENER_A.keys[2]=*.6.*.*
consumer.listeners.LISTENER_A.keys[3]=*.8.*.*
consumer.listeners.LISTENER_A.keys[4]=*.9.*.*
consumer.listeners.LISTENER_A.initial-concurrency=5
consumer.listeners.LISTENER_A.maximum-concurrency=20
consumer.listeners.LISTENER_A.thread-name-prefix=listenerA-consumer-
# LISTENER_B configuration
consumer.listeners.LISTENER_B.queue-name=local.listenerB
consumer.listeners.LISTENER_B.exchange-name=exchangeB
consumer.listeners.LISTENER_B.keys[0]=*.1.*
consumer.listeners.LISTENER_B.keys[1]=*.3.*
consumer.listeners.LISTENER_B.keys[2]=*.6.*
consumer.listeners.LISTENER_B.initial-concurrency=5
consumer.listeners.LISTENER_B.maximum-concurrency=20
consumer.listeners.LISTENER_B.thread-name-prefix=listenerB-consumer-
#
# Server B config (Spring Cloud Stream)
#
spring.rabbitmq.host=server-b
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.cloud.stream.bindings.outbound.destination=microservice-out
spring.cloud.stream.bindings.outbound.group=default
spring.cloud.stream.rabbit.binder.connection-name-prefix=local:microservice
所以我的问题是:是否可以在同一个 Spring 引导应用程序代码中使用,该代码通过 Spring AMQP 从 RabbitMQ 获取数据并通过 Spring 将消息生成到不同的服务器] 云流RabbitMQ?如果是,有人可以告诉我我做错了什么吗?
Spring AMQP版本为Boot version 2.1.7 (2.1.8-RELEASE)提供的版本 Spring Cloud Stream版本为Spring Cloud train提供的版本格林威治.SR2 (2.1.3.RELEASE).
编辑
我能够通过多个配置属性而不是默认配置属性来配置活页夹。所以使用这个配置它可以工作:
#
# Server B config (Spring Cloud Stream)
#
spring.cloud.stream.binders.transport-layer.type=rabbit
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.host=server-b
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.username=
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.password=
spring.cloud.stream.bindings.stream-output.destination=microservice-out
spring.cloud.stream.bindings.stream-output.group=default
不幸的是,在多个活页夹配置中还无法设置连接名称:A custom ConnectionNameStrategy
is ignored if there is a custom binder configuration。
无论如何,我仍然不明白为什么使用 Spring AMQP 和 Spring Cloud Stream RabbitMQ 时上下文似乎是 "mixed"。仍然需要设置一个主要的 MessageHandlerMethodFactory bean 才能使实现工作。
编辑
我发现 NoUniqueBeanDefinitionException 是因为微服务本身正在创建 ConditionalGenericConverter 以供 Spring 使用AMQP 部分反序列化来自服务器 A 的消息。
我删除了它并添加了一些 MessageConverters。现在问题解决了,不再需要@Primary bean了。
无关,但是
consumerAmqpAdmin.declareQueue(queue);
您永远不应在 @Bean
定义内与经纪人交流;在应用程序上下文生命周期中还为时过早。它可能有效,但 YMMV;如果代理不可用,它也会阻止您的应用程序启动。
最好定义类型为 Declarables
的 bean,其中包含队列、通道、绑定的列表,当连接首次成功打开时,管理员会自动声明它们。请参阅参考手册。
没见过MessageHandlerFactory
的问题; Spring AMQP 声明没有这样的 bean。如果您可以提供一个展示该行为的小型示例应用程序,那将会很有用。
我看看能否找到解决连接名称问题的方法。
编辑
我找到了解决连接名称问题的方法;它涉及一些反思,但它有效。我建议你打开一个new feature request against the binder请求一个机制来设置使用多个绑定器时的连接名称策略。
无论如何;这是解决方法...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So57725710Application {
public static void main(String[] args) {
SpringApplication.run(So57725710Application.class, args);
}
@Bean
public Object connectionNameConfigurer(BinderFactory binderFactory) throws Exception {
setConnectionName(binderFactory, "rabbit1", "myAppProducerSide");
setConnectionName(binderFactory, "rabbit2", "myAppConsumerSide");
return null;
}
private void setConnectionName(BinderFactory binderFactory, String binderName,
String conName) throws Exception {
binderFactory.getBinder(binderName, MessageChannel.class); // force creation
@SuppressWarnings("unchecked")
Map<String, Map.Entry<Binder<?, ?, ?>, ApplicationContext>> binders =
(Map<String, Entry<Binder<?, ?, ?>, ApplicationContext>>) new DirectFieldAccessor(binderFactory)
.getPropertyValue("binderInstanceCache");
binders.get(binderName)
.getValue()
.getBean(CachingConnectionFactory.class).setConnectionNameStrategy(queue -> conName);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String listen(String in) {
System.out.println(in);
return in.toUpperCase();
}
}
和
spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.destination=outDest
spring.cloud.stream.bindings.output.producer.required-groups=outQueue
spring.cloud.stream.bindings.output.binder=rabbit1
spring.cloud.stream.binders.rabbit2.type=rabbit
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=inDest
spring.cloud.stream.bindings.input.group=default
spring.cloud.stream.bindings.input.binder=rabbit2
和
我正在迁移一个 Spring 引导微服务,它使用服务器 A 上的 3 个 RabbitMQ 队列中的数据,将其保存到 Redis 中,最后将消息生成到服务器 B 上不同 RabbitMQ 中的交换中,这样这些消息就可以被另一个微服务消费。此流程运行良好,但我想使用 RabbitMQ 活页夹将其迁移到 Spring Cloud Stream。所有 Spring AMQP 配置都在属性文件中自定义,没有 spring 属性 用于创建连接、队列、绑定等...
我的第一个想法是在 Spring Cloud Stream 中设置两个绑定,一个连接到服务器 A(消费者),另一个连接到服务器 B(生产者),并将现有代码迁移到处理器但是我放弃了它,因为如果使用多个绑定器,似乎还无法设置连接名称,我需要添加几个绑定以从服务器 A 的队列中使用,而 bindingRoutingKey 属性 不支持值列表(我知道它可以是按照说明 here).
以编程方式完成所以我决定只重构与生产者相关的代码部分,以便在 RabbitMQ 上使用 Spring Cloud Stream,因此相同的微服务应该通过来自服务器 A 的 Spring AMQP 使用(原始代码)并且应该通过 Spring Cloud Stream 生成到服务器 B。
我发现的第一个问题是 Spring Cloud Stream 中的 NonUniqueBeanDefinitionException,因为 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory bean 被 handlerMethodFactory 和 定义了两次integrationMessageHandlerMethodFactory 个名称。
org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory' available: expected single matching bean but found 2: handlerMethodFactory,integrationMessageHandlerMethodFactory
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveNamedBean(DefaultListableBeanFactory.java:1144)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveBean(DefaultListableBeanFactory.java:411)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:344)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:337)
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1123)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:317)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:113)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:862)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:743)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:390)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:312)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1214)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1203)
似乎前一个 bean 是由 Spring AMQP 创建的,而后者是由 Spring Cloud Stream 创建的,所以我创建了自己的主 bean:
@Bean
@Primary
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
return new DefaultMessageHandlerMethodFactory();
}
现在应用程序可以启动,但输出通道是由服务器 A 中的 Spring Cloud Stream 创建的,而不是服务器 B。似乎 Spring Cloud Stream 配置正在使用创建的连接通过 Spring AMQP 而不是使用它自己的配置。
Spring AMQP 的配置是这样的:
@Bean
public SimpleRabbitListenerContainerFactory priceRabbitListenerContainerFactory(
ConnectionFactory consumerConnectionFactory) {
return
getSimpleRabbitListenerContainerFactory(
consumerConnectionFactory,
rabbitProperties.getConsumer().getListeners().get(LISTENER_A));
}
@Bean
public SimpleRabbitListenerContainerFactory maxbetRabbitListenerContainerFactory(
ConnectionFactory consumerConnectionFactory) {
return
getSimpleRabbitListenerContainerFactory(
consumerConnectionFactory,
rabbitProperties.getConsumer().getListeners().get(LISTENER_B));
}
@Bean
public ConnectionFactory consumerConnectionFactory() throws Exception {
return
new CachingConnectionFactory(
getRabbitConnectionFactoryBean(
rabbitProperties.getConsumer()
).getObject()
);
}
private SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
RabbitProperties.ListenerProperties listenerProperties) {
//return a SimpleRabbitListenerContainerFactory set up from external properties
}
/**
* Create the AMQ Admin.
*/
@Bean
public AmqpAdmin consumerAmqpAdmin(ConnectionFactory consumerConnectionFactory) {
return new RabbitAdmin(consumerConnectionFactory);
}
/**
* Create the map of available queues and declare them in the admin.
*/
@Bean
public Map<String, Queue> queues(AmqpAdmin consumerAmqpAdmin) {
return
rabbitProperties.getConsumer().getListeners().entrySet().stream()
.map(listenerEntry -> {
Queue queue =
QueueBuilder
.nonDurable(listenerEntry.getValue().getQueueName())
.autoDelete()
.build();
consumerAmqpAdmin.declareQueue(queue);
return new AbstractMap.SimpleEntry<>(listenerEntry.getKey(), queue);
}).collect(
Collectors.toMap(
AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue
)
);
}
/**
* Create the map of available exchanges and declare them in the admin.
*/
@Bean
public Map<String, TopicExchange> exchanges(AmqpAdmin consumerAmqpAdmin) {
return
rabbitProperties.getConsumer().getListeners().entrySet().stream()
.map(listenerEntry -> {
TopicExchange exchange =
new TopicExchange(listenerEntry.getValue().getExchangeName());
consumerAmqpAdmin.declareExchange(exchange);
return new AbstractMap.SimpleEntry<>(listenerEntry.getKey(), exchange);
}).collect(
Collectors.toMap(
AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue
)
);
}
/**
* Create the list of bindings and declare them in the admin.
*/
@Bean
public List<Binding> bindings(Map<String, Queue> queues, Map<String, TopicExchange> exchanges, AmqpAdmin consumerAmqpAdmin) {
return
rabbitProperties.getConsumer().getListeners().keySet().stream()
.map(listenerName -> {
Queue queue = queues.get(listenerName);
TopicExchange exchange = exchanges.get(listenerName);
return
rabbitProperties.getConsumer().getListeners().get(listenerName).getKeys().stream()
.map(bindingKey -> {
Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingKey);
consumerAmqpAdmin.declareBinding(binding);
return binding;
}).collect(Collectors.toList());
}).flatMap(Collection::stream)
.collect(Collectors.toList());
}
消息侦听器是:
@RabbitListener(
queues="${consumer.listeners.LISTENER_A.queue-name}",
containerFactory = "priceRabbitListenerContainerFactory"
)
public void handleMessage(Message rawMessage, org.springframework.messaging.Message<ModelPayload> message) {
// call a service to process the message payload
}
@RabbitListener(
queues="${consumer.listeners.LISTENER_B.queue-name}",
containerFactory = "maxbetRabbitListenerContainerFactory"
)
public void handleMessage(Message rawMessage, org.springframework.messaging.Message<ModelPayload> message) {
// call a service to process the message payload
}
属性:
#
# Server A config (Spring AMQP)
#
consumer.host=server-a
consumer.username=
consumer.password=
consumer.port=5671
consumer.ssl.enabled=true
consumer.ssl.algorithm=TLSv1.2
consumer.ssl.validate-server-certificate=false
consumer.connection-name=local:microservice-1
consumer.thread-factory.thread-group-name=server-a-consumer
consumer.thread-factory.thread-name-prefix=server-a-consumer-
# LISTENER_A configuration
consumer.listeners.LISTENER_A.queue-name=local.listenerA
consumer.listeners.LISTENER_A.exchange-name=exchangeA
consumer.listeners.LISTENER_A.keys[0]=*.1.*.*
consumer.listeners.LISTENER_A.keys[1]=*.3.*.*
consumer.listeners.LISTENER_A.keys[2]=*.6.*.*
consumer.listeners.LISTENER_A.keys[3]=*.8.*.*
consumer.listeners.LISTENER_A.keys[4]=*.9.*.*
consumer.listeners.LISTENER_A.initial-concurrency=5
consumer.listeners.LISTENER_A.maximum-concurrency=20
consumer.listeners.LISTENER_A.thread-name-prefix=listenerA-consumer-
# LISTENER_B configuration
consumer.listeners.LISTENER_B.queue-name=local.listenerB
consumer.listeners.LISTENER_B.exchange-name=exchangeB
consumer.listeners.LISTENER_B.keys[0]=*.1.*
consumer.listeners.LISTENER_B.keys[1]=*.3.*
consumer.listeners.LISTENER_B.keys[2]=*.6.*
consumer.listeners.LISTENER_B.initial-concurrency=5
consumer.listeners.LISTENER_B.maximum-concurrency=20
consumer.listeners.LISTENER_B.thread-name-prefix=listenerB-consumer-
#
# Server B config (Spring Cloud Stream)
#
spring.rabbitmq.host=server-b
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.cloud.stream.bindings.outbound.destination=microservice-out
spring.cloud.stream.bindings.outbound.group=default
spring.cloud.stream.rabbit.binder.connection-name-prefix=local:microservice
所以我的问题是:是否可以在同一个 Spring 引导应用程序代码中使用,该代码通过 Spring AMQP 从 RabbitMQ 获取数据并通过 Spring 将消息生成到不同的服务器] 云流RabbitMQ?如果是,有人可以告诉我我做错了什么吗?
Spring AMQP版本为Boot version 2.1.7 (2.1.8-RELEASE)提供的版本 Spring Cloud Stream版本为Spring Cloud train提供的版本格林威治.SR2 (2.1.3.RELEASE).
编辑
我能够通过多个配置属性而不是默认配置属性来配置活页夹。所以使用这个配置它可以工作:
#
# Server B config (Spring Cloud Stream)
#
spring.cloud.stream.binders.transport-layer.type=rabbit
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.host=server-b
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.username=
spring.cloud.stream.binders.transport-layer.environment.spring.rabbitmq.password=
spring.cloud.stream.bindings.stream-output.destination=microservice-out
spring.cloud.stream.bindings.stream-output.group=default
不幸的是,在多个活页夹配置中还无法设置连接名称:A custom ConnectionNameStrategy
is ignored if there is a custom binder configuration。
无论如何,我仍然不明白为什么使用 Spring AMQP 和 Spring Cloud Stream RabbitMQ 时上下文似乎是 "mixed"。仍然需要设置一个主要的 MessageHandlerMethodFactory bean 才能使实现工作。
编辑
我发现 NoUniqueBeanDefinitionException 是因为微服务本身正在创建 ConditionalGenericConverter 以供 Spring 使用AMQP 部分反序列化来自服务器 A 的消息。
我删除了它并添加了一些 MessageConverters。现在问题解决了,不再需要@Primary bean了。
无关,但是
consumerAmqpAdmin.declareQueue(queue);
您永远不应在 @Bean
定义内与经纪人交流;在应用程序上下文生命周期中还为时过早。它可能有效,但 YMMV;如果代理不可用,它也会阻止您的应用程序启动。
最好定义类型为 Declarables
的 bean,其中包含队列、通道、绑定的列表,当连接首次成功打开时,管理员会自动声明它们。请参阅参考手册。
没见过
MessageHandlerFactory
的问题; Spring AMQP 声明没有这样的 bean。如果您可以提供一个展示该行为的小型示例应用程序,那将会很有用。我看看能否找到解决连接名称问题的方法。
编辑
我找到了解决连接名称问题的方法;它涉及一些反思,但它有效。我建议你打开一个new feature request against the binder请求一个机制来设置使用多个绑定器时的连接名称策略。
无论如何;这是解决方法...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So57725710Application {
public static void main(String[] args) {
SpringApplication.run(So57725710Application.class, args);
}
@Bean
public Object connectionNameConfigurer(BinderFactory binderFactory) throws Exception {
setConnectionName(binderFactory, "rabbit1", "myAppProducerSide");
setConnectionName(binderFactory, "rabbit2", "myAppConsumerSide");
return null;
}
private void setConnectionName(BinderFactory binderFactory, String binderName,
String conName) throws Exception {
binderFactory.getBinder(binderName, MessageChannel.class); // force creation
@SuppressWarnings("unchecked")
Map<String, Map.Entry<Binder<?, ?, ?>, ApplicationContext>> binders =
(Map<String, Entry<Binder<?, ?, ?>, ApplicationContext>>) new DirectFieldAccessor(binderFactory)
.getPropertyValue("binderInstanceCache");
binders.get(binderName)
.getValue()
.getBean(CachingConnectionFactory.class).setConnectionNameStrategy(queue -> conName);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String listen(String in) {
System.out.println(in);
return in.toUpperCase();
}
}
和
spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.destination=outDest
spring.cloud.stream.bindings.output.producer.required-groups=outQueue
spring.cloud.stream.bindings.output.binder=rabbit1
spring.cloud.stream.binders.rabbit2.type=rabbit
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=inDest
spring.cloud.stream.bindings.input.group=default
spring.cloud.stream.bindings.input.binder=rabbit2
和