如何使用 RestTemplate 在 qpid 中创建队列?
How to create queues in qpid with RestTemplate?
我正在尝试为使用 RabbitMQ 的应用程序编写集成测试,为此我正在使用 Qpid 代理。我设法启动了服务器并且我的测试正在连接到它,但我需要在启动前在 Qpid 中创建队列。
因为我的队列比较多,所以动态创建bean:
applicationContext.getBeanFactory().registerSingleton(queueName, queue);
这需要在启动前创建队列。
这是 qpid 配置文件:
{
"name": "tst",
"modelVersion": "2.0",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "noPassword",
"type" : "Anonymous",
"secureOnlyMechanisms": []
},
{
"name" : "passwordFile",
"type" : "PlainPasswordFile",
"path" : "/src/test/resources/passwd.txt",
"secureOnlyMechanisms": [],
"preferencesproviders" : [{
"name": "fileSystemPreferences",
"type": "FileSystemPreferences",
"path" : "${qpid.work_dir}${file.separator}user.preferences.json"
}
]
}
],
"ports" : [
{
"name": "AMQP",
"port": "5673",
"authenticationProvider": "passwordFile",
"protocols": [
"AMQP_0_10",
"AMQP_0_8",
"AMQP_0_9",
"AMQP_0_9_1"
]
}],
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
"virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
}]
}
根据官方文档(https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130),
可以通过 REST 调用来创建队列,所以我尝试使用 RestTemplate 来实现,但队列似乎并没有被创建。
/**
 * Class-level test setup: starts the broker through {@code BrokerManager} (not shown here) and then
 * sends two HTTP PUT requests with an empty string body, intended to create the queues
 * {@code queue1} and {@code queue-2}.
 */
@BeforeClass
public static void startup() throws Exception {
brokerStarter = new BrokerManager();
brokerStarter.startBroker();
RestTemplate restTemplate = new RestTemplate();
// NOTE(review): 5673 is the port named "AMQP" in the broker configuration shown alongside this
// snippet, and that configuration declares no port with an HTTP protocol - confirm which port,
// if any, actually serves the REST management API.
// NOTE(review): the path has two segments after "queue"; the other REST example in this document
// uses three (".../queue/default/default/<name>") - confirm the expected path shape.
restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");
restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");
}
谁能解释一下我做错了什么?谢谢!
我设法通过使用连接工厂解决了这个问题:
// Connection factory injected via @Autowired; it is reconfigured below before opening a connection.
@Autowired
ConnectionFactory factory;
....
// Point the factory at the broker on localhost, using the port held in qpid_server_port.
factory.setHost("localhost");
factory.setPort(qpid_server_port);
// try-with-resources: the channel and then the connection are closed when the block exits.
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
String queue = "queue-x";
// Declares the queue over the AMQP connection itself instead of the REST API.
// NOTE(review): presumably com.rabbitmq.client.Channel, where the arguments after the name are
// durable=true, exclusive=false, autoDelete=false, arguments=null - confirm against the imports.
channel.queueDeclare(queue, true, false, false, null);
//channel.queueBind(queue, "exchange-x" , "routing-key-x");
} catch (Exception e) {
// NOTE(review): any failure is only printed, so a test using this setup continues even when
// the queue was not declared.
e.printStackTrace();
}
我使用 REST API 解决了同样的问题。为了在集成测试中创建/删除队列,我使用以下配置文件 (qpid-config.json):
{
"name": "EmbeddedBroker",
"modelVersion": "8.0",
"authenticationproviders": [
{
"name": "anonymous",
"type": "Anonymous"
}
],
"ports": [
{
"name": "AMQP",
"bindingAddress": "localhost",
"port": "${qpid.amqp_port}",
"protocols": [ "AMQP_1_0" ],
"authenticationProvider": "anonymous",
"virtualhostaliases" : [ {
"name" : "nameAlias",
"type" : "nameAlias"
}, {
"name" : "defaultAlias",
"type" : "defaultAlias"
}, {
"name" : "hostnameAlias",
"type" : "hostnameAlias"
} ]
},
{
"name" : "HTTP",
"port" : "${qpid.http_port}",
"protocols" : [ "HTTP" ],
"authenticationProvider" : "anonymous"
}
],
"virtualhostnodes": [
{
"name": "default",
"defaultVirtualHostNode": "true",
"type": "Memory",
"virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
}
],
"plugins" : [
{
"type" : "MANAGEMENT-HTTP",
"name" : "httpManagement"
}
]
}
相关Gradle 依赖项:
testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0
testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")
testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")
testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")
testImplementation("org.springframework.boot:spring-boot-starter-webflux")
testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")
启动代理 (Kotlin) 的代码:
/**
 * Starts the embedded broker held in `broker`.
 *
 * The broker is given a "Memory" system configuration whose initial content is the classpath
 * resource `qpid-config.json`. The AMQP and HTTP ports are published as the system properties
 * `qpid.amqp_port` and `qpid.http_port`, which that configuration file references as
 * `${qpid.amqp_port}` and `${qpid.http_port}`.
 *
 * Fails with a NullPointerException (from `!!`) when `qpid-config.json` is not on the classpath.
 */
