Spring AMQP: Mix SimpleRoutingConnectionFactory with @RabbitListener
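I have an application that listens to multiple queues, which are declared on different virtual hosts. I use a SimpleRoutingConnectionFactory to hold a connectionFactoryMap, and I would like to set up my listeners with @RabbitListener.
According to the Spring AMQP documentation: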
Also starting with version 1.4, you can configure a routing connection
factory in a SimpleMessageListenerContainer. In that case, the list of
queue names is used as the lookup key. For example, if you configure
the container with setQueueNames("foo, bar"), the lookup key will be
"[foo,bar]" (no spaces).
I used @RabbitListener(queues = "some-key"). Unfortunately, Spring complains about "lookup key [null]". See below.
18:52:44.528 WARN --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:90)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:472)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1306)
at java.lang.Thread.run(Thread.java:745)
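Am I doing something wrong? If the queues attribute is used as the lookup key (for the connection factory lookup), what am I supposed to use to specify which queue I want to listen to?
Ultimately, I hope to do programmatic/dynamic listener setup. If I use "Programmatic Endpoint Registration", should I drop "Annotation-driven listener endpoints"? I like "Annotation-driven listener endpoints" because one listener can have multiple message handlers that take different incoming data types as arguments, which is very clean and tidy. If I use programmatic endpoint registration, I would have to parse the incoming message myself and call my particular custom message handler based on the message type/content.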
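EDIT:
Hi Gary,
I modified your code #2 slightly so that it uses Jackson2JsonMessageConverter to serialize class objects (in the RabbitTemplate bean) and to deserialize them back into objects (in the inboundAdapter). I also removed @RabbitListener because, in my case, all listeners will be added at runtime. Now fooBean can receive Integer, String, and TestData messages without any problem! The only remaining issue is that the program keeps logging this warning:
"[erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]". See the bottom for the full stack trace.
Am I missing something?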
@SpringBootApplication
public class App2 implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(App2.class, args);
    }

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ConnectionFactory routingCf;

    @Autowired
    private RabbitTemplate template;

    @Override
    public void run(String... args) throws Exception {
        // dynamically add a listener for queue qux
        IntegrationFlow flow = IntegrationFlows
                .from(Amqp.inboundAdapter(this.routingCf, "qux")
                        .messageConverter(new Jackson2JsonMessageConverter()))
                .handle(fooBean())
                .get();
        this.flowContext.registration(flow).register();

        // now test it
        SimpleResourceHolder.bind(this.routingCf, "[qux]");
        this.template.convertAndSend("qux", 42);
        this.template.convertAndSend("qux", "fizbuz");
        this.template.convertAndSend("qux", new TestData(1, "test"));
        SimpleResourceHolder.unbind(this.routingCf);
    }

    @Bean
    RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(routingCf);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    @Primary
    public ConnectionFactory routingCf() {
        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("[foo,bar]", routedCf());
        map.put("[baz]", routedCf());
        map.put("[qux]", routedCf());
        rcf.setTargetConnectionFactories(map);
        return rcf;
    }

    @Bean
    public ConnectionFactory routedCf() {
        return new CachingConnectionFactory("127.0.0.1");
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    public static class Foo {

        @ServiceActivator
        public void handleInteger(Integer in) {
            System.out.println("int: " + in);
        }

        @ServiceActivator
        public void handleString(String in) {
            System.out.println("str: " + in);
        }

        @ServiceActivator
        public void handleData(TestData data) {
            System.out.println("TestData: " + data);
        }

    }

}
Full stack trace:
2017-03-15 21:43:06.413 INFO 1003 --- [ main] hello.App2 : Started App2 in 3.003 seconds (JVM running for 3.69)
2017-03-15 21:43:11.415 WARN 1003 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1430) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:500) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.onCreate(RabbitAdmin.java:419) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:571) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:90) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1382) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_112]
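One reading of the trace above: the failing lookup is reached through RabbitAdmin.initialize and RabbitTemplate.execute, so it may be happening at a point where no lookup key is bound, rather than with the "[qux]" key. The following plain-Java sketch only imitates that kind of lookup to show why an unbound key ends in this exception and how a default target would change the outcome. RoutingLookupSketch, its methods, and the "vhost-..." labels are hypothetical stand-ins, not Spring AMQP classes, and the fallback rule is simplified.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for a routing connection factory; targets are just labels here.
final class RoutingLookupSketch {

    // Simulates the per-thread key that SimpleResourceHolder.bind(...) would set.
    private static final ThreadLocal<String> BOUND_KEY = new ThreadLocal<>();

    private final Map<String, String> targets = new HashMap<>();
    private String defaultTarget; // null means no fallback is configured

    void addTarget(String key, String target) { targets.put(key, target); }

    void setDefaultTarget(String target) { this.defaultTarget = target; }

    static void bind(String key) { BOUND_KEY.set(key); }

    static void unbind() { BOUND_KEY.remove(); }

    // Resolve by the bound key; use the default (if any) when the lookup finds nothing.
    String determineTarget() {
        String key = BOUND_KEY.get();
        String target = (key != null) ? targets.get(key) : null;
        if (target == null) {
            target = defaultTarget;
        }
        if (target == null) {
            throw new IllegalStateException(
                    "Cannot determine target ConnectionFactory for lookup key [" + key + "]");
        }
        return target;
    }

    public static void main(String[] args) {
        RoutingLookupSketch rcf = new RoutingLookupSketch();
        rcf.addTarget("[qux]", "vhost-qux");

        // With a key bound, the lookup succeeds.
        bind("[qux]");
        try {
            System.out.println(rcf.determineTarget());
        } finally {
            unbind(); // always release the key, even if the call fails
        }

        // Nothing bound and no default: same message as in the warning.
        try {
            rcf.determineTarget();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // With a default target, the unbound lookup falls back instead of failing.
        rcf.setDefaultTarget("vhost-default");
        System.out.println(rcf.determineTarget());
    }
}
```

In Spring AMQP itself, AbstractRoutingConnectionFactory exposes setDefaultTargetConnectionFactory(...); whether configuring a default there is the right choice for this application is an assumption that would need checking against the real setup.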
Please show your configuration - it works fine for me...
@SpringBootApplication
public class So42784471Application {

    public static void main(String[] args) {
        SpringApplication.run(So42784471Application.class, args);
    }

    @Bean
    @Primary
    public ConnectionFactory routing() {
        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("[foo,bar]", routedCf());
        map.put("[baz]", routedCf());
        rcf.setTargetConnectionFactories(map);
        return rcf;
    }

    @Bean
    public ConnectionFactory routedCf() {
        return new CachingConnectionFactory("10.0.0.3");
    }

    @RabbitListener(queues = { "foo" , "bar" })
    public void foobar(String in) {
        System.out.println(in);
    }

    @RabbitListener(queues = "baz")
    public void bazzer(String in) {
        System.out.println(in);
    }

}
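The map keys in the configuration above follow the rule quoted from the documentation: the listener's queue names, in brackets, comma-separated, with no spaces. As a minimal plain-Java sketch of that format (LookupKeySketch and lookupKey are hypothetical names, and this only mimics the documented key format, it is not the library's code):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: reproduces the documented key format for a list of queue names.
final class LookupKeySketch {

    // Builds a key such as "[foo,bar]" from the queue names a container listens on.
    static String lookupKey(String... queueNames) {
        List<String> names = Arrays.asList(queueNames);
        // List.toString() yields "[foo, bar]"; the documented key has no spaces.
        return names.toString().replaceAll(" ", "");
    }

    public static void main(String[] args) {
        System.out.println(lookupKey("foo", "bar")); // [foo,bar]
        System.out.println(lookupKey("baz"));        // [baz]
    }
}
```

By the same rule, a listener declared with queues = "some-key" would presumably need a map entry keyed "[some-key]", brackets included.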
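Regarding your second question, you could build the endpoint manually, but it's quite involved. It's probably easier to use a similar feature in a Spring Integration @ServiceActivator.
I will update this answer with details shortly.
EDIT
Here's the update, using Spring Integration techniques to dynamically add a multi-method listener at runtime...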
@SpringBootApplication
public class So42784471Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42784471Application.class, args);
    }

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ConnectionFactory routingCf;

    @Autowired
    private RabbitTemplate template;

    @Override
    public void run(String... args) throws Exception {
        // dynamically add a listener for queue qux
        IntegrationFlow flow = IntegrationFlows.from(Amqp.inboundAdapter(this.routingCf, "qux"))
                .handle(fooBean())
                .get();
        this.flowContext.registration(flow).register();

        // now test it
        SimpleResourceHolder.bind(this.routingCf, "[qux]");
        this.template.convertAndSend("qux", 42);
        this.template.convertAndSend("qux", "fizbuz");
        SimpleResourceHolder.unbind(this.routingCf);
    }

    @Bean
    @Primary
    public ConnectionFactory routingCf() {
        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("[foo,bar]", routedCf());
        map.put("[baz]", routedCf());
        map.put("[qux]", routedCf());
        rcf.setTargetConnectionFactories(map);
        return rcf;
    }

    @Bean
    public ConnectionFactory routedCf() {
        return new CachingConnectionFactory("10.0.0.3");
    }

    @RabbitListener(queues = { "foo" , "bar" })
    public void foobar(String in) {
        System.out.println(in);
    }

    @RabbitListener(queues = "baz")
    public void bazzer(String in) {
        System.out.println(in);
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    public static class Foo {

        @ServiceActivator
        public void handleInteger(Integer in) {
            System.out.println("int: " + in);
        }

        @ServiceActivator
        public void handleString(String in) {
            System.out.println("str: " + in);
        }

    }

}
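The Foo bean above relies on the payload type to decide which method receives a message. To illustrate just that idea, here is a small plain-Java sketch of type-based dispatch. TypeDispatchSketch, register and dispatch are hypothetical names, and this is not how Spring Integration resolves handler methods internally; it only shows the general shape of choosing a handler from the payload's runtime type.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative dispatcher: picks a handler by the payload's runtime type.
final class TypeDispatchSketch {

    // Handlers are checked in registration order.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Registers a handler for one payload type; the cast is safe because
    // dispatch() only calls it after an isInstance check.
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Returns the result of the first handler whose type matches the payload.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalArgumentException("No handler for " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        TypeDispatchSketch foo = new TypeDispatchSketch();
        foo.register(Integer.class, in -> "int: " + in);
        foo.register(String.class, in -> "str: " + in);

        System.out.println(foo.dispatch(42));       // int: 42
        System.out.println(foo.dispatch("fizbuz")); // str: fizbuz
    }
}
```

The practical benefit of letting the framework do this is that each handler stays a plain method with a typed parameter, and no hand-written type switch is needed in the listener.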