Spring 云流rabbitMQ消费者分区
Spring cloud stream rabbitMQ consumer partition
我创建了示例 HttpSource 和 HttpSink。我的 application.properties 看起来像这样
Source
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.partitionCount=2
Sink
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
HttpSource
@RestController
@EnableBinding(Source.class)
public class SampleSource {
@Autowired
private MessageChannel output;
@RequestMapping(path="/message",method=RequestMethod.POST)
public void sendMessage(@RequestBody String name){
output.send(MessageBuilder.withPayload("Hello, "+name).build());
}
}
HttpSink
@EnableBinding(Sink.class)
public class SampleSink {
@ServiceActivator(inputChannel=Sink.INPUT)
public void sendMessage(String name){
System.out.println(name);
}
}
我将这两个应用程序都部署到了 Pivotal Cloud Foundry。 HttpSource 有一个端点,它在调用时将消息发送到名为 "greetings" 的主题交换。然后我将 HttpSink 缩放为有 2 个实例。这创建了两个队列和绑定到 "greetings" 交换。
现在,当我到达端点时,我发现消息已发送到两个队列。我知道这一点是因为我跟踪了日志,发现该消息被打印了两次。
如何才能使消息只进入其中一个队列?
编辑:
我没有在 Pivotal Cloud Foundry 中扩展 HttpSink,而是将 HttpSink 部署为两个不同的应用程序。但是在 application.properties 他们属于同一个组。其中一个有 instanceIndex=0 而另一个 intanceIndex=1.
即使是现在,我也得到一个绑定为“#”的队列和该队列的两个消费者。
如何让不同的 HttpSink 实例创建它们自己的队列,并根据 partitionKey 将来自 HttpSource 的消息路由到其中之一?
我之前没有注意到 - 属性:
中缺少 consumer.
spring.cloud.stream.bindings.input.consumer.partitioned=true
您是否使用 IDE 创建属性文件?当我复制您的属性时,我的 STS (3.8.1) 版本将此标记为一个问题。
我刚刚 运行 进行了测试,队列被正确命名并使用正确的密钥绑定到交换。
编辑
为了让它在 PCF 上正确扩展,我还必须注释掉 instanceIndex
属性(大概是因为它覆盖了 PCF 环境 属性),否则我有 2 个消费者在 -0
队列中。删除 属性 后,我得到了预期的 2 个队列。
我创建了示例 HttpSource 和 HttpSink。我的 application.properties 看起来像这样
Source
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.partitionCount=2
Sink
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
HttpSource
@RestController
@EnableBinding(Source.class)
public class SampleSource {
@Autowired
private MessageChannel output;
@RequestMapping(path="/message",method=RequestMethod.POST)
public void sendMessage(@RequestBody String name){
output.send(MessageBuilder.withPayload("Hello, "+name).build());
}
}
HttpSink
@EnableBinding(Sink.class)
public class SampleSink {
@ServiceActivator(inputChannel=Sink.INPUT)
public void sendMessage(String name){
System.out.println(name);
}
}
我将这两个应用程序都部署到了 Pivotal Cloud Foundry。 HttpSource 有一个端点,它在调用时将消息发送到名为 "greetings" 的主题交换。然后我将 HttpSink 缩放为有 2 个实例。这创建了两个队列和绑定到 "greetings" 交换。
现在,当我到达端点时,我发现消息已发送到两个队列。我知道这一点是因为我跟踪了日志,发现该消息被打印了两次。
如何才能使消息只进入其中一个队列?
编辑:
我没有在 Pivotal Cloud Foundry 中扩展 HttpSink,而是将 HttpSink 部署为两个不同的应用程序。但是在 application.properties 他们属于同一个组。其中一个有 instanceIndex=0 而另一个 intanceIndex=1.
即使是现在,我也得到一个绑定为“#”的队列和该队列的两个消费者。
如何让不同的 HttpSink 实例创建它们自己的队列,并根据 partitionKey 将来自 HttpSource 的消息路由到其中之一?
我之前没有注意到 - 属性:
中缺少consumer.
spring.cloud.stream.bindings.input.consumer.partitioned=true
您是否使用 IDE 创建属性文件?当我复制您的属性时,我的 STS (3.8.1) 版本将此标记为一个问题。
我刚刚 运行 进行了测试,队列被正确命名并使用正确的密钥绑定到交换。
编辑
为了让它在 PCF 上正确扩展,我还必须注释掉 instanceIndex
属性(大概是因为它覆盖了 PCF 环境 属性),否则我有 2 个消费者在 -0
队列中。删除 属性 后,我得到了预期的 2 个队列。