private fun startQpidBroker() {
    // Resolve the configuration first so a missing resource fails before any property is set.
    val configUrl = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!
    val brokerAttributes: MutableMap<String, Any> = hashMapOf<String, Any>(
        "type" to "Memory",
        "initialConfigurationLocation" to configUrl.toExternalForm(),
        "startupLoggedToSystemOut" to true
    )

    System.setProperty("qpid.amqp_port", "$amqpPort")
    System.setProperty("qpid.http_port", "$httpPort")
    // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues
    System.setProperty("qpid.tests.mms.messagestore.persistence", "true")

    broker.startup(brokerAttributes)
}
删除/创建队列的代码:
/**
 * Deletes the queue [queueName] if it exists and then creates it again as a "standard" queue,
 * using the broker's HTTP management REST API on `EmbeddedAMQPBroker.httpPort`.
 *
 * Both requests address the queue as
 * `/api/latest/queue/<virtual host node>/<virtual host>/<queue>`; node and host are both
 * named "default".
 *
 * @param queueName name of the queue to drop and recreate
 * @throws WebClientResponseException if the DELETE is answered with an error status other than
 *         404, or if the PUT is answered with an error status
 */
private fun recreateQueue(queueName: String) {
    val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}")
    // DELETE and PUT must target the same fully qualified path. Without the virtual host segment
    // the DELETE does not address this queue, its 404 is swallowed, and the queue is never dropped.
    val queueUri = "/api/latest/queue/default/default/$queueName"

    try {
        client.method(HttpMethod.DELETE)
            .uri(queueUri)
            .retrieve()
            .toBodilessEntity()
            .block()
    } catch (e: WebClientResponseException) {
        if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable
            throw e
        }
    }

    client.method(HttpMethod.PUT)
        .uri(queueUri)
        .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))
        .retrieve()
        .toBodilessEntity()
        .block()
}
我正在尝试为使用 RabbitMQ 的应用程序编写集成测试,为此我正在使用 Qpid 代理。我设法启动了服务器并且我的测试正在连接到它,但我需要在启动前在 Qpid 中创建队列。 因为我的队列比较多,所以动态创建bean:
applicationContext.getBeanFactory().registerSingleton(queueName, queue);
这需要在启动前创建队列。
这是 qpid 配置文件:
{
"name": "tst",
"modelVersion": "2.0",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "noPassword",
"type" : "Anonymous",
"secureOnlyMechanisms": []
},
{
"name" : "passwordFile",
"type" : "PlainPasswordFile",
"path" : "/src/test/resources/passwd.txt",
"secureOnlyMechanisms": [],
"preferencesproviders" : [{
"name": "fileSystemPreferences",
"type": "FileSystemPreferences",
"path" : "${qpid.work_dir}${file.separator}user.preferences.json"
}
]
}
],
"ports" : [
{
"name": "AMQP",
"port": "5673",
"authenticationProvider": "passwordFile",
"protocols": [
"AMQP_0_10",
"AMQP_0_8",
"AMQP_0_9",
"AMQP_0_9_1"
]
}],
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
"virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
}]
}
根据官方文档(https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130),可以通过 REST 调用来创建队列,所以我尝试使用 RestTemplate 来实现,但队列似乎并没有被创建。
/**
 * Class-level test setup: starts the broker through {@code BrokerManager} (not shown here) and then
 * sends two HTTP PUT requests with an empty string body, intended to create the queues
 * {@code queue1} and {@code queue-2}.
 */
