使用 Spring Cloud Stream 从 RabbitMQ / MQTT 消费

Consuming from RabbitMQ / MQTT with Spring Cloud Stream

您如何使用 Spring Cloud Stream(即使用 AMQP)使用 MQTT 发送到 RabbitMQ 的消息? 使用 Rabbit 上的 MQTT,所有消息都会到达名为“amq.topic”的交换器。

在消费者方面,使用 Spring Cloud Stream,为每个目的地创建一个队列,并使用目的地名称创建一个“主题”类型的交换;创建的队列绑定到交换器。

现在,

欢迎提出任何想法!

Using Existing Queues/Exchanges

By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property <prefix><destination>. The destination defaults to the binding name, if not provided. When binding a consumer, a queue will automatically be provisioned with the name <prefix><destination>.<group> (if a group binding property is specified), or an anonymous, auto-delete queue when there is no group. The queue will be bound to the exchange with the "match-all" wildcard routing key (#) for a non-partitioned binding or <destination>-<instanceIndex> for a partitioned binding. The prefix is an empty String by default. If an output binding is specified with requiredGroups, a queue/binding will be provisioned for each group.

There are a number of rabbit-specific binding properties that allow you to modify this default behavior.

If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named myExchange and the queue is named myQueue:

spring.cloud.stream.bindings.<binding name>.destination=myExhange

spring.cloud.stream.bindings.<binding name>.group=myQueue

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true

If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties. Refer to the property documentation above for more information.

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey

spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>

spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'