Spring 不同消费者的每条消息的 Cloud Stream 主题
Spring Cloud Stream topic per message for different consumers
我要找的拓扑是
到目前为止,我还没有看到在 Cloud Stream 中为每条消息定义主题的方法。我了解消费者将绑定到特定主题,但生产者如何在将消息发送到交换器之前设置每条消息的主题?
source.output().send(MessageBuilder.withPayload(myMessage).build());
不提供任何方法来设置交换主题以路由到正确的消费者。
或者是我理解有误?
更新
由于 bindingRoutingKey
为 2222
,我预计不会在消费者中收到消息,而我正在发送 routeTo
1111
。但我还是收到了消费者。
生产者属性:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
@EnableBinding(Source.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
发件人:
source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());
消费者:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222
申请:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) {
log.info("Message received moDTO: {}", moDTO);
}
}
第二次更新
根据下面接受的答案中的建议。我能够让它发挥作用。需要使用其 UI 从 RabbitMQ 中删除交换和队列并重新启动 RabbitMQ docker 映像。
routingKeyExpression
rabbitmq producer property.
例如...producer.routing-key-expression=headers['routeTo']
然后
source.output().send(MessageBuilder.withPayload(myMessage)
.setHeader("routeTo", "Booking.new")
.build());
请注意,目标是交换名称。默认情况下,活页夹需要 Topic
交换。如果您希望使用直接交换,则必须设置 exchangeType
属性.
我要找的拓扑是
到目前为止,我还没有看到在 Cloud Stream 中为每条消息定义主题的方法。我了解消费者将绑定到特定主题,但生产者如何在将消息发送到交换器之前设置每条消息的主题?
source.output().send(MessageBuilder.withPayload(myMessage).build());
不提供任何方法来设置交换主题以路由到正确的消费者。
或者是我理解有误?
更新
由于 bindingRoutingKey
为 2222
,我预计不会在消费者中收到消息,而我正在发送 routeTo
1111
。但我还是收到了消费者。
生产者属性:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
@EnableBinding(Source.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
发件人:
source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());
消费者:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222
申请:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) {
log.info("Message received moDTO: {}", moDTO);
}
}
第二次更新
根据下面接受的答案中的建议。我能够让它发挥作用。需要使用其 UI 从 RabbitMQ 中删除交换和队列并重新启动 RabbitMQ docker 映像。
routingKeyExpression
rabbitmq producer property.
例如...producer.routing-key-expression=headers['routeTo']
然后
source.output().send(MessageBuilder.withPayload(myMessage)
.setHeader("routeTo", "Booking.new")
.build());
请注意,目标是交换名称。默认情况下,活页夹需要 Topic
交换。如果您希望使用直接交换,则必须设置 exchangeType
属性.