Grails rabbitmq 本机过滤消息给消费者
Grails rabbitmq native filter messages to consumer
我正在使用 grails 3.2.3
版本和 rabbitmq native plugin 3.3.2
(http://budjb.github.io/grails-rabbitmq-native/doc/manual/)。我正在尝试实现以下场景。
描述: 我正在使用 headers 向单个 queue 发送多条消息,在消费者部分,我尝试通过特定过滤应用绑定来消费消息。但是无论过滤如何,消费者都会使用所有消息 - 意味着绑定不起作用。我也是 rabbitmq 的初学者。因此,非常感谢任何 help/direction。下面是我的代码。
Queue 配置 application.groovy:
rabbitmq {
queues = [
[
name : "mail.queue",
connection: "defaultConnection",
durable : true
]
]
}
发送到queue函数:
protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
rabbitMessagePublisher.send {
routingKey = queueType.queueName
body = message
autoConvert = true
if (headers != null) {
headers = binding
}
}
}
这里 sendToQueue
我将第三个参数设为可选,因为在某些情况下我不需要多种类型的消费者;
正在调用发送至 queue:
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])
消费者 1:
static rabbitConfig = [
queue : QueueType.EMAIL_QUEUE.queueName,
binding : ["emailType": EmailType.PASSWORD_RESET.name()],
match : "all",
consumer: 10
]
def handleMessage(Map message, MessageContext context) {
print("From PasswordResetEmailConsumer consumer")
println(message)
passwordResetEmailService.sendPasswordResetMail(message)
}
消费者 2:
static rabbitConfig = [
queue : QueueType.EMAIL_QUEUE.queueName,
binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
match : "all",
consumer: 10
]
def handleMessage(Map message, MessageContext context) {
print("From PasswordResetSuccessEmailConsumer consumer")
println(message)
passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}
阅读 rabbitmq 文档后,我意识到无法从单个队列中有选择地拉取消息。
Consumer receives all messages from the queue
尽管还有另一个选项 "Exchange"
,其中发布者将发布消息以与路由键交换,并且这些消息将被传递到绑定队列。更多:RabbitMQ Publish/Subscribe Model
这里也描述了基本思想:Whosebug: RabbitMQ selectively retrieving messages from a queue
无论如何,在我的解决方案中我不想要多个队列。因此,我创建了一个消费者,并通过消息传递实际处理程序 class bean 引用以分派消息。分享实现,希望这对某人有帮助:
application.groovy中的队列配置:
rabbitmq {
queues = [
[
name : "mail.queue",
connection: "defaultConnection",
durable : true
]
]
}
发送到队列函数:
protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
message.queueHandlerServiceClass = queueHandlerServiceClass.name
rabbitMessagePublisher.send {
routingKey = queueType.queueName // queue name from enum: "mail.queue"
body = message
autoConvert = true
}
}
处理程序接口:
interface BaseQueueHandler {
void handleMessage(Map message, MessageContext context)
}
正在发送至队列:
sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)
队列消费者:
class EchoEmailQueueConsumer {
static rabbitConfig = [
queue : QueueType.ECHO_EMAIL_QUEUE.queueName,
consumer: 10
]
GrailsApplication grailsApplication
def handleMessage(Map message, MessageContext context) {
String handlerClass = message.remove("queueHandlerServiceClass")
Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass);
BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
queueService.handleMessage(message, context)
}
}
最后实现Handler接口的Handler服务:
class PasswordResetEmailService implements BaseQueueHandler {
@Override
void handleMessage(Map message, MessageContext context) {
println("message received in PasswordResetEmailService")
}
}
我正在使用 grails 3.2.3
版本和 rabbitmq native plugin 3.3.2
(http://budjb.github.io/grails-rabbitmq-native/doc/manual/)。我正在尝试实现以下场景。
描述: 我正在使用 headers 向单个 queue 发送多条消息,在消费者部分,我尝试通过特定过滤应用绑定来消费消息。但是无论过滤如何,消费者都会使用所有消息 - 意味着绑定不起作用。我也是 rabbitmq 的初学者。因此,非常感谢任何 help/direction。下面是我的代码。
Queue 配置 application.groovy:
rabbitmq {
queues = [
[
name : "mail.queue",
connection: "defaultConnection",
durable : true
]
]
}
发送到queue函数:
protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
rabbitMessagePublisher.send {
routingKey = queueType.queueName
body = message
autoConvert = true
if (headers != null) {
headers = binding
}
}
}
这里 sendToQueue
我将第三个参数设为可选,因为在某些情况下我不需要多种类型的消费者;
正在调用发送至 queue:
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])
消费者 1:
static rabbitConfig = [
queue : QueueType.EMAIL_QUEUE.queueName,
binding : ["emailType": EmailType.PASSWORD_RESET.name()],
match : "all",
consumer: 10
]
def handleMessage(Map message, MessageContext context) {
print("From PasswordResetEmailConsumer consumer")
println(message)
passwordResetEmailService.sendPasswordResetMail(message)
}
消费者 2:
static rabbitConfig = [
queue : QueueType.EMAIL_QUEUE.queueName,
binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
match : "all",
consumer: 10
]
def handleMessage(Map message, MessageContext context) {
print("From PasswordResetSuccessEmailConsumer consumer")
println(message)
passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}
阅读 rabbitmq 文档后,我意识到无法从单个队列中有选择地拉取消息。
Consumer receives all messages from the queue
尽管还有另一个选项 "Exchange"
,其中发布者将发布消息以与路由键交换,并且这些消息将被传递到绑定队列。更多:RabbitMQ Publish/Subscribe Model
这里也描述了基本思想:Whosebug: RabbitMQ selectively retrieving messages from a queue
无论如何,在我的解决方案中我不想要多个队列。因此,我创建了一个消费者,并通过消息传递实际处理程序 class bean 引用以分派消息。分享实现,希望这对某人有帮助:
application.groovy中的队列配置:
rabbitmq {
queues = [
[
name : "mail.queue",
connection: "defaultConnection",
durable : true
]
]
}
发送到队列函数:
protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
message.queueHandlerServiceClass = queueHandlerServiceClass.name
rabbitMessagePublisher.send {
routingKey = queueType.queueName // queue name from enum: "mail.queue"
body = message
autoConvert = true
}
}
处理程序接口:
interface BaseQueueHandler {
void handleMessage(Map message, MessageContext context)
}
正在发送至队列:
sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)
队列消费者:
class EchoEmailQueueConsumer {
static rabbitConfig = [
queue : QueueType.ECHO_EMAIL_QUEUE.queueName,
consumer: 10
]
GrailsApplication grailsApplication
def handleMessage(Map message, MessageContext context) {
String handlerClass = message.remove("queueHandlerServiceClass")
Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass);
BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
queueService.handleMessage(message, context)
}
}
最后实现Handler接口的Handler服务:
class PasswordResetEmailService implements BaseQueueHandler {
@Override
void handleMessage(Map message, MessageContext context) {
println("message received in PasswordResetEmailService")
}
}