在 Qpid broker 中创建 Exchange 和队列
Create Exchange and queues in Qpid broker
您好,我正在使用嵌入式代理 Qpid 测试 spring 集成项目。但问题是我如何在 qpid 中进行队列和交换。我以为 rabbit-config.xml 会在 qpid 代理中进行队列和交换,但无济于事。我的流程是在 qpid 代理中创建队列和交换,将消息传递给它们,并且绑定到这些队列的入站 amqp 适配器将收到消息,我可以继续测试
错误:队列:'push.customer.arkona.controller.search' 在 VirtualHost 'default' 上找不到。
qpid-config.json:
{ "name": "EmbeddedBroker", "modelVersion": "2.0", "storeVersion" : 1, "authenticationproviders" : [ {
"name" : "noPassword",
"type" : "Anonymous",
"secureOnlyMechanisms": []
},
{
"name" : "passwordFile",
"type" : "PlainPasswordFile",
"path" : "${qpid.home_dir}${file.separator}src${file.separator}main${file.separator}resources${file.separator}password.properties",
"secureOnlyMechanisms": []
} ], "ports" : [
{
"name": "AMQP",
"port": "${qpid.amqp_port}",
"authenticationProvider": "passwordFile",
"protocols": [
"AMQP_0_10",
"AMQP_0_8",
"AMQP_0_9",
"AMQP_0_9_1"
]
}],
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
"defaultVirtualHostNode" : "true",
"virtualHostInitialConfiguration" : "${qpid.initial_config_virtualhost_config}",
"storeType" : "DERBY"
}
]
}
password.properties 有
guest:guest
我已经为 运行 我的测试创建了一个单独的配置文件。这是 rabbitmq 配置。除此之外,我还有一个 rabbit-context xml 文件,其中定义了所有队列和交换。
@Configuration
@Profile("qpid")
public class QpidConfig {
String amqpPort = "5672";
//String qpidHomeDir = "complete";
String configFileName = "src/main/resources/qpid-config.json";
@Bean
BrokerOptions brokerOptions() {
File tmpFolder= Files.createTempDir();
//small hack, because userDir is not same when running Application and ApplicationTest
//it leads to some issue locating the files after, so hacking it here
String userDir=System.getProperty("user.dir").toString();
File file = new File(userDir);
String homePath = file.getAbsolutePath();
BrokerOptions brokerOptions=new BrokerOptions();
brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.getAbsolutePath());
brokerOptions.setConfigProperty("qpid.amqp_port",amqpPort);
brokerOptions.setConfigProperty("qpid.home_dir", homePath);
brokerOptions.setInitialConfigurationLocation(homePath + "/"+configFileName);
return brokerOptions;
}
@SuppressWarnings("rawtypes")
@Bean
Broker broker() throws Exception {
org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
broker.startup(brokerOptions());
return (Broker) broker;
}
private ConnectionFactory connectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("127.0.0.1");
factory.setPort(5672);
return factory;
}
@Bean(name ="rabbitConnectionFactory")
public CachingConnectionFactory rabbitConnectionFactory(){
return new CachingConnectionFactory(connectionFactory());
}
@Bean(name="rabbitTemplate")
public RabbitTemplate rabbitTemplate(){
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean(name ="arkonaHeaderMapper")
public DefaultAmqpHeaderMapper syncerHeaderMapper() {
DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.inboundMapper();
amqpHeaderMapper.setRequestHeaderNames("*");
amqpHeaderMapper.setReplyHeaderNames("*");
return amqpHeaderMapper;
}
}
编辑
我的兔子-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:queue name="pull.appt.arkona.scheduler.adapter" />
<rabbit:queue name="pull.appt.arkona.adapter.processor" />
<rabbit:queue name="pull.customer.arkona.to.lookup" />
<rabbit:queue name="pull.customer.arkona.lookup.processor" />
<rabbit:queue name="pull.customer.arkona.scheduler.adapter" />
<rabbit:queue name="pull.ro.arkona.to.lookup" />
<rabbit:queue name="pull.ro.arkona.adapter.processor" />
<rabbit:queue name="pull.ro.arkona.scheduler.adapter" />
<rabbit:queue name="pull.closed.arkona.scheduler.adapter" />
<rabbit:queue name="pull.parts.arkona.scheduler.adapter" />
<rabbit:queue name="pull.closed.arkona.adapter.processor" />
<rabbit:queue name="pull.parts.arkona.adapter.processor" />
<rabbit:queue name="pull.vehicle.arkona.to.lookup" />
<rabbit:queue name="pull.vehicle.arkona.lookup.processor" />
<rabbit:direct-exchange name="dms.arkona.exchange" durable="true">
<rabbit:bindings>
<rabbit:binding queue="pull.appt.arkona.scheduler.adapter" key="pull.appt.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.appt.arkona.adapter.processor" key="pull.appt.arkona.adapter.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.customer.arkona.to.lookup" key="pull.customer.arkona.to.lookup.key"></rabbit:binding>
<rabbit:binding queue="pull.customer.arkona.lookup.processor" key="pull.customer.arkona.lookup.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.customer.arkona.scheduler.adapter" key="pull.customer.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.ro.arkona.to.lookup" key="pull.ro.arkona.to.lookup.key"></rabbit:binding>
<rabbit:binding queue="pull.ro.arkona.adapter.processor" key="pull.ro.arkona.adapter.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.ro.arkona.scheduler.adapter" key="pull.ro.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.vehicle.arkona.to.lookup" key="pull.vehicle.arkona.to.lookup.key"></rabbit:binding>
<rabbit:binding queue="pull.vehicle.arkona.lookup.processor" key="pull.vehicle.arkona.lookup.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.closed.arkona.scheduler.adapter" key="pull.closed.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.closed.arkona.adapter.processor" key="pull.closed.arkona.adapter.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.parts.arkona.scheduler.adapter" key="pull.parts.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.parts.arkona.adapter.processor" key="pull.parts.arkona.adapter.processor.key"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
您的应用程序上下文中是否有 RabbitAdmin
? (它检测 queues/exchanges/bindings 并在建立连接时声明它们)。
我刚刚用 QPID 6.1.2 测试了 Spring Integration AMQP Sample,它创建的一切正常...
<!-- Infrastructure -->
<rabbit:connection-factory id="connectionFactory" host="xx.xx.xx.xx" virtual-host="default" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="si.test.queue" />
<rabbit:direct-exchange name="si.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="si.test.queue" key="si.test.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
编辑
启动应用程序对我来说也很好用...
@SpringBootApplication
public class So50364236Application {
public static void main(String[] args) {
SpringApplication.run(So50364236Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("so50364236", "foo");
}
@Bean
public Queue queue() {
return new Queue("so50364236");
}
@RabbitListener(queues = "so50364236")
public void listen(String in) {
System.out.println(in);
}
}
和
spring.rabbitmq.addresses=xx.x.x.x
spring.rabbitmq.virtual-host=default
和
2018-05-16 13:17:25.013 INFO 34714 --- [ main] com.example.So50364236Application : Started So50364236Application in 1.151 seconds (JVM running for 1.579)
foo
而且我在经纪人的管理页面上看到队列。
EDIT2
这是另一个启动应用程序,其中队列在 XML 文件中声明;使用嵌入的 QPID 6.1.6...
qpid-config.json
{
"name": "EmbeddedBroker",
"modelVersion": "2.0",
"storeVersion": 1,
"authenticationproviders": [
{
"name": "noPassword",
"type": "Anonymous",
"secureOnlyMechanisms": []
},
{
"name": "passwordFile",
"type": "PlainPasswordFile",
"path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
"secureOnlyMechanisms": []
}
],
"ports": [
{
"name": "AMQP",
"port": "${qpid.amqp_port}",
"authenticationProvider": "passwordFile",
"protocols": [
"AMQP_0_10",
"AMQP_0_8",
"AMQP_0_9",
"AMQP_0_9_1"
]
}
],
"virtualhostnodes": [
{
"name": "default",
"type": "JSON",
"defaultVirtualHostNode": "true",
"virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
"storeType": "DERBY"
}
]
}
config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:queue name="so50364236b" />
</beans>
application.properties
spring.rabbitmq.addresses=localhost:8888
启动应用程序
@SpringBootApplication
@ImportResource("config.xml")
public class So50364236Application {
public static void main(String[] args) {
new SpringApplicationBuilder(So50364236Application.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("so50364236b", "foo");
}
@Bean
BrokerOptions brokerOptions() throws Exception {
Path tmpFolder = Files.createTempDirectory("qpidWork");
Path homeFolder = Files.createTempDirectory("qpidHome");
File etc = new File(homeFolder.toFile(), "etc");
etc.mkdir();
FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
fos.write("guest:guest\n".getBytes());
fos.close();
BrokerOptions brokerOptions = new BrokerOptions();
brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
brokerOptions.setConfigProperty("qpid.amqp_port", "8888");
brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
Resource config = new ClassPathResource("qpid-config.json");
brokerOptions.setInitialConfigurationLocation(config.getFile().getAbsolutePath());
return brokerOptions;
}
@Bean
Broker broker() throws Exception {
org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
broker.startup(brokerOptions());
return broker;
}
@RabbitListener(queues = "so50364236b")
public void listen(String in) {
System.out.println(in);
}
}
和
[Broker] BRK-1004 : Qpid Broker Ready
received: foo
也许您正在做一些导致未声明引导的 Admin 的事情。不清楚您为什么要添加自己的连接工厂和模板;您也尝试过添加自己的 RabbitAdmin
吗?
您好,我正在使用嵌入式代理 Qpid 测试 spring 集成项目。但问题是我如何在 qpid 中进行队列和交换。我以为 rabbit-config.xml 会在 qpid 代理中进行队列和交换,但无济于事。我的流程是在 qpid 代理中创建队列和交换,将消息传递给它们,并且绑定到这些队列的入站 amqp 适配器将收到消息,我可以继续测试
错误:队列:'push.customer.arkona.controller.search' 在 VirtualHost 'default' 上找不到。
qpid-config.json:
{ "name": "EmbeddedBroker", "modelVersion": "2.0", "storeVersion" : 1, "authenticationproviders" : [ {
"name" : "noPassword",
"type" : "Anonymous",
"secureOnlyMechanisms": []
},
{
"name" : "passwordFile",
"type" : "PlainPasswordFile",
"path" : "${qpid.home_dir}${file.separator}src${file.separator}main${file.separator}resources${file.separator}password.properties",
"secureOnlyMechanisms": []
} ], "ports" : [
{
"name": "AMQP",
"port": "${qpid.amqp_port}",
"authenticationProvider": "passwordFile",
"protocols": [
"AMQP_0_10",
"AMQP_0_8",
"AMQP_0_9",
"AMQP_0_9_1"
]
}],
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
"defaultVirtualHostNode" : "true",
"virtualHostInitialConfiguration" : "${qpid.initial_config_virtualhost_config}",
"storeType" : "DERBY"
}
]
}
password.properties 有
guest:guest
我已经为 运行 我的测试创建了一个单独的配置文件。这是 rabbitmq 配置。除此之外,我还有一个 rabbit-context xml 文件,其中定义了所有队列和交换。
@Configuration
@Profile("qpid")
public class QpidConfig {
String amqpPort = "5672";
//String qpidHomeDir = "complete";
String configFileName = "src/main/resources/qpid-config.json";
@Bean
BrokerOptions brokerOptions() {
File tmpFolder= Files.createTempDir();
//small hack, because userDir is not same when running Application and ApplicationTest
//it leads to some issue locating the files after, so hacking it here
String userDir=System.getProperty("user.dir").toString();
File file = new File(userDir);
String homePath = file.getAbsolutePath();
BrokerOptions brokerOptions=new BrokerOptions();
brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.getAbsolutePath());
brokerOptions.setConfigProperty("qpid.amqp_port",amqpPort);
brokerOptions.setConfigProperty("qpid.home_dir", homePath);
brokerOptions.setInitialConfigurationLocation(homePath + "/"+configFileName);
return brokerOptions;
}
@SuppressWarnings("rawtypes")
@Bean
Broker broker() throws Exception {
org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
broker.startup(brokerOptions());
return (Broker) broker;
}
private ConnectionFactory connectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("127.0.0.1");
factory.setPort(5672);
return factory;
}
@Bean(name ="rabbitConnectionFactory")
public CachingConnectionFactory rabbitConnectionFactory(){
return new CachingConnectionFactory(connectionFactory());
}
@Bean(name="rabbitTemplate")
public RabbitTemplate rabbitTemplate(){
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean(name ="arkonaHeaderMapper")
public DefaultAmqpHeaderMapper syncerHeaderMapper() {
DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.inboundMapper();
amqpHeaderMapper.setRequestHeaderNames("*");
amqpHeaderMapper.setReplyHeaderNames("*");
return amqpHeaderMapper;
}
}
编辑
我的兔子-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:queue name="pull.appt.arkona.scheduler.adapter" />
<rabbit:queue name="pull.appt.arkona.adapter.processor" />
<rabbit:queue name="pull.customer.arkona.to.lookup" />
<rabbit:queue name="pull.customer.arkona.lookup.processor" />
<rabbit:queue name="pull.customer.arkona.scheduler.adapter" />
<rabbit:queue name="pull.ro.arkona.to.lookup" />
<rabbit:queue name="pull.ro.arkona.adapter.processor" />
<rabbit:queue name="pull.ro.arkona.scheduler.adapter" />
<rabbit:queue name="pull.closed.arkona.scheduler.adapter" />
<rabbit:queue name="pull.parts.arkona.scheduler.adapter" />
<rabbit:queue name="pull.closed.arkona.adapter.processor" />
<rabbit:queue name="pull.parts.arkona.adapter.processor" />
<rabbit:queue name="pull.vehicle.arkona.to.lookup" />
<rabbit:queue name="pull.vehicle.arkona.lookup.processor" />
<rabbit:direct-exchange name="dms.arkona.exchange" durable="true">
<rabbit:bindings>
<rabbit:binding queue="pull.appt.arkona.scheduler.adapter" key="pull.appt.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.appt.arkona.adapter.processor" key="pull.appt.arkona.adapter.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.customer.arkona.to.lookup" key="pull.customer.arkona.to.lookup.key"></rabbit:binding>
<rabbit:binding queue="pull.customer.arkona.lookup.processor" key="pull.customer.arkona.lookup.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.customer.arkona.scheduler.adapter" key="pull.customer.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.ro.arkona.to.lookup" key="pull.ro.arkona.to.lookup.key"></rabbit:binding>
<rabbit:binding queue="pull.ro.arkona.adapter.processor" key="pull.ro.arkona.adapter.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.ro.arkona.scheduler.adapter" key="pull.ro.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.vehicle.arkona.to.lookup" key="pull.vehicle.arkona.to.lookup.key"></rabbit:binding>
<rabbit:binding queue="pull.vehicle.arkona.lookup.processor" key="pull.vehicle.arkona.lookup.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.closed.arkona.scheduler.adapter" key="pull.closed.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.closed.arkona.adapter.processor" key="pull.closed.arkona.adapter.processor.key"></rabbit:binding>
<rabbit:binding queue="pull.parts.arkona.scheduler.adapter" key="pull.parts.arkona.scheduler.adapter.key"></rabbit:binding>
<rabbit:binding queue="pull.parts.arkona.adapter.processor" key="pull.parts.arkona.adapter.processor.key"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
您的应用程序上下文中是否有 RabbitAdmin
? (它检测 queues/exchanges/bindings 并在建立连接时声明它们)。
我刚刚用 QPID 6.1.2 测试了 Spring Integration AMQP Sample,它创建的一切正常...
<!-- Infrastructure -->
<rabbit:connection-factory id="connectionFactory" host="xx.xx.xx.xx" virtual-host="default" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="si.test.queue" />
<rabbit:direct-exchange name="si.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="si.test.queue" key="si.test.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
编辑
启动应用程序对我来说也很好用...
@SpringBootApplication
public class So50364236Application {
public static void main(String[] args) {
SpringApplication.run(So50364236Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("so50364236", "foo");
}
@Bean
public Queue queue() {
return new Queue("so50364236");
}
@RabbitListener(queues = "so50364236")
public void listen(String in) {
System.out.println(in);
}
}
和
spring.rabbitmq.addresses=xx.x.x.x
spring.rabbitmq.virtual-host=default
和
2018-05-16 13:17:25.013 INFO 34714 --- [ main] com.example.So50364236Application : Started So50364236Application in 1.151 seconds (JVM running for 1.579)
foo
而且我在经纪人的管理页面上看到队列。
EDIT2
这是另一个启动应用程序,其中队列在 XML 文件中声明;使用嵌入的 QPID 6.1.6...
qpid-config.json
{
"name": "EmbeddedBroker",
"modelVersion": "2.0",
"storeVersion": 1,
"authenticationproviders": [
{
"name": "noPassword",
"type": "Anonymous",
"secureOnlyMechanisms": []
},
{
"name": "passwordFile",
"type": "PlainPasswordFile",
"path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
"secureOnlyMechanisms": []
}
],
"ports": [
{
"name": "AMQP",
"port": "${qpid.amqp_port}",
"authenticationProvider": "passwordFile",
"protocols": [
"AMQP_0_10",
"AMQP_0_8",
"AMQP_0_9",
"AMQP_0_9_1"
]
}
],
"virtualhostnodes": [
{
"name": "default",
"type": "JSON",
"defaultVirtualHostNode": "true",
"virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
"storeType": "DERBY"
}
]
}
config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:queue name="so50364236b" />
</beans>
application.properties
spring.rabbitmq.addresses=localhost:8888
启动应用程序
@SpringBootApplication
@ImportResource("config.xml")
public class So50364236Application {
public static void main(String[] args) {
new SpringApplicationBuilder(So50364236Application.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("so50364236b", "foo");
}
@Bean
BrokerOptions brokerOptions() throws Exception {
Path tmpFolder = Files.createTempDirectory("qpidWork");
Path homeFolder = Files.createTempDirectory("qpidHome");
File etc = new File(homeFolder.toFile(), "etc");
etc.mkdir();
FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
fos.write("guest:guest\n".getBytes());
fos.close();
BrokerOptions brokerOptions = new BrokerOptions();
brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
brokerOptions.setConfigProperty("qpid.amqp_port", "8888");
brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
Resource config = new ClassPathResource("qpid-config.json");
brokerOptions.setInitialConfigurationLocation(config.getFile().getAbsolutePath());
return brokerOptions;
}
@Bean
Broker broker() throws Exception {
org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
broker.startup(brokerOptions());
return broker;
}
@RabbitListener(queues = "so50364236b")
public void listen(String in) {
System.out.println(in);
}
}
和
[Broker] BRK-1004 : Qpid Broker Ready
received: foo
也许您正在做一些导致未声明引导的 Admin 的事情。不清楚您为什么要添加自己的连接工厂和模板;您也尝试过添加自己的 RabbitAdmin
吗?