ActiveMQ Artemis:从 smallrye-reactive-messaging (AMQP) 以编程方式创建队列

ActiveMQ Artemis: creating queue programmatically from smallrye-reactive-messaging (AMQP)

我的 Quarkus 微服务正在使用 smallrye 反应消息库中的 AMQP 连接器从 vromero/activemq-artemis:2.16.0-alpine Docker 图像向 ActiveMQ Artemis 代理 运行 生成消息。 reactive messaging library 文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:

    @Inject
    @Channel("task-finished")
    lateinit var taskFinishedEmitter: MutinyEmitter<String>

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {

       // leaving out the actual messageText computation...

       val messageText: String = "DUMMY MESSAGE"
       val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
          .withDurable(true)
          .withCorrelationId(customerId)
          .withAddress("anycast://my-custom-address")
          .build()

       val message: Message<String> = Message.of(messageText,
            {
                logger.info("message acked")
                CompletableFuture.completedFuture(null)
            },
            {
                logger.info("message nacked: {}", it.message)
                CompletableFuture.completedFuture(null)
            }
       )

       taskFinishedEmitter.send(message.addMetadata(metadata))
       return Uni.createFrom().item("DONE")
    }

连接器定义在application.properties:

amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***

mp.messaging.outgoing.task-finished.connector=smallrye-amqp

ActiveMQ Artemis 确实动态创建了 my-custom-address 地址,但是它没有创建任何绑定到它的队列,消息最终被 unrouted.

broker.xml 配置文件包含在 core 部分


<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
   <auto-create-queues>true</auto-create-queues>
</address-settings>

我尝试将队列名称与地址一起传递

.withAddress("anycast://my-custom-address::my-queue")

但没有任何区别。

以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时确认消息?

更新:附上 Artemis 网页界面的截图

默认情况下,ActiveMQ Artemis 会将 AMQP 客户端发送的消息视为多播(即pub/sub)。多播的语义规定发布给代理的每条消息都将分派给每个订阅者。然而,由于没有订阅者,消息被简单地丢弃(即未路由)。

由于您在地址前加上 anycast:// 前缀,因此您应该使用 anycastPrefix 参数在 broker.xml 中的“amqp”acceptor 上配置该前缀,例如:

<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>

您还可以更改用于自动创建资源的默认路由类型,例如:

<address-setting match="my-custom-address">
   <default-address-routing-type>ANYCAST</default-address-routing-type>
   <default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-settings>