RabbitMQ 到 BlockingQueue 绑定
RabbitMQ to BlockingQueue binding
我正在开发一个多线程应用程序,其中多个 "processors"(ThreadPools 中的 Runnables)相互发送消息。它们使用 BlockingQueue
接口进行通信:当处理器 A
完成任务 T1
时,它会将其推送到队列 Q1
(例如,BlockingQueue<MyTask>
如果 T1
表示为classMyTask
);之后,处理器 B
从 Q1
中提取任务,执行计算并将结果结果推送到 Q2
;等等。
我使用 LinkedBlockingQueue
,因为我的应用程序是单体应用程序,所有处理器 "live" 在同一个 JVM 中。但是,我希望我的应用程序成为模块化的 (Microservice Architecture),所以我决定使用 RabbitMQ 作为消息代理。
问题是从 java 队列的实现迁移到 RabbitMQ,同时对客户端的源代码进行最小的更改。因此,我尝试在 RabbitMQ 抽象和 BlockingQueue
接口之间找到某种绑定。因此,当有人向 amqp 的队列发送消息时,它应该出现在 java 队列中。反之亦然:当有人将对象推送到 java 队列时,它应该传播到 amqp 的交换器。
轮询的示例实现(来自 amqp 的队列,使用 spring-amqp)如下所示。
<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
MessageConverter messageConverter = listenerContainer.getMessageConverter();
listenerContainer.setupMessageListener((MessageListener) message -> {
Object task = messageConverter.fromMessage(message);
queue.offer(elementType.cast(task));
});
return queue;
}
我现在找不到使用 RabbitMQ 队列实现 BlockingQueue
接口的框架。如果那种框架不存在,我的想法在某种程度上在架构上是错误的,还是还没有人实现它?
我不确定您是否真的想按照您描述的方式进行操作 - 入站消息将被传送到队列并位于内存中,而不是 RabbitMQ 中。
我认为一个简单的 BlockingQueue
实现在下面使用 RabbitTemplate
从 rabbit 队列中提取消息(使用 receive()
或 receiveAndConvert()
)可能更适合take/poll 操作 - 它将在 RabbitMQ 中保留消息直到需要,并且简单地 RabbitTemplate.convertAndSend()
用于 offer/put 操作。
虽然非常简单,但它可能是对框架的有用补充;考虑 contributing.
我正在开发一个多线程应用程序,其中多个 "processors"(ThreadPools 中的 Runnables)相互发送消息。它们使用 BlockingQueue
接口进行通信:当处理器 A
完成任务 T1
时,它会将其推送到队列 Q1
(例如,BlockingQueue<MyTask>
如果 T1
表示为classMyTask
);之后,处理器 B
从 Q1
中提取任务,执行计算并将结果结果推送到 Q2
;等等。
我使用 LinkedBlockingQueue
,因为我的应用程序是单体应用程序,所有处理器 "live" 在同一个 JVM 中。但是,我希望我的应用程序成为模块化的 (Microservice Architecture),所以我决定使用 RabbitMQ 作为消息代理。
问题是从 java 队列的实现迁移到 RabbitMQ,同时对客户端的源代码进行最小的更改。因此,我尝试在 RabbitMQ 抽象和 BlockingQueue
接口之间找到某种绑定。因此,当有人向 amqp 的队列发送消息时,它应该出现在 java 队列中。反之亦然:当有人将对象推送到 java 队列时,它应该传播到 amqp 的交换器。
轮询的示例实现(来自 amqp 的队列,使用 spring-amqp)如下所示。
<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
MessageConverter messageConverter = listenerContainer.getMessageConverter();
listenerContainer.setupMessageListener((MessageListener) message -> {
Object task = messageConverter.fromMessage(message);
queue.offer(elementType.cast(task));
});
return queue;
}
我现在找不到使用 RabbitMQ 队列实现 BlockingQueue
接口的框架。如果那种框架不存在,我的想法在某种程度上在架构上是错误的,还是还没有人实现它?
我不确定您是否真的想按照您描述的方式进行操作 - 入站消息将被传送到队列并位于内存中,而不是 RabbitMQ 中。
我认为一个简单的 BlockingQueue
实现在下面使用 RabbitTemplate
从 rabbit 队列中提取消息(使用 receive()
或 receiveAndConvert()
)可能更适合take/poll 操作 - 它将在 RabbitMQ 中保留消息直到需要,并且简单地 RabbitTemplate.convertAndSend()
用于 offer/put 操作。
虽然非常简单,但它可能是对框架的有用补充;考虑 contributing.