使用 Spring Cloud Stream 从 RabbitMQ / MQTT 消费
Consuming from RabbitMQ / MQTT with Spring Cloud Stream
您如何使用 Spring Cloud Stream(即使用 AMQP)使用 MQTT 发送到 RabbitMQ 的消息?
使用 Rabbit 上的 MQTT,所有消息都会到达名为“amq.topic”的交换器。
在消费者方面,使用 Spring Cloud Stream,为每个目的地创建一个队列,并使用目的地名称创建一个“主题”类型的交换;创建的队列绑定到交换器。
现在,
我无法手动将我的队列绑定到“amq.topic”,因为它是独占的:
无法获得对虚拟主机“/”中锁定队列“...”的独占访问权。它可能最初是在另一个连接上声明的,或者独有的 属性 值与原始声明的值不匹配。
我不能直接从交换器“amq.topic”收听,因为,好吧,你必须收听一个队列...
我创建了一个绑定到传递消息的“amq.topic”的“tmp”队列,但我不能将它用作目标,因为 RMQ 将创建一个名为“tmp.SOME_CLIENT_ID",绑定到名为“tmp”的交换,与我的“tmp”队列无关。
欢迎提出任何想法!
见Using Existing Queues/Exchanges。
By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property <prefix><destination>
. The destination defaults to the binding name, if not provided. When binding a consumer, a queue will automatically be provisioned with the name <prefix><destination>.<group>
(if a group binding property is specified), or an anonymous, auto-delete queue when there is no group. The queue will be bound to the exchange with the "match-all" wildcard routing key (#) for a non-partitioned binding or <destination>-<instanceIndex>
for a partitioned binding. The prefix is an empty String by default. If an output binding is specified with requiredGroups, a queue/binding will be provisioned for each group.
There are a number of rabbit-specific binding properties that allow you to modify this default behavior.
If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:
spring.cloud.stream.bindings.<binding name>.destination=myExhange
spring.cloud.stream.bindings.<binding name>.group=myQueue
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties. Refer to the property documentation above for more information.
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
您如何使用 Spring Cloud Stream(即使用 AMQP)使用 MQTT 发送到 RabbitMQ 的消息? 使用 Rabbit 上的 MQTT,所有消息都会到达名为“amq.topic”的交换器。
在消费者方面,使用 Spring Cloud Stream,为每个目的地创建一个队列,并使用目的地名称创建一个“主题”类型的交换;创建的队列绑定到交换器。
现在,
我无法手动将我的队列绑定到“amq.topic”,因为它是独占的:
无法获得对虚拟主机“/”中锁定队列“...”的独占访问权。它可能最初是在另一个连接上声明的,或者独有的 属性 值与原始声明的值不匹配。
我不能直接从交换器“amq.topic”收听,因为,好吧,你必须收听一个队列...
我创建了一个绑定到传递消息的“amq.topic”的“tmp”队列,但我不能将它用作目标,因为 RMQ 将创建一个名为“tmp.SOME_CLIENT_ID",绑定到名为“tmp”的交换,与我的“tmp”队列无关。
欢迎提出任何想法!
见Using Existing Queues/Exchanges。
By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property
<prefix><destination>
. The destination defaults to the binding name, if not provided. When binding a consumer, a queue will automatically be provisioned with the name<prefix><destination>.<group>
(if a group binding property is specified), or an anonymous, auto-delete queue when there is no group. The queue will be bound to the exchange with the "match-all" wildcard routing key (#) for a non-partitioned binding or<destination>-<instanceIndex>
for a partitioned binding. The prefix is an empty String by default. If an output binding is specified with requiredGroups, a queue/binding will be provisioned for each group.
There are a number of rabbit-specific binding properties that allow you to modify this default behavior.
If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:
spring.cloud.stream.bindings.<binding name>.destination=myExhange
spring.cloud.stream.bindings.<binding name>.group=myQueue
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties. Refer to the property documentation above for more information.
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'