Spring cloud stream rabbitmq 消费者中指定主题
Spring cloud stream rabbitmq specify topics in the consumer
我正在尝试发送具有不同主题的消息,然后根据它想要收听的主题配置消费者。
我的想法是使用单一目标 "domainMessage" 并使用自定义分区策略。我有一个枚举,我只是将该值用作 partitionKey,而 partitionStrategy 将只是 return 键(假设键将始终等于生产者端的分区计数)。
这行得通吗?如果是这样,我不确定如何配置消费者。
我的制作人有以下内容application.properties
spring.cloud.stream.bindings.output.destination=domainMessages
spring.cloud.stream.bindings.output.producer.partition-key-extractor-class=publisher.partitionstrategy.PartitionKeyExtractorImpl
spring.cloud.stream.bindings.output.producer.partition-selector-class=publisher.partitionstrategy.PartitionSelectorStrategyImpl
spring.cloud.stream.bindings.output.producer.partition-count=3
我的 PartitionKeyExtractorImpl 看起来像
@Override
public Object extractKey(Message<?> message) {
DomainMessage payload = (DomainMessage) message.getPayload();
return payload.getType();
}
我的 PartitionStrategyImpl 看起来像
@Override
public int selectPartition(Object key, int partitionCount) {
return (int)key;
}
我的消费者application.properties看起来像
spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true
假设 payload.getType()
可以 return 1-3 之间的值。如何将消费者配置为仅收听 partitionKey 为 1 和 3 的消息?
Spring云流对声明队列和交换很自以为是
spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true
spring.cloud.stream.bindings.input.consumer.instance-index=0
(注意实例索引)
这将绑定 domainMessage.group01-0
与路由密钥 domainMessage-0
交换 domainMessage
。
如果您希望在单个实例中使用多个分区,只需执行
spring.cloud.stream.bindings.input.destination=domainMessage-0,domainMessage-1
(并删除 instance-index
和 partitioned=true
)将不起作用,因为它将每个队列绑定到相应的交换器(例如 domainMessage-0
和路由键 #
).
一个解决方案是这样做,但手动添加一个交换到交换绑定,以使用适当的路由密钥将每个消费者交换 (domainMessage-n
) 绑定到上游交换 (domainMessage
) .
目前无法在单个 instance-index
.
中自动消耗多个分区
我正在尝试发送具有不同主题的消息,然后根据它想要收听的主题配置消费者。
我的想法是使用单一目标 "domainMessage" 并使用自定义分区策略。我有一个枚举,我只是将该值用作 partitionKey,而 partitionStrategy 将只是 return 键(假设键将始终等于生产者端的分区计数)。
这行得通吗?如果是这样,我不确定如何配置消费者。
我的制作人有以下内容application.properties
spring.cloud.stream.bindings.output.destination=domainMessages
spring.cloud.stream.bindings.output.producer.partition-key-extractor-class=publisher.partitionstrategy.PartitionKeyExtractorImpl
spring.cloud.stream.bindings.output.producer.partition-selector-class=publisher.partitionstrategy.PartitionSelectorStrategyImpl
spring.cloud.stream.bindings.output.producer.partition-count=3
我的 PartitionKeyExtractorImpl 看起来像
@Override
public Object extractKey(Message<?> message) {
DomainMessage payload = (DomainMessage) message.getPayload();
return payload.getType();
}
我的 PartitionStrategyImpl 看起来像
@Override
public int selectPartition(Object key, int partitionCount) {
return (int)key;
}
我的消费者application.properties看起来像
spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true
假设 payload.getType()
可以 return 1-3 之间的值。如何将消费者配置为仅收听 partitionKey 为 1 和 3 的消息?
Spring云流对声明队列和交换很自以为是
spring.cloud.stream.bindings.input.destination=domainMessage
spring.cloud.stream.bindings.input.group=group01
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true
spring.cloud.stream.bindings.input.consumer.instance-index=0
(注意实例索引)
这将绑定 domainMessage.group01-0
与路由密钥 domainMessage-0
交换 domainMessage
。
如果您希望在单个实例中使用多个分区,只需执行
spring.cloud.stream.bindings.input.destination=domainMessage-0,domainMessage-1
(并删除 instance-index
和 partitioned=true
)将不起作用,因为它将每个队列绑定到相应的交换器(例如 domainMessage-0
和路由键 #
).
一个解决方案是这样做,但手动添加一个交换到交换绑定,以使用适当的路由密钥将每个消费者交换 (domainMessage-n
) 绑定到上游交换 (domainMessage
) .
目前无法在单个 instance-index
.