如何使用 Spring 流向所有侦听器广播 RabbitMQ 消息

How to broadcast the RabbitMQ message to all listeners using Spring Streams

我需要使用 Spring 流在所有可用的 Rabbit MQ 侦听器上使用一些消息。

我该怎么做?

此时我找到了从活页夹中删除 group 的方法。但是 1) 我不确定这是正确的方法 2) 消息从未从 queue 中删除,同时它也从未被新添加的听众使用。

这是我当前的配置:

spring:

  cloud:
    stream:
      default:
        contentType: "application/json"
      bindings:
        broadcast-queue-in:
          destination: my
#          group: BroadcastQueue
      rabbit:
        default:
          consumer:
            queueNameGroupOnly: true
        bindings:
          broadcast-queue-in:
            consumer:
              bindingRoutingKey: 'BroadcastQueue'
             

为了在一段合理的时间(这对我有用)后从 queue 中删除消息,我尝试了: spring.cloud.stream.rabbit.bindings.broadcast-queue-in.consumer.ttl: 30000spring.cloud.stream.rabbit.bindings.broadcast-queue-in.consumer.expire: 30000

没用。

我还尝试使用适当的过期时间和 ttl 在生产者中设置消息的 header。但这并没有奏效。

如果有任何想法,我们将不胜感激。谢谢。

要广播消息,您需要使用 fanout 交换。 使用 Spring Cloud Stream 就像更改配置文件一样简单。

对于制作人,您需要这样的东西:

spring:
  cloud:
    stream:
      bindings:
        broadcast-queue-out:
           destination: broadcast-exchange-fanout
      rabbit:
        broadcast-queue-out:
          exchangeType: fanout

以及您需要的消费者:

spring:
  cloud:
    stream:
      bindings:
        broadcast-queue-in:
           destination: broadcast-exchange-fanout
           consumer:
             concurrency: 1
      rabbit:
        broadcast-queue-in:
          exchangeType: fanout

如果您为消费者使用多个线程,例如:

spring:
  cloud:
    stream:
      default:
        contentType: "application/json"
        consumer:
          concurrency: 4

使用扇出队列的默认值毫无意义。无论如何它都会被单线程消耗。因此覆盖此默认值并将其设置为 1 是有意义的。