骆驼多线程消费者
Camel Multi-threaded Consumer
我有一个包含订单的数据库,每个订单都有截止日期和创建日期。我想将最多 4 个订单拉入路线并同时处理它们。每个订单可能需要 10-20 分钟来处理。但我想尽可能地保持所有线程运行,不要停机。
这是我现在拥有的:
from("timer://GetOrder?fixedRate=true&period=1s")
.to("bean:orderInfoDao?method=getNextOrder")
.to("jms://process-orders")
.end();
from("jms://process-orders?concurrentConsumers=4")
.to("bean:orderService?method=processOrder(${body})")
.to("direct:send-result")
.end();
getNextOrder
DAO 函数 returns 按创建日期排列的最旧订单,已超过其截止日期。立即尝试接收订单。
现在的问题是,由于计时器的原因,传入的订单在 JMS 路由中堆积,当 getNextOrder
returns 一个更旧的订单时,它在队列中远远落后.
知道如何构造这些路由以便轮询数据库以获取最旧的 4 个订单并同时执行这些订单吗?可以接受对 DAO 的更改。
有没有多线程生产者?
提前感谢您的建议!
final Semaphore semaphore = new Semaphore(4);
from("timer://GetOrder?period=1s")
.to("bean:orderInfoDao?method=getNextOrder")
.to("jms://process-orders")
.process(new Processor() {
public void process(Exchange exchange) {
semaphore.acquire();
}
})
.end();
from("jms://process-orders?concurrentConsumers=4")
.to("bean:orderService?method=processOrder(${body})")
.process(new Processor() {
public void process(Exchange exchange) {
semaphore.release();
}
})
.to("direct:send-result")
.end();
请注意计时器 fixedRate
已关闭(默认)。
这是我的第一个想法,我希望有一些 Camel EIP 可以帮助以更好的方式实现这个逻辑。
我有一个包含订单的数据库,每个订单都有截止日期和创建日期。我想将最多 4 个订单拉入路线并同时处理它们。每个订单可能需要 10-20 分钟来处理。但我想尽可能地保持所有线程运行,不要停机。
这是我现在拥有的:
from("timer://GetOrder?fixedRate=true&period=1s")
.to("bean:orderInfoDao?method=getNextOrder")
.to("jms://process-orders")
.end();
from("jms://process-orders?concurrentConsumers=4")
.to("bean:orderService?method=processOrder(${body})")
.to("direct:send-result")
.end();
getNextOrder
DAO 函数 returns 按创建日期排列的最旧订单,已超过其截止日期。立即尝试接收订单。
现在的问题是,由于计时器的原因,传入的订单在 JMS 路由中堆积,当 getNextOrder
returns 一个更旧的订单时,它在队列中远远落后.
知道如何构造这些路由以便轮询数据库以获取最旧的 4 个订单并同时执行这些订单吗?可以接受对 DAO 的更改。
有没有多线程生产者?
提前感谢您的建议!
final Semaphore semaphore = new Semaphore(4);
from("timer://GetOrder?period=1s")
.to("bean:orderInfoDao?method=getNextOrder")
.to("jms://process-orders")
.process(new Processor() {
public void process(Exchange exchange) {
semaphore.acquire();
}
})
.end();
from("jms://process-orders?concurrentConsumers=4")
.to("bean:orderService?method=processOrder(${body})")
.process(new Processor() {
public void process(Exchange exchange) {
semaphore.release();
}
})
.to("direct:send-result")
.end();
请注意计时器 fixedRate
已关闭(默认)。
这是我的第一个想法,我希望有一些 Camel EIP 可以帮助以更好的方式实现这个逻辑。