如何使用 RabbitMQ 订阅特定的路由键
How to subscribe to a specific routing key using RabbitMQ
我们正在设计一个微服务架构,我们想使用 RabbitMQ 作为消息代理。
我们希望每个服务都有一个特定的队列,比方说 applicationQueue
。
我们还定义了我们的消息有两种:
事件:路由到每个服务的消息。如果服务对某个特定事件感兴趣,它将拦截它并从中创建一个 任务。
Tasks: 消息表示从服务创建的作业给自己,它们应该只发布到服务本身的队列
到目前为止,我们正在努力使用 Spring AMQP 来实现它。
我们设计了一个消息生产者,所以在给定的 http 请求之后,它会为服务本身创建一个任务:
休息控制器:
@PostMapping
public void saveProduct(@RequestBody Product product) {
messageProducer.message("subscriptions.product.create", product)
.fromHttpRequest(requestContext)
.send();
}
我们消息生产者的send方法:
public void send() {
template.convertAndSend(exchange, routingKey, payload, message -> {
if (requestContext != null) {
extractHttpRequestInfo(message);
message.getMessageProperties().getHeaders()
.put(MessageDictionary.TRANSACTION_ID, generateTransactionId());
} else if (originalMessage != null) {
extractMessageInfo(message);
}
return message;
});
}
RabbitMQ 配置:
@Bean
List<Binding> binding(Queue queue, TopicExchange exchange) {
return Arrays.asList(
BindingBuilder.bind(queue).to(exchange).with("*.*"),
BindingBuilder.bind(queue).to(exchange).with("${condohub.rabbitmq.queue.name}.#")
);
}
然后在别处订阅(@Digest
注解是自定义注解):
@Digest("${condohub.rabbitmq.queue.name}.product.create")
public void createProduct(Product product) {
service.save(product);
}
欢迎任何帮助。
您的绑定没有意义;第一个将匹配具有 foo.bar
、baz.qux
等形式的所有键,因此第二个是无关紧要的。
您可能应该只对事件使用扇出交换,并且每个服务都有 2 个队列,一个用于事件的扇出,一个用于作业的主题交换(仅针对其自己的作业进行窄绑定)。
我们正在设计一个微服务架构,我们想使用 RabbitMQ 作为消息代理。
我们希望每个服务都有一个特定的队列,比方说 applicationQueue
。
我们还定义了我们的消息有两种:
事件:路由到每个服务的消息。如果服务对某个特定事件感兴趣,它将拦截它并从中创建一个 任务。
Tasks: 消息表示从服务创建的作业给自己,它们应该只发布到服务本身的队列
到目前为止,我们正在努力使用 Spring AMQP 来实现它。
我们设计了一个消息生产者,所以在给定的 http 请求之后,它会为服务本身创建一个任务:
休息控制器:
@PostMapping
public void saveProduct(@RequestBody Product product) {
messageProducer.message("subscriptions.product.create", product)
.fromHttpRequest(requestContext)
.send();
}
我们消息生产者的send方法:
public void send() {
template.convertAndSend(exchange, routingKey, payload, message -> {
if (requestContext != null) {
extractHttpRequestInfo(message);
message.getMessageProperties().getHeaders()
.put(MessageDictionary.TRANSACTION_ID, generateTransactionId());
} else if (originalMessage != null) {
extractMessageInfo(message);
}
return message;
});
}
RabbitMQ 配置:
@Bean
List<Binding> binding(Queue queue, TopicExchange exchange) {
return Arrays.asList(
BindingBuilder.bind(queue).to(exchange).with("*.*"),
BindingBuilder.bind(queue).to(exchange).with("${condohub.rabbitmq.queue.name}.#")
);
}
然后在别处订阅(@Digest
注解是自定义注解):
@Digest("${condohub.rabbitmq.queue.name}.product.create")
public void createProduct(Product product) {
service.save(product);
}
欢迎任何帮助。
您的绑定没有意义;第一个将匹配具有 foo.bar
、baz.qux
等形式的所有键,因此第二个是无关紧要的。
您可能应该只对事件使用扇出交换,并且每个服务都有 2 个队列,一个用于事件的扇出,一个用于作业的主题交换(仅针对其自己的作业进行窄绑定)。