在骆驼 rabbitmq 中重启时骆驼路由丢失消息
Camel Route Losing Message on restart in camel rabbitmq
我正在使用 camel-rabbitmq。
这是我的路线定义
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("rabbitmq:TEST?queue=TEST&concurrentConsumers=5")
.routeId("jms")
.autoStartup(false)
.throttle(10)
.asyncDelayed()
.log("Consuming message ${body} to ${header.deliveryAddress}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(atomicLong.decrementAndGet());
}
})
;
}
});
当我将 500 条消息推送到此队列时,当停止和启动路由时,通道上的所有消息都将丢失,想知道它们要去哪里。
如果我用 &autoAck=false
配置相同的路由,它工作正常但性能下降。为什么 camel 在有和没有 autoAck 的情况下不提供相同的行为。
在 camel-rabbitmq 的 rabbitmqconsumer 中进行以下更改后,我解决了我的问题
public void handleCancelOk(String consumerTag) {
// no work to do
log.info("Received cancelOk signal on the rabbitMQ channel");
**downLatch.countDown();**
}
@Override
protected void doStop() throws Exception {
if (channel == null) {
return;
}
this.requeueChannel=openChannel(consumer.getConnection());
if (tag != null && isChannelOpen()) {
channel.basicCancel(tag);
}
stopping=true;
downLatch.await();
try {
lock.acquire();
if (isChannelOpen()) {
channel.close();
}
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
log.error("Thread Interrupted!");
} finally {
lock.release();
}
}
通过执行此骆驼路线,消息将被消耗并避免消息丢失。
您需要检查 rabbitmq 消费者预取计数
consumer prefetch
我认为默认情况下,消费者会选择队列中的所有消息到其内存缓冲区。
如果将预取计数设置为 1,消费者将一条一条地确认消息。
所有其他未确认的将以就绪状态出现在队列中。等待被拾取,在消费者完成它对上一条拾取消息的任务后。
我正在使用 camel-rabbitmq。 这是我的路线定义
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("rabbitmq:TEST?queue=TEST&concurrentConsumers=5")
.routeId("jms")
.autoStartup(false)
.throttle(10)
.asyncDelayed()
.log("Consuming message ${body} to ${header.deliveryAddress}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(atomicLong.decrementAndGet());
}
})
;
}
});
当我将 500 条消息推送到此队列时,当停止和启动路由时,通道上的所有消息都将丢失,想知道它们要去哪里。
如果我用 &autoAck=false
配置相同的路由,它工作正常但性能下降。为什么 camel 在有和没有 autoAck 的情况下不提供相同的行为。
在 camel-rabbitmq 的 rabbitmqconsumer 中进行以下更改后,我解决了我的问题
public void handleCancelOk(String consumerTag) {
// no work to do
log.info("Received cancelOk signal on the rabbitMQ channel");
**downLatch.countDown();**
}
@Override
protected void doStop() throws Exception {
if (channel == null) {
return;
}
this.requeueChannel=openChannel(consumer.getConnection());
if (tag != null && isChannelOpen()) {
channel.basicCancel(tag);
}
stopping=true;
downLatch.await();
try {
lock.acquire();
if (isChannelOpen()) {
channel.close();
}
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
log.error("Thread Interrupted!");
} finally {
lock.release();
}
}
通过执行此骆驼路线,消息将被消耗并避免消息丢失。
您需要检查 rabbitmq 消费者预取计数 consumer prefetch 我认为默认情况下,消费者会选择队列中的所有消息到其内存缓冲区。 如果将预取计数设置为 1,消费者将一条一条地确认消息。 所有其他未确认的将以就绪状态出现在队列中。等待被拾取,在消费者完成它对上一条拾取消息的任务后。