在骆驼 rabbitmq 中重启时骆驼路由丢失消息

Camel Route Losing Message on restart in camel rabbitmq

我正在使用 camel-rabbitmq。 这是我的路线定义

camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("rabbitmq:TEST?queue=TEST&concurrentConsumers=5")
            .routeId("jms")
            .autoStartup(false)
            .throttle(10)
            .asyncDelayed()
            .log("Consuming message ${body} to ${header.deliveryAddress}")
            .process(new Processor() {

                @Override
                public void process(Exchange exchange) throws Exception {
                        System.out.println(atomicLong.decrementAndGet());
                }
            })

            ;
        }
    }); 

当我将 500 条消息推送到此队列时,当停止和启动路由时,通道上的所有消息都将丢失,想知道它们要去哪里。

如果我用 &autoAck=false 配置相同的路由,它工作正常但性能下降。为什么 camel 在有和没有 autoAck 的情况下不提供相同的行为。

在 camel-rabbitmq 的 rabbitmqconsumer 中进行以下更改后,我解决了我的问题

    public void handleCancelOk(String consumerTag) {
        // no work to do
        log.info("Received cancelOk signal on the rabbitMQ channel");
        **downLatch.countDown();**
    }
  @Override
    protected void doStop() throws Exception {
        if (channel == null) {
            return;
        }
        this.requeueChannel=openChannel(consumer.getConnection());
         if (tag != null && isChannelOpen()) {
            channel.basicCancel(tag);
        }
        stopping=true;
         downLatch.await();         

        try {
            lock.acquire();
            if (isChannelOpen()) {
                channel.close();
            }
        } catch (TimeoutException e) {
            log.error("Timeout occured");
            throw e;
        } catch (InterruptedException e1) {
            log.error("Thread Interrupted!");
        } finally {
            lock.release();
        }


    }

通过执行此骆驼路线,消息将被消耗并避免消息丢失。

您需要检查 rabbitmq 消费者预取计数 consumer prefetch 我认为默认情况下,消费者会选择队列中的所有消息到其内存缓冲区。 如果将预取计数设置为 1,消费者将一条一条地确认消息。 所有其他未确认的将以就绪状态出现在队列中。等待被拾取,在消费者完成它对上一条拾取消息的任务后。