使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列
Bind RabbitMQ consumer using Spring Cloud Stream to an existing queue
我使用 RabbitMQ web-UI 创建了一个主题交换 TX 并绑定到交换两个队列 TX.Q1 和 TX.Q2,每个都相应地绑定了路由键 rk1 和 rk2,并且向交易所发送了一些消息。
现在我想使用 Spring Cloud Stream 创建一个消费者,它将仅接收来自 Q1 的消息。
我尝试使用配置:
spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1
和使用消息的方法的注解@StreamListner(Sink.INPUT)
。
结果我可以看到消费者已经创建了一个同名的队列(或绑定)TX.Q1,但是新 [=36= 的 Routing-Key ]是#.
我如何通过 Spring Cloud Stream 配置一个消费者,该消费者将使用来自预定义队列的消息(仅使用 rk1 路由)。
Spring Cloud Stream 在内部将消费者端点的路由器密钥设置为目的地名称(exchange
名称)本身或基于 partition
[=18] 的路由=] 在静态分区的情况下。
我认为 this github 问题可能与您的情况有关。
所以现在,Garry Russell 建议的解决方法已经为我解决了这个问题。
我这样使用 @RabbitListener
而不是 @StreamListenet
:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")
.
因此,预定义队列 TX.Q1 与绑定密钥绑定:rk1 到交换 TX.
等待 Spring Cloud Steream issue 上的更新。
我想我找到了使用 @StreamListener
的解决方案,而不是使用解决方法。一切都在配置中完成,而不是在代码中。
我使用的配置如下(在 .yml 中,但您可以轻松地将其转换为 .properties):
spring:
cloud:
stream:
bindings:
input:
binder: <binder_name>
destination: TX
group: Q1
binders:
<binder_name>:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: <port>
virtual-host: <vhost>
username: <username>
password: <password>
rabbit:
bindings:
input:
consumer:
binding-routing-key: rk1
exchange-name: TX
queue-name-group-only: true
bind-queue: true
exchange-durable: true
exchange-type: topic
使用这种方法,您不必编写特定代码来让 RabbitMQ 消费者连接到您的集群,这应该可以解决您的问题。
希望对您有所帮助。
鼓励在 consumer 下使用此 属性 以使 rabbit 能够从现有队列中消费。请注意,队列名称将仅从组 属性 中选取,而不是从目标中选取。
queueNameGroupOnly: true
示例:
cloud:
stream:
# rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
rabbit:
bindings:
input:
consumer:
acknowledgeMode: AUTO
bindingRoutingKey: DECISION_PERSISTENCE_KEY
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
consumerTagPrefix: dpa-rabbit-consumer
bindings:
input:
binder: rabbit
group: DECISION_PERSISTENCE_QUEUE
content-type: application/json
我使用 RabbitMQ web-UI 创建了一个主题交换 TX 并绑定到交换两个队列 TX.Q1 和 TX.Q2,每个都相应地绑定了路由键 rk1 和 rk2,并且向交易所发送了一些消息。
现在我想使用 Spring Cloud Stream 创建一个消费者,它将仅接收来自 Q1 的消息。 我尝试使用配置:
spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1
和使用消息的方法的注解@StreamListner(Sink.INPUT)
。
结果我可以看到消费者已经创建了一个同名的队列(或绑定)TX.Q1,但是新 [=36= 的 Routing-Key ]是#.
我如何通过 Spring Cloud Stream 配置一个消费者,该消费者将使用来自预定义队列的消息(仅使用 rk1 路由)。
Spring Cloud Stream 在内部将消费者端点的路由器密钥设置为目的地名称(exchange
名称)本身或基于 partition
[=18] 的路由=] 在静态分区的情况下。
我认为 this github 问题可能与您的情况有关。
所以现在,Garry Russell 建议的解决方法已经为我解决了这个问题。
我这样使用 @RabbitListener
而不是 @StreamListenet
:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")
.
因此,预定义队列 TX.Q1 与绑定密钥绑定:rk1 到交换 TX.
等待 Spring Cloud Steream issue 上的更新。
我想我找到了使用 @StreamListener
的解决方案,而不是使用解决方法。一切都在配置中完成,而不是在代码中。
我使用的配置如下(在 .yml 中,但您可以轻松地将其转换为 .properties):
spring:
cloud:
stream:
bindings:
input:
binder: <binder_name>
destination: TX
group: Q1
binders:
<binder_name>:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: <port>
virtual-host: <vhost>
username: <username>
password: <password>
rabbit:
bindings:
input:
consumer:
binding-routing-key: rk1
exchange-name: TX
queue-name-group-only: true
bind-queue: true
exchange-durable: true
exchange-type: topic
使用这种方法,您不必编写特定代码来让 RabbitMQ 消费者连接到您的集群,这应该可以解决您的问题。
希望对您有所帮助。
鼓励在 consumer 下使用此 属性 以使 rabbit 能够从现有队列中消费。请注意,队列名称将仅从组 属性 中选取,而不是从目标中选取。
queueNameGroupOnly: true
示例:
cloud:
stream:
# rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
rabbit:
bindings:
input:
consumer:
acknowledgeMode: AUTO
bindingRoutingKey: DECISION_PERSISTENCE_KEY
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
consumerTagPrefix: dpa-rabbit-consumer
bindings:
input:
binder: rabbit
group: DECISION_PERSISTENCE_QUEUE
content-type: application/json