如何使用 Spring 流向所有侦听器广播 RabbitMQ 消息
How to broadcast the RabbitMQ message to all listeners using Spring Streams
我需要使用 Spring 流在所有可用的 Rabbit MQ 侦听器上使用一些消息。
我该怎么做?
此时我找到了从活页夹中删除 group
的方法。但是 1) 我不确定这是正确的方法 2) 消息从未从 queue 中删除,同时它也从未被新添加的听众使用。
这是我当前的配置:
spring:
cloud:
stream:
default:
contentType: "application/json"
bindings:
broadcast-queue-in:
destination: my
# group: BroadcastQueue
rabbit:
default:
consumer:
queueNameGroupOnly: true
bindings:
broadcast-queue-in:
consumer:
bindingRoutingKey: 'BroadcastQueue'
为了在一段合理的时间(这对我有用)后从 queue 中删除消息,我尝试了:
spring.cloud.stream.rabbit.bindings.broadcast-queue-in.consumer.ttl: 30000
和
spring.cloud.stream.rabbit.bindings.broadcast-queue-in.consumer.expire: 30000
没用。
我还尝试使用适当的过期时间和 ttl 在生产者中设置消息的 header。但这并没有奏效。
如果有任何想法,我们将不胜感激。谢谢。
要广播消息,您需要使用 fanout
交换。
使用 Spring Cloud Stream
就像更改配置文件一样简单。
对于制作人,您需要这样的东西:
spring:
cloud:
stream:
bindings:
broadcast-queue-out:
destination: broadcast-exchange-fanout
rabbit:
broadcast-queue-out:
exchangeType: fanout
以及您需要的消费者:
spring:
cloud:
stream:
bindings:
broadcast-queue-in:
destination: broadcast-exchange-fanout
consumer:
concurrency: 1
rabbit:
broadcast-queue-in:
exchangeType: fanout
如果您为消费者使用多个线程,例如:
spring:
cloud:
stream:
default:
contentType: "application/json"
consumer:
concurrency: 4
使用扇出队列的默认值毫无意义。无论如何它都会被单线程消耗。因此覆盖此默认值并将其设置为 1 是有意义的。
我需要使用 Spring 流在所有可用的 Rabbit MQ 侦听器上使用一些消息。
我该怎么做?
此时我找到了从活页夹中删除 group
的方法。但是 1) 我不确定这是正确的方法 2) 消息从未从 queue 中删除,同时它也从未被新添加的听众使用。
这是我当前的配置:
spring:
cloud:
stream:
default:
contentType: "application/json"
bindings:
broadcast-queue-in:
destination: my
# group: BroadcastQueue
rabbit:
default:
consumer:
queueNameGroupOnly: true
bindings:
broadcast-queue-in:
consumer:
bindingRoutingKey: 'BroadcastQueue'
为了在一段合理的时间(这对我有用)后从 queue 中删除消息,我尝试了:
spring.cloud.stream.rabbit.bindings.broadcast-queue-in.consumer.ttl: 30000
和
spring.cloud.stream.rabbit.bindings.broadcast-queue-in.consumer.expire: 30000
没用。
我还尝试使用适当的过期时间和 ttl 在生产者中设置消息的 header。但这并没有奏效。
如果有任何想法,我们将不胜感激。谢谢。
要广播消息,您需要使用 fanout
交换。
使用 Spring Cloud Stream
就像更改配置文件一样简单。
对于制作人,您需要这样的东西:
spring:
cloud:
stream:
bindings:
broadcast-queue-out:
destination: broadcast-exchange-fanout
rabbit:
broadcast-queue-out:
exchangeType: fanout
以及您需要的消费者:
spring:
cloud:
stream:
bindings:
broadcast-queue-in:
destination: broadcast-exchange-fanout
consumer:
concurrency: 1
rabbit:
broadcast-queue-in:
exchangeType: fanout
如果您为消费者使用多个线程,例如:
spring:
cloud:
stream:
default:
contentType: "application/json"
consumer:
concurrency: 4
使用扇出队列的默认值毫无意义。无论如何它都会被单线程消耗。因此覆盖此默认值并将其设置为 1 是有意义的。