ActiveMQ Artemis:从 smallrye-reactive-messaging (AMQP) 以编程方式创建队列
ActiveMQ Artemis: creating queue programmatically from smallrye-reactive-messaging (AMQP)
我的 Quarkus 微服务正在使用 smallrye 反应消息库中的 AMQP 连接器从 vromero/activemq-artemis:2.16.0-alpine
Docker 图像向 ActiveMQ Artemis 代理 运行 生成消息。 reactive messaging library 文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:
@Inject
@Channel("task-finished")
lateinit var taskFinishedEmitter: MutinyEmitter<String>
@POST
@Produces(MediaType.TEXT_PLAIN)
fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {
// leaving out the actual messageText computation...
val messageText: String = "DUMMY MESSAGE"
val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withCorrelationId(customerId)
.withAddress("anycast://my-custom-address")
.build()
val message: Message<String> = Message.of(messageText,
{
logger.info("message acked")
CompletableFuture.completedFuture(null)
},
{
logger.info("message nacked: {}", it.message)
CompletableFuture.completedFuture(null)
}
)
taskFinishedEmitter.send(message.addMetadata(metadata))
return Uni.createFrom().item("DONE")
}
连接器定义在application.properties
:
amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***
mp.messaging.outgoing.task-finished.connector=smallrye-amqp
ActiveMQ Artemis 确实动态创建了 my-custom-address
地址,但是它没有创建任何绑定到它的队列,消息最终被 unrouted.
broker.xml
配置文件包含在 core
部分
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
<auto-create-queues>true</auto-create-queues>
</address-settings>
我尝试将队列名称与地址一起传递
.withAddress("anycast://my-custom-address::my-queue")
但没有任何区别。
以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时确认消息?
更新:附上 Artemis 网页界面的截图
默认情况下,ActiveMQ Artemis 会将 AMQP 客户端发送的消息视为多播(即pub/sub)。多播的语义规定发布给代理的每条消息都将分派给每个订阅者。然而,由于没有订阅者,消息被简单地丢弃(即未路由)。
由于您在地址前加上 anycast://
前缀,因此您应该使用 anycastPrefix
参数在 broker.xml
中的“amqp”acceptor
上配置该前缀,例如:
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
您还可以更改用于自动创建资源的默认路由类型,例如:
<address-setting match="my-custom-address">
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-settings>
我的 Quarkus 微服务正在使用 smallrye 反应消息库中的 AMQP 连接器从 vromero/activemq-artemis:2.16.0-alpine
Docker 图像向 ActiveMQ Artemis 代理 运行 生成消息。 reactive messaging library 文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:
@Inject
@Channel("task-finished")
lateinit var taskFinishedEmitter: MutinyEmitter<String>
@POST
@Produces(MediaType.TEXT_PLAIN)
fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {
// leaving out the actual messageText computation...
val messageText: String = "DUMMY MESSAGE"
val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withCorrelationId(customerId)
.withAddress("anycast://my-custom-address")
.build()
val message: Message<String> = Message.of(messageText,
{
logger.info("message acked")
CompletableFuture.completedFuture(null)
},
{
logger.info("message nacked: {}", it.message)
CompletableFuture.completedFuture(null)
}
)
taskFinishedEmitter.send(message.addMetadata(metadata))
return Uni.createFrom().item("DONE")
}
连接器定义在application.properties
:
amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***
mp.messaging.outgoing.task-finished.connector=smallrye-amqp
ActiveMQ Artemis 确实动态创建了 my-custom-address
地址,但是它没有创建任何绑定到它的队列,消息最终被 unrouted.
broker.xml
配置文件包含在 core
部分
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
<auto-create-queues>true</auto-create-queues>
</address-settings>
我尝试将队列名称与地址一起传递
.withAddress("anycast://my-custom-address::my-queue")
但没有任何区别。
以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时确认消息?
更新:附上 Artemis 网页界面的截图
默认情况下,ActiveMQ Artemis 会将 AMQP 客户端发送的消息视为多播(即pub/sub)。多播的语义规定发布给代理的每条消息都将分派给每个订阅者。然而,由于没有订阅者,消息被简单地丢弃(即未路由)。
由于您在地址前加上 anycast://
前缀,因此您应该使用 anycastPrefix
参数在 broker.xml
中的“amqp”acceptor
上配置该前缀,例如:
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
您还可以更改用于自动创建资源的默认路由类型,例如:
<address-setting match="my-custom-address">
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-settings>