Spring 云流 rabbitmq:当 rabbitmq 服务器不可用时创建队列
Spring cloud stream rabbit : Queue creation when rabbit server unavailable
我正在使用
spring-云流:3.1.4
spring-cloud-stream-binder-rabbit: 3.1.4
我有一个使用此处属性配置的消费者。
我的问题是当消费者在 rabbitmq 服务器可用之前启动时,我可以看到消费者重新启动,直到连接可用。尽管如此,DLX 和 DLQ 之间创建的绑定并不相同。
- 如果消费者启动时 rabbitmq 可用:DLQ 绑定到 DLX,同时具有路由键 'worker.request.queue.name' 和 'worker.request.dlq.name'
- 如果消费者在重试几次后启动时 rabbitmq 不可用:DLQ 仅使用 routingKey 'worker.request.dlq.name' 绑定到 DLX。
问题是我需要两者都绑定。任何人都可以帮助我了解我做错了什么?
谢谢。
# 1. consumer queue configuration to listen for worker requests
# Exchange name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.destination=${worker.request.exchange.name}
# Queue name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.group=${worker.request.queue.name}
# Force creation of queue if doesn't exists.
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.requiredGroups=${worker.request.queue.name}
# Disable retry of error messages
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.maxAttempts=1
# 2. producer : to send worker responses to manager
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.destination=${worker.response.exchange.name}
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.group=${worker.response.queue.name}
# Do not create associated queue. Queue is created by the manager which subscribe to the exchange
# spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.producer.requiredGroups=${worker.response.queue.name}
###############################
# Rabbit binder configuration #
###############################
# 1. consumer queue configuration to listen for worker requests
# Allows naming created queues with only group property. Default is destination.group.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.queueNameGroupOnly=true
# enable transaction of consumed messages. NOTE : the index must not be present in the binding name !!!!!
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in.consumer.transacted=true
# Queue and Exchange for request can be created by manager.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.maxPriority=255
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterQueueName=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchange=${worker.request.dlx.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterRoutingKey=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchangeType=topic
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.dlqMaxPriority=255
编辑 1:
查看 sprint cloud stream rabbit bidder 代码后,我可以在以下位置看到问题:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L659
每个 Queue/Exhcnage/Binding 声明初始化一次,每次尝试后重试。但是声明列表是一个带有字符串键的 bean 映射。如果在同一个队列上进行双重绑定,hte key 是相同的,因此只保存第一个声明。看这里:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L629
这是一个错误;请在这里打开一个问题 https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues
我正在使用
spring-云流:3.1.4
spring-cloud-stream-binder-rabbit: 3.1.4
我有一个使用此处属性配置的消费者。 我的问题是当消费者在 rabbitmq 服务器可用之前启动时,我可以看到消费者重新启动,直到连接可用。尽管如此,DLX 和 DLQ 之间创建的绑定并不相同。
- 如果消费者启动时 rabbitmq 可用:DLQ 绑定到 DLX,同时具有路由键 'worker.request.queue.name' 和 'worker.request.dlq.name'
- 如果消费者在重试几次后启动时 rabbitmq 不可用:DLQ 仅使用 routingKey 'worker.request.dlq.name' 绑定到 DLX。
问题是我需要两者都绑定。任何人都可以帮助我了解我做错了什么?
谢谢。
# 1. consumer queue configuration to listen for worker requests
# Exchange name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.destination=${worker.request.exchange.name}
# Queue name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.group=${worker.request.queue.name}
# Force creation of queue if doesn't exists.
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.requiredGroups=${worker.request.queue.name}
# Disable retry of error messages
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.maxAttempts=1
# 2. producer : to send worker responses to manager
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.destination=${worker.response.exchange.name}
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.group=${worker.response.queue.name}
# Do not create associated queue. Queue is created by the manager which subscribe to the exchange
# spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.producer.requiredGroups=${worker.response.queue.name}
###############################
# Rabbit binder configuration #
###############################
# 1. consumer queue configuration to listen for worker requests
# Allows naming created queues with only group property. Default is destination.group.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.queueNameGroupOnly=true
# enable transaction of consumed messages. NOTE : the index must not be present in the binding name !!!!!
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in.consumer.transacted=true
# Queue and Exchange for request can be created by manager.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.maxPriority=255
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterQueueName=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchange=${worker.request.dlx.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterRoutingKey=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchangeType=topic
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.dlqMaxPriority=255
编辑 1:
查看 sprint cloud stream rabbit bidder 代码后,我可以在以下位置看到问题:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L659
每个 Queue/Exhcnage/Binding 声明初始化一次,每次尝试后重试。但是声明列表是一个带有字符串键的 bean 映射。如果在同一个队列上进行双重绑定,hte key 是相同的,因此只保存第一个声明。看这里:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L629
这是一个错误;请在这里打开一个问题 https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues