如何使用 RMQ 和 spring 云流创建基于分区的消费者
How to use RMQ and spring cloud stream for creating partition based consumer
如果我有 3 个由生产者创建的分区,并且如果我在 CF 中部署 3 个实例,每个实例都选择一个队列并使用记录的索引处理消息,我就能够使用云流和 Rabbit MQ 开发示例消费者。
现在的问题是,如果我有 10 个分区,似乎我需要 10 个实例,那是资源浪费,我们能否让一个消费者监听多个分区。我之所以有基于分区的生产者,是因为对我来说,处理消息的顺序很重要。
您为什么认为这是一种资源浪费?如果您的要求要求您需要有状态的处理并且您正在拆分为多个分区,那么您将需要 N 个消费者用于 N 个分区。
如果您在同一个队列中混合不同分区的消息,您的排序会受到影响。除非你在你这边添加一些逻辑来根据某些元数据聚合消息。
这是一种方法...
@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {
public static void main(String[] args) {
SpringApplication.run(So43661064Application.class, args);
}
@StreamListener("input1")
public void foo1(String in) {
doFoo(in);
}
@StreamListener("input2")
public void foo2(String in) {
doFoo(in);
}
protected void doFoo(String in) {
System.out.println(in);
}
public interface TwoInputs {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
}
}
和
spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0
spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1
这将从生产者在 中创建的 2 个分区中消费。
目前没有办法让 @StreamListener
直接监听 2 个分区。
编辑
这是另一种方法,使用 exchange->exchange
绑定...
制作人
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So43614477Application.class, args).close();
}
@Autowired
private MessageChannel output;
@Autowired
private AmqpAdmin admin;
@Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
private int partitionCount;
@Value("${spring.cloud.stream.bindings.output.destination}")
private String destination;
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < this.partitionCount; i++) {
String partition = this.destination + "-" + i;
TopicExchange exchange = new TopicExchange(partition);
this.admin.declareExchange(exchange);
Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
.with(partition);
this.admin.declareBinding(binding);
}
output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
}
}
和
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2
消费者
@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {
public static void main(String[] args) {
SpringApplication.run(So43661064Application.class, args);
}
@StreamListener(Sink.INPUT)
public void foo1(String in) {
System.out.println(in);
}
}
和
spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1
来自主交换器的分区被路由到分区交换器,消费者获得交换器列表以将他的队列绑定到。
您可以在命令行中传递该列表。
如果我有 3 个由生产者创建的分区,并且如果我在 CF 中部署 3 个实例,每个实例都选择一个队列并使用记录的索引处理消息,我就能够使用云流和 Rabbit MQ 开发示例消费者。
现在的问题是,如果我有 10 个分区,似乎我需要 10 个实例,那是资源浪费,我们能否让一个消费者监听多个分区。我之所以有基于分区的生产者,是因为对我来说,处理消息的顺序很重要。
您为什么认为这是一种资源浪费?如果您的要求要求您需要有状态的处理并且您正在拆分为多个分区,那么您将需要 N 个消费者用于 N 个分区。
如果您在同一个队列中混合不同分区的消息,您的排序会受到影响。除非你在你这边添加一些逻辑来根据某些元数据聚合消息。
这是一种方法...
@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {
public static void main(String[] args) {
SpringApplication.run(So43661064Application.class, args);
}
@StreamListener("input1")
public void foo1(String in) {
doFoo(in);
}
@StreamListener("input2")
public void foo2(String in) {
doFoo(in);
}
protected void doFoo(String in) {
System.out.println(in);
}
public interface TwoInputs {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
}
}
和
spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0
spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1
这将从生产者在
目前没有办法让 @StreamListener
直接监听 2 个分区。
编辑
这是另一种方法,使用 exchange->exchange
绑定...
制作人
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So43614477Application.class, args).close();
}
@Autowired
private MessageChannel output;
@Autowired
private AmqpAdmin admin;
@Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
private int partitionCount;
@Value("${spring.cloud.stream.bindings.output.destination}")
private String destination;
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < this.partitionCount; i++) {
String partition = this.destination + "-" + i;
TopicExchange exchange = new TopicExchange(partition);
this.admin.declareExchange(exchange);
Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
.with(partition);
this.admin.declareBinding(binding);
}
output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
}
}
和
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2
消费者
@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {
public static void main(String[] args) {
SpringApplication.run(So43661064Application.class, args);
}
@StreamListener(Sink.INPUT)
public void foo1(String in) {
System.out.println(in);
}
}
和
spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1
来自主交换器的分区被路由到分区交换器,消费者获得交换器列表以将他的队列绑定到。
您可以在命令行中传递该列表。