使用 RabbitMQ 插件在 Grails 中创建队列运行时
Create queue runtime in Grails with RabbitMQ plugin
我有一个系统,外部系统可以在其中订阅我的系统生成的事件。该系统是用 Grails 2 编写的,使用 RabbitMQ plugin 进行内部消息传递。到外部系统的事件通过 HTTP 传送。
我想为每个订阅者创建一个队列,以防止速度较慢的订阅者端点减慢发送给其他订阅者的消息。订阅可以在运行时发生,这就是为什么不希望在应用程序配置中定义队列。
如何使用 Grails RabbitMQ 插件创建具有主题绑定运行时的队列?
由于从 RabbitMQ 队列中读取消息与服务直接耦合,因此创建队列运行时的一个附带问题可能是拥有该 Grails 服务的多个实例。有什么想法吗?
我没有适合您的现成解决方案,但如果您遵循 RabbitmqGrailsPlugin Descriptor 中的代码,尤其是 doWithSpring
部分
您应该能够重新创建在运行时动态初始化新 Queue
和关联的 Listener
所需的步骤。
一切都归结为传递所需的参数、注册必要的 spring bean 并启动侦听器。
为了回答你的第二个问题,我认为你可以想出一些命名约定并为每个队列创建一个新的队列处理程序。可以在此处找到如何动态创建 spring beans 的示例:dynamically declare beans
只是一个简短的例子,我将如何快速注册一个队列,它需要更多的布线等...
def createQ(queueName) {
def queuesConfig = {
"${queueName}"(durable: true, autoDelete: false,)
}
def queueBuilder = new RabbitQueueBuilder()
queuesConfig.delegate = queueBuilder
queuesConfig.resolveStrategy = Closure.DELEGATE_FIRST
queuesConfig()
queueBuilder.queues?.each { queue ->
if (log.debugEnabled) {
log.debug "Registering queue '${queue.name}'"
}
BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(Queue.class);
builder.addConstructorArgValue(queue.name)
builder.addConstructorArgValue(Boolean.valueOf(queue.durable))
builder.addConstructorArgValue(Boolean.valueOf(queue.exclusive))
builder.addConstructorArgValue(Boolean.valueOf(queue.autoDelete))
builder.addConstructorArgValue(queue.arguments)
DefaultListableBeanFactory factory = (DefaultListableBeanFactory) grailsApplication.mainContext.getBeanFactory();
factory.registerBeanDefinition("grails.rabbit.queue.${queue.name}", builder.getBeanDefinition());
}
}
我最终使用了 Spring Grails RabbitMQ 插件使用的 AMQP。删除了一些 methods/arguments,因为它们与样本无关:
class MyUpdater {
void handleMessage(Object message) {
String content = new String(message)
// do whatever you need with the message
}
}
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.amqp.rabbit.connection.ConnectionFactory
class ListenerInitiator {
// autowired
ConnectionFactory rabbitMQConnectionFactory
protected void initiateListener() {
RabbitAdmin admin = new RabbitAdmin(rabbitMQConnectionFactory)
// normally passed to this method, moved to local vars for simplicity
String queueName = "myQueueName"
String routingKey = "#"
String exchange = "myExchange"
Queue queue = new Queue(queueName)
admin.declareQueue(queue)
TopicExchange exchange = new TopicExchange(exchange)
admin.declareExchange(exchange)
admin.declareBinding( BindingBuilder.bind(queue).to(exchange).with(routingKey) )
// normally passed to this method, moved to local var for simplicity
MyUpdater listener = new MyUpdater()
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitMQConnectionFactory)
MessageListenerAdapter adapter = new MessageListenerAdapter(listener)
adapter.setMessageConverter(new SimpleMessageConverter())
container.setMessageListener(adapter)
container.setQueueNames(queueName)
container.start()
}
我有一个系统,外部系统可以在其中订阅我的系统生成的事件。该系统是用 Grails 2 编写的,使用 RabbitMQ plugin 进行内部消息传递。到外部系统的事件通过 HTTP 传送。
我想为每个订阅者创建一个队列,以防止速度较慢的订阅者端点减慢发送给其他订阅者的消息。订阅可以在运行时发生,这就是为什么不希望在应用程序配置中定义队列。
如何使用 Grails RabbitMQ 插件创建具有主题绑定运行时的队列?
由于从 RabbitMQ 队列中读取消息与服务直接耦合,因此创建队列运行时的一个附带问题可能是拥有该 Grails 服务的多个实例。有什么想法吗?
我没有适合您的现成解决方案,但如果您遵循 RabbitmqGrailsPlugin Descriptor 中的代码,尤其是 doWithSpring
部分
您应该能够重新创建在运行时动态初始化新 Queue
和关联的 Listener
所需的步骤。
一切都归结为传递所需的参数、注册必要的 spring bean 并启动侦听器。
为了回答你的第二个问题,我认为你可以想出一些命名约定并为每个队列创建一个新的队列处理程序。可以在此处找到如何动态创建 spring beans 的示例:dynamically declare beans
只是一个简短的例子,我将如何快速注册一个队列,它需要更多的布线等...
def createQ(queueName) {
def queuesConfig = {
"${queueName}"(durable: true, autoDelete: false,)
}
def queueBuilder = new RabbitQueueBuilder()
queuesConfig.delegate = queueBuilder
queuesConfig.resolveStrategy = Closure.DELEGATE_FIRST
queuesConfig()
queueBuilder.queues?.each { queue ->
if (log.debugEnabled) {
log.debug "Registering queue '${queue.name}'"
}
BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(Queue.class);
builder.addConstructorArgValue(queue.name)
builder.addConstructorArgValue(Boolean.valueOf(queue.durable))
builder.addConstructorArgValue(Boolean.valueOf(queue.exclusive))
builder.addConstructorArgValue(Boolean.valueOf(queue.autoDelete))
builder.addConstructorArgValue(queue.arguments)
DefaultListableBeanFactory factory = (DefaultListableBeanFactory) grailsApplication.mainContext.getBeanFactory();
factory.registerBeanDefinition("grails.rabbit.queue.${queue.name}", builder.getBeanDefinition());
}
}
我最终使用了 Spring Grails RabbitMQ 插件使用的 AMQP。删除了一些 methods/arguments,因为它们与样本无关:
class MyUpdater {
void handleMessage(Object message) {
String content = new String(message)
// do whatever you need with the message
}
}
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.amqp.rabbit.connection.ConnectionFactory
class ListenerInitiator {
// autowired
ConnectionFactory rabbitMQConnectionFactory
protected void initiateListener() {
RabbitAdmin admin = new RabbitAdmin(rabbitMQConnectionFactory)
// normally passed to this method, moved to local vars for simplicity
String queueName = "myQueueName"
String routingKey = "#"
String exchange = "myExchange"
Queue queue = new Queue(queueName)
admin.declareQueue(queue)
TopicExchange exchange = new TopicExchange(exchange)
admin.declareExchange(exchange)
admin.declareBinding( BindingBuilder.bind(queue).to(exchange).with(routingKey) )
// normally passed to this method, moved to local var for simplicity
MyUpdater listener = new MyUpdater()
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitMQConnectionFactory)
MessageListenerAdapter adapter = new MessageListenerAdapter(listener)
adapter.setMessageConverter(new SimpleMessageConverter())
container.setMessageListener(adapter)
container.setQueueNames(queueName)
container.start()
}