Spring cloud stream rabbitmq 消费者中指定主题

Spring cloud stream rabbitmq specify topics in the consumer

我正在尝试发送具有不同主题的消息,然后根据它想要收听的主题配置消费者。

我的想法是使用单一目标 "domainMessage" 并使用自定义分区策略。我有一个枚举,我只是将该值用作 partitionKey,而 partitionStrategy 将只是 return 键(假设键将始终等于生产者端的分区计数)。

这行得通吗?如果是这样,我不确定如何配置消费者。

我的制作人有以下内容application.properties

spring.cloud.stream.bindings.output.destination=domainMessages
spring.cloud.stream.bindings.output.producer.partition-key-extractor-class=publisher.partitionstrategy.PartitionKeyExtractorImpl
spring.cloud.stream.bindings.output.producer.partition-selector-class=publisher.partitionstrategy.PartitionSelectorStrategyImpl
spring.cloud.stream.bindings.output.producer.partition-count=3

我的 PartitionKeyExtractorImpl 看起来像

@Override
    public Object extractKey(Message<?> message) {
        DomainMessage payload = (DomainMessage) message.getPayload();
        return payload.getType();
    }

我的 PartitionStrategyImpl 看起来像

@Override
    public int selectPartition(Object key, int partitionCount) {
        return (int)key;
    }

我的消费者application.properties看起来像

spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true

假设 payload.getType() 可以 return 1-3 之间的值。如何将消费者配置为仅收听 partitionKey 为 1 和 3 的消息?

Spring云流对声明队列和交换很自以为是

spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true
spring.cloud.stream.bindings.input.consumer.instance-index=0

(注意实例索引)

这将绑定 domainMessage.group01-0 与路由密钥 domainMessage-0 交换 domainMessage

如果您希望在单个实例中使用多个分区,只需执行

spring.cloud.stream.bindings.input.destination=domainMessage-0,domainMessage-1

(并删除 instance-indexpartitioned=true)将不起作用,因为它将每个队列绑定到相应的交换器(例如 domainMessage-0 和路由键 # ).

一个解决方案是这样做,但手动添加一个交换到交换绑定,以使用适当的路由密钥将每个消费者交换 (domainMessage-n) 绑定到上游交换 (domainMessage) .

目前无法在单个 instance-index.

中自动消耗多个分区