@BeforeClass
public static void startup() throws Exception {
brokerStarter = new BrokerManager();
brokerStarter.startBroker();
RestTemplate restTemplate = new RestTemplate();
// NOTE(review): 5673 is the port named "AMQP" in the broker configuration shown alongside this
// snippet, and that configuration declares no port with an HTTP protocol - confirm which port,
// if any, actually serves the REST management API.
// NOTE(review): the path has two segments after "queue"; the other REST example in this document
// uses three (".../queue/default/default/<name>") - confirm the expected path shape.
restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");
restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");
}
谁能解释一下我做错了什么?谢谢!
我设法通过使用连接工厂解决了这个问题:
// Connection factory injected via @Autowired; it is reconfigured below before opening a connection.
@Autowired
ConnectionFactory factory;
....
// Point the factory at the broker on localhost, using the port held in qpid_server_port.
factory.setHost("localhost");
factory.setPort(qpid_server_port);
// try-with-resources: the channel and then the connection are closed when the block exits.
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
String queue = "queue-x";
// Declares the queue over the AMQP connection itself instead of the REST API.
// NOTE(review): presumably com.rabbitmq.client.Channel, where the arguments after the name are
// durable=true, exclusive=false, autoDelete=false, arguments=null - confirm against the imports.
channel.queueDeclare(queue, true, false, false, null);
//channel.queueBind(queue, "exchange-x" , "routing-key-x");
} catch (Exception e) {
// NOTE(review): any failure is only printed, so a test using this setup continues even when
// the queue was not declared.
e.printStackTrace();
}
我使用 REST API 解决了同样的问题。为了在集成测试中创建/删除队列,我使用以下配置文件 (qpid-config.json):
{
"name": "EmbeddedBroker",
"modelVersion": "8.0",
"authenticationproviders": [
{
"name": "anonymous",
"type": "Anonymous"
}
],
"ports": [
{
"name": "AMQP",
"bindingAddress": "localhost",
"port": "${qpid.amqp_port}",
"protocols": [ "AMQP_1_0" ],
"authenticationProvider": "anonymous",
"virtualhostaliases" : [ {
"name" : "nameAlias",
"type" : "nameAlias"
}, {
"name" : "defaultAlias",
"type" : "defaultAlias"
}, {
"name" : "hostnameAlias",
"type" : "hostnameAlias"
} ]
},
{
"name" : "HTTP",
"port" : "${qpid.http_port}",
"protocols" : [ "HTTP" ],
"authenticationProvider" : "anonymous"
}
],
"virtualhostnodes": [
{
"name": "default",
"defaultVirtualHostNode": "true",
"type": "Memory",
"virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
}
],
"plugins" : [
{
"type" : "MANAGEMENT-HTTP",
"name" : "httpManagement"
}
]
}
相关Gradle 依赖项:
testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0
testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")
testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")
testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")
testImplementation("org.springframework.boot:spring-boot-starter-webflux")
testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")
启动代理 (Kotlin) 的代码:
/**
 * Starts the embedded broker held in `broker`.
 *
 * The broker is given a "Memory" system configuration whose initial content is the classpath
 * resource `qpid-config.json`. The AMQP and HTTP ports are published as the system properties
 * `qpid.amqp_port` and `qpid.http_port`, which that configuration file references as
 * `${qpid.amqp_port}` and `${qpid.http_port}`.
 *
 * Fails with a NullPointerException (from `!!`) when `qpid-config.json` is not on the classpath.
 */
private fun startQpidBroker() {
    // Resolve the configuration first so a missing resource fails before any property is set.
    val configUrl = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!
    val brokerAttributes: MutableMap<String, Any> = hashMapOf<String, Any>(
        "type" to "Memory",
        "initialConfigurationLocation" to configUrl.toExternalForm(),
        "startupLoggedToSystemOut" to true
    )

    System.setProperty("qpid.amqp_port", "$amqpPort")
    System.setProperty("qpid.http_port", "$httpPort")
    // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues
    System.setProperty("qpid.tests.mms.messagestore.persistence", "true")

    broker.startup(brokerAttributes)
}
删除/创建队列的代码:
/**
 * Deletes the queue [queueName] if it exists and then creates it again as a "standard" queue,
 * using the broker's HTTP management REST API on `EmbeddedAMQPBroker.httpPort`.
 *
 * Both requests address the queue as
 * `/api/latest/queue/<virtual host node>/<virtual host>/<queue>`; node and host are both
 * named "default".
 *
 * @param queueName name of the queue to drop and recreate
 * @throws WebClientResponseException if the DELETE is answered with an error status other than
 *         404, or if the PUT is answered with an error status
 */
private fun recreateQueue(queueName: String) {
    val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}")
    // DELETE and PUT must target the same fully qualified path. Without the virtual host segment
    // the DELETE does not address this queue, its 404 is swallowed, and the queue is never dropped.
    val queueUri = "/api/latest/queue/default/default/$queueName"

    try {
        client.method(HttpMethod.DELETE)
            .uri(queueUri)
            .retrieve()
            .toBodilessEntity()
            .block()
    } catch (e: WebClientResponseException) {
        if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable
            throw e
        }
    }

    client.method(HttpMethod.PUT)
        .uri(queueUri)
        .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))
        .retrieve()
        .toBodilessEntity()
        .block()
}