如何使用 RestTemplate 在 qpid 中创建队列?

How to create queues in qpid with RestTemplate?

我正在尝试为使用 RabbitMQ 的应用程序编写集成测试,为此我正在使用 Qpid 代理。我设法启动了服务器并且我的测试正在连接到它,但我需要在启动前在 Qpid 中创建队列。 因为我的队列比较多,所以动态创建bean:

applicationContext.getBeanFactory().registerSingleton(queueName, queue);

这需要在启动前创建队列。

这是 qpid 配置文件:

{
  "name": "tst",
  "modelVersion": "2.0",
  "defaultVirtualHost" : "default",
  "authenticationproviders" : [ {
    "name" : "noPassword",
    "type" : "Anonymous",
    "secureOnlyMechanisms": []
        },
    {
      "name" : "passwordFile",
      "type" : "PlainPasswordFile",
      "path" : "/src/test/resources/passwd.txt",
      "secureOnlyMechanisms": [],
      "preferencesproviders" : [{
        "name": "fileSystemPreferences",
        "type": "FileSystemPreferences",
        "path" : "${qpid.work_dir}${file.separator}user.preferences.json"
        }
      ]
    }
   ],
  "ports" : [
    {
      "name": "AMQP",
      "port": "5673",
      "authenticationProvider": "passwordFile",
      "protocols": [
        "AMQP_0_10",
        "AMQP_0_8",
        "AMQP_0_9",
        "AMQP_0_9_1"
      ]
    }],
  "virtualhostnodes" : [ {
    "name" : "default",
    "type" : "JSON",
    "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"
  }]

}

来自官方文档(https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130) 我读到可以为 REST 调用创建队列,所以我尝试使用 RestTemplate 来实现它,但它似乎没有创建队列。

    @BeforeClass
    public static void startup() throws Exception {
        brokerStarter = new BrokerManager();
        brokerStarter.startBroker();

        RestTemplate restTemplate = new RestTemplate();
        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");
        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");
    }

谁能解释一下我做错了什么?谢谢!

我设法通过使用连接工厂解决了这个问题:

            @Autowired
            ConnectionFactory factory;

            ....
            factory.setHost("localhost");
            factory.setPort(qpid_server_port);
            try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
                String queue = "queue-x";
                channel.queueDeclare(queue, true, false, false, null);
                //channel.queueBind(queue, "exchange-x" , "routing-key-x");

            } catch (Exception e) {
                e.printStackTrace();
            }

我使用 REST API 解决了同样的问题。为了 create/delete 队列以进行集成测试,我使用以下配置文件 (qpid-config.json):

{
  "name": "EmbeddedBroker",
  "modelVersion": "8.0",
  "authenticationproviders": [
    {
      "name": "anonymous",
      "type": "Anonymous"
    }
  ],
  "ports": [
    {
      "name": "AMQP",
      "bindingAddress": "localhost",
      "port": "${qpid.amqp_port}",
      "protocols": [ "AMQP_1_0" ],
      "authenticationProvider": "anonymous",
      "virtualhostaliases" : [ {
        "name" : "nameAlias",
        "type" : "nameAlias"
      }, {
        "name" : "defaultAlias",
        "type" : "defaultAlias"
      }, {
        "name" : "hostnameAlias",
        "type" : "hostnameAlias"
      } ]
    },
    {
      "name" : "HTTP",
      "port" : "${qpid.http_port}",
      "protocols" : [ "HTTP" ],
      "authenticationProvider" : "anonymous"
    }
  ],
  "virtualhostnodes": [
    {
      "name": "default",
      "defaultVirtualHostNode": "true",
      "type": "Memory",
      "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
    }
  ],
  "plugins" : [
    {
      "type" : "MANAGEMENT-HTTP",
      "name" : "httpManagement"
    }
  ]
}

相关Gradle 依赖项:

    testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0
    testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")
    testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")
    testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")

    testImplementation("org.springframework.boot:spring-boot-starter-webflux")
    testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")

启动代理 (Kotlin) 的代码:

    private fun startQpidBroker() {
        val attributes: MutableMap<String, Any> = HashMap()
        val initialConfig = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!
        attributes["type"] = "Memory"
        attributes["initialConfigurationLocation"] = initialConfig.toExternalForm()
        attributes["startupLoggedToSystemOut"] = true
        System.setProperty("qpid.amqp_port", "$amqpPort")
        System.setProperty("qpid.http_port", "$httpPort")
        // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues
        System.setProperty("qpid.tests.mms.messagestore.persistence", "true")
        broker.startup(attributes)
    }

代码到 delete/create 队列:

    private fun recreateQueue(queueName: String) {
        val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}");
        try {
            client.method(HttpMethod.DELETE)
                    .uri("/api/latest/queue/default/$queueName")
                    .retrieve()
                    .toBodilessEntity()
                    .block()
                    .statusCode
        } catch (e: WebClientResponseException) {
            if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable
                throw e
            }
        }

        client.method(HttpMethod.PUT)
                .uri("/api/latest/queue/default/default/$queueName")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))
                .retrieve()
                .toBodilessEntity()
                .block()
                .statusCode
    }