Grails rabbitmq 本机过滤消息给消费者

Grails rabbitmq native filter messages to consumer

我正在使用 grails 3.2.3 版本和 rabbitmq native plugin 3.3.2 (http://budjb.github.io/grails-rabbitmq-native/doc/manual/)。我正在尝试实现以下场景。
描述: 我正在使用 headers 向单个 queue 发送多条消息,在消费者部分,我尝试通过特定过滤应用绑定来消费消息。但是无论过滤如何,消费者都会使用所有消息 - 意味着绑定不起作用。我也是 rabbitmq 的初学者。因此,非常感谢任何 help/direction。下面是我的代码。

Queue 配置 application.groovy:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
]

}

发送到queue函数:

protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName
        body = message
        autoConvert = true
        if (headers != null) {
            headers = binding
        }
    }
}

这里 sendToQueue 我将第三个参数设为可选,因为在某些情况下我不需要多种类型的消费者;

正在调用发送至 queue:

sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])

消费者 1:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetEmailConsumer consumer")
    println(message)
    passwordResetEmailService.sendPasswordResetMail(message)
}

消费者 2:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetSuccessEmailConsumer consumer")
    println(message)
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}

阅读 rabbitmq 文档后,我意识到无法从单个队列中有选择地拉取消息。

Consumer receives all messages from the queue

尽管还有另一个选项 "Exchange",其中发布者将发布消息以与路由键交换,并且这些消息将被传递到绑定队列。更多:RabbitMQ Publish/Subscribe Model
这里也描述了基本思想:Whosebug: RabbitMQ selectively retrieving messages from a queue
无论如何,在我的解决方案中我不想要多个队列。因此,我创建了一个消费者,并通过消息传递实际处理程序 class bean 引用以分派消息。分享实现,希望这对某人有帮助:

application.groovy中的队列配置:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
    ]
}

发送到队列函数:

protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
    message.queueHandlerServiceClass = queueHandlerServiceClass.name
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName // queue name from enum: "mail.queue"
        body = message
        autoConvert = true
    }
}

处理程序接口:

interface BaseQueueHandler {
    void handleMessage(Map message, MessageContext context)
}

正在发送至队列:

sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)

队列消费者:

class EchoEmailQueueConsumer {

    static rabbitConfig = [
            queue   : QueueType.ECHO_EMAIL_QUEUE.queueName,
            consumer: 10
    ]

    GrailsApplication grailsApplication

    def handleMessage(Map message, MessageContext context) {
        String handlerClass = message.remove("queueHandlerServiceClass")
        Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass);
        BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
        queueService.handleMessage(message, context)
    }

}

最后实现Handler接口的Handler服务:

class PasswordResetEmailService implements BaseQueueHandler {

    @Override
    void handleMessage(Map message, MessageContext context) {
        println("message received in PasswordResetEmailService")
    }
}