如何使用 RMQ 和 spring 云流创建基于分区的生产者?
How to use RMQ and spring cloud stream for creating partition based producer?
我试图找到 spring 云流的示例,它在其中为 RMQ 创建基于分区的生产者。我想看看它将如何为这些队列创建绑定,因为 RMQ 本身不支持主题分区,但它会创建与分区数相等的队列数(我读过这个,我可能是错的)。首先,我想了解如何使用 spring RMQ 上的云流为基于分区的生产者创建生产者。
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So43614477Application.class, args);
}
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
}
}
具有属性...
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2
spring.cloud.stream.bindings.output.producer.required-groups=bar
我添加了 required-groups
这样您就可以看到消费者队列是如何绑定的。
foo
交换绑定:
表达式可以是针对消息计算的任何有效表达式(例如 payload.hashCode()
);然后根据分区计数修改 %
以确定最终分区。
我试图找到 spring 云流的示例,它在其中为 RMQ 创建基于分区的生产者。我想看看它将如何为这些队列创建绑定,因为 RMQ 本身不支持主题分区,但它会创建与分区数相等的队列数(我读过这个,我可能是错的)。首先,我想了解如何使用 spring RMQ 上的云流为基于分区的生产者创建生产者。
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So43614477Application.class, args);
}
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
}
}
具有属性...
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2
spring.cloud.stream.bindings.output.producer.required-groups=bar
我添加了 required-groups
这样您就可以看到消费者队列是如何绑定的。
foo
交换绑定:
表达式可以是针对消息计算的任何有效表达式(例如 payload.hashCode()
);然后根据分区计数修改 %
以确定最终分区。