Spring 云流rabbitMQ消费者分区

Spring cloud stream rabbitMQ consumer partition

我创建了示例 HttpSource 和 HttpSink。我的 application.properties 看起来像这样

Source
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.partitionCount=2


Sink
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

HttpSource

@RestController
@EnableBinding(Source.class)
 public class SampleSource {

 @Autowired
 private MessageChannel output;

 @RequestMapping(path="/message",method=RequestMethod.POST)
 public void sendMessage(@RequestBody String name){
    output.send(MessageBuilder.withPayload("Hello, "+name).build());
 }
}

HttpSink

@EnableBinding(Sink.class)
public class SampleSink {

 @ServiceActivator(inputChannel=Sink.INPUT)
 public void sendMessage(String name){
    System.out.println(name);
 }
}

我将这两个应用程序都部署到了 Pivotal Cloud Foundry。 HttpSource 有一个端点,它在调用时将消息发送到名为 "greetings" 的主题交换。然后我将 HttpSink 缩放为有 2 个实例。这创建了两个队列和绑定到 "greetings" 交换。

现在,当我到达端点时,我发现消息已发送到两个队列。我知道这一点是因为我跟踪了日志,发现该消息被打印了两次。

如何才能使消息只进入其中一个队列?

编辑:

我没有在 Pivotal Cloud Foundry 中扩展 HttpSink,而是将 HttpSink 部署为两个不同的应用程序。但是在 application.properties 他们属于同一个组。其中一个有 instanceIndex=0 而另一个 intanceIndex=1.

即使是现在,我也得到一个绑定为“#”的队列和该队列的两个消费者。

如何让不同的 HttpSink 实例创建它们自己的队列,并根据 partitionKey 将来自 HttpSource 的消息路由到其中之一?

我之前没有注意到 - 属性:

中缺少 consumer.
spring.cloud.stream.bindings.input.consumer.partitioned=true

参见 the documentation

您是否使用 IDE 创建属性文件?当我复制您的属性时,我的 STS (3.8.1) 版本将此标记为一个问题。

我刚刚 运行 进行了测试,队列被正确命名并使用正确的密钥绑定到交换。

编辑

为了让它在 PCF 上正确扩展,我还必须注释掉 instanceIndex 属性(大概是因为它覆盖了 PCF 环境 属性),否则我有 2 个消费者在 -0 队列中。删除 属性 后,我得到了预期的 2 个队